不要怂,就是干,撸起袖子干!

Commit fade4570 by Jan Aagaard Meier

initial read replication

1 parent 5c8c1bbe
...@@ -13,27 +13,110 @@ module.exports = (function() { ...@@ -13,27 +13,110 @@ module.exports = (function() {
this.queue = [] this.queue = []
this.activeQueue = [] this.activeQueue = []
this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50) this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
this.poolCfg = this.config.pool this.poolCfg = Utils._.defaults(this.config.pool, {
maxConnections: 10,
minConnections: 0,
maxIdleTime: 1000
});
this.pendingQueries = 0; this.pendingQueries = 0;
this.useReplicaton = !!config.replication;
this.useQueue = config.queue !== undefined ? config.queue : true; this.useQueue = config.queue !== undefined ? config.queue : true;
var self = this var self = this
if (this.poolCfg) { if (this.useReplicaton) {
//the user has requested pooling, so create our connection pool var reads = 0,
if (!this.poolCfg.maxConnections) { writes = 0;
this.poolCfg.maxConnections = 10
// Init configs with options from config if not present
for (var i in config.replication.reads) {
config.replication.reads[i] = Utils._.defaults(config.replication.reads[i], {
host: this.config.host,
port: this.config.port,
user: this.config.username,
password: this.config.password,
database: this.config.database
});
} }
if (!this.poolCfg.minConnections) { for (var j in config.replication.writes) {
this.poolCfg.minConnections = 0 config.replication.writes[j] = Utils._.defaults(config.replication.writes[j], {
host: this.config.host,
port: this.config.port,
user: this.config.username,
password: this.config.password,
database: this.config.database
});
} }
// I'll make my own pool, with blackjack and hookers!
this.pool = {
release: function (client) {
if (client.queryType == 'SELECT') {
return this.read.release(client);
} else {
return this.write.release(client);
}
},
acquire: function (callback, priority, queryType) {
if (queryType == 'SELECT') {
this.read.acquire(callback, priority);
} else {
this.write.acquire(callback, priority);
}
},
drain: function () {
this.read.drain();
this.write.drain();
},
read: Pooling.Pool({
name: 'sequelize-read',
create: function (done) {
if (reads >= self.config.replication.reads.length) reads = 0;
var config = self.config.replication.reads[reads++];
require('util').debug(JSON.stringify(config));
var connection = mysql.createConnection(config)
self.isConnecting = false
done(null, connection)
},
destroy: function(client) {
disconnect.call(self, client)
},
max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections,
idleTimeoutMillis: self.poolCfg.maxIdleTime
}),
write: Pooling.Pool({
name: 'sequelize-write',
create: function (done) {
if (writes >= self.config.replication.writes.length) writes = 0;
var config = self.config.replication.writes[writes++];
var connection = mysql.createConnection(config)
self.isConnecting = false
done(null, connection)
},
destroy: function(client) {
disconnect.call(self, client)
},
max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections,
idleTimeoutMillis: self.poolCfg.maxIdleTime
})
};
} else if (this.poolCfg) {
//the user has requested pooling, so create our connection pool
this.pool = Pooling.Pool({ this.pool = Pooling.Pool({
name: 'sequelize-mysql', name: 'sequelize-mysql',
create: function (done) { create: function (done) {
connect.call(self, done) connect.call(self, done)
}, },
destroy: function(client) { destroy: function(client) {
disconnect.call(self, client) disconnect.call(self, client)
}, },
max: self.poolCfg.maxConnections, max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections, min: self.poolCfg.minConnections,
...@@ -66,7 +149,7 @@ module.exports = (function() { ...@@ -66,7 +149,7 @@ module.exports = (function() {
sql: sql sql: sql
}; };
enqueue.call(this, queueItem); enqueue.call(this, queueItem, options);
return queueItem.query; return queueItem.query;
} }
...@@ -90,10 +173,11 @@ module.exports = (function() { ...@@ -90,10 +173,11 @@ module.exports = (function() {
this.pool.acquire(function(err, client) { this.pool.acquire(function(err, client) {
if (err) return query.emit('error', err); if (err) return query.emit('error', err);
client.queryType = options.type;
query.client = client; query.client = client;
query.run(sql); query.run(sql);
return; return;
}); }, undefined, options.type);
} }
return query; return query;
...@@ -160,7 +244,7 @@ module.exports = (function() { ...@@ -160,7 +244,7 @@ module.exports = (function() {
done(null, connection) done(null, connection)
} }
var enqueue = function(queueItem) { var enqueue = function(queueItem, options) {
if(this.activeQueue.length < this.maxConcurrentQueries) { if(this.activeQueue.length < this.maxConcurrentQueries) {
this.activeQueue.push(queueItem) this.activeQueue.push(queueItem)
if (this.pool) { if (this.pool) {
...@@ -172,11 +256,12 @@ module.exports = (function() { ...@@ -172,11 +256,12 @@ module.exports = (function() {
} }
//we set the client here, asynchronously, when getting a pooled connection //we set the client here, asynchronously, when getting a pooled connection
//allowing the ConnectorManager.query method to remain synchronous //allowing the ConnectorManager.query method to remain synchronous
client.queryType = options.type;
queueItem.query.client = client queueItem.query.client = client
queueItem.client = client queueItem.client = client
execQueueItem.call(self, queueItem) execQueueItem.call(self, queueItem)
return return
}) }, undefined, options.type)
} }
else else
{ {
......
...@@ -38,7 +38,9 @@ module.exports = (function() { ...@@ -38,7 +38,9 @@ module.exports = (function() {
logging: console.log, logging: console.log,
omitNull: false, omitNull: false,
queue: true, queue: true,
native: false native: false,
replication: false,
pool: {}
}, options || {}) }, options || {})
if(this.options.logging === true) { if(this.options.logging === true) {
...@@ -56,6 +58,7 @@ module.exports = (function() { ...@@ -56,6 +58,7 @@ module.exports = (function() {
protocol: this.options.protocol, protocol: this.options.protocol,
queue : this.options.queue, queue : this.options.queue,
native : this.options.native, native : this.options.native,
replication: this.options.replication,
maxConcurrentQueries: this.options.maxConcurrentQueries maxConcurrentQueries: this.options.maxConcurrentQueries
} }
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!