不要怂,就是干,撸起袖子干!

Commit 684848c4 by Sascha Depold

Merge pull request #345 from innofluence/read-replication

Read replication for Mysql (experimental)
2 parents 5c8c1bbe 686742e0
......@@ -13,27 +13,102 @@ module.exports = (function() {
this.queue = []
this.activeQueue = []
this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
this.poolCfg = this.config.pool
this.poolCfg = Utils._.defaults(this.config.pool, {
maxConnections: 10,
minConnections: 0,
maxIdleTime: 1000
});
this.pendingQueries = 0;
this.useReplicaton = !!config.replication;
this.useQueue = config.queue !== undefined ? config.queue : true;
var self = this
if (this.poolCfg) {
//the user has requested pooling, so create our connection pool
if (!this.poolCfg.maxConnections) {
this.poolCfg.maxConnections = 10
}
if (!this.poolCfg.minConnections) {
this.poolCfg.minConnections = 0
if (this.useReplicaton) {
var reads = 0,
writes = 0;
// Init configs with options from config if not present
for (var i in config.replication.read) {
config.replication.read[i] = Utils._.defaults(config.replication.read[i], {
host: this.config.host,
port: this.config.port,
username: this.config.username,
password: this.config.password,
database: this.config.database
});
}
config.replication.write = Utils._.defaults(config.replication.write, {
host: this.config.host,
port: this.config.port,
username: this.config.username,
password: this.config.password,
database: this.config.database
});
// I'll make my own pool, with blackjack and hookers!
this.pool = {
release: function (client) {
if (client.queryType == 'read') {
return this.read.release(client);
} else {
return this.write.release(client);
}
},
acquire: function (callback, priority, queryType) {
if (queryType == 'SELECT') {
this.read.acquire(callback, priority);
} else {
this.write.acquire(callback, priority);
}
},
drain: function () {
this.read.drain();
this.write.drain();
},
read: Pooling.Pool({
name: 'sequelize-read',
create: function (done) {
if (reads >= self.config.replication.read.length) reads = 0;
var config = self.config.replication.read[reads++];
connect.call(self, function (err, connection) {
connection.queryType = 'read'
done(null, connection)
}, config);
},
destroy: function(client) {
disconnect.call(self, client)
},
max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections,
idleTimeoutMillis: self.poolCfg.maxIdleTime
}),
write: Pooling.Pool({
name: 'sequelize-write',
create: function (done) {
connect.call(self, function (err, connection) {
connection.queryType = 'write'
done(null, connection)
}, self.config.replication.write);
},
destroy: function(client) {
disconnect.call(self, client)
},
max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections,
idleTimeoutMillis: self.poolCfg.maxIdleTime
})
};
} else if (this.poolCfg) {
//the user has requested pooling, so create our connection pool
this.pool = Pooling.Pool({
name: 'sequelize-mysql',
create: function (done) {
connect.call(self, done)
connect.call(self, done)
},
destroy: function(client) {
disconnect.call(self, client)
disconnect.call(self, client)
},
max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections,
......@@ -66,7 +141,7 @@ module.exports = (function() {
sql: sql
};
enqueue.call(this, queueItem);
enqueue.call(this, queueItem, options);
return queueItem.query;
}
......@@ -93,7 +168,7 @@ module.exports = (function() {
query.client = client;
query.run(sql);
return;
});
}, undefined, options.type);
}
return query;
......@@ -146,13 +221,14 @@ module.exports = (function() {
})
}
var connect = function(done) {
var connect = function(done, config) {
config = config || this.config
var connection = mysql.createConnection({
host: this.config.host,
port: this.config.port,
user: this.config.username,
password: this.config.password,
database: this.config.database
host: config.host,
port: config.port,
user: config.username,
password: config.password,
database: config.database
})
// client.setMaxListeners(self.maxConcurrentQueries)
this.isConnecting = false
......@@ -160,7 +236,8 @@ module.exports = (function() {
done(null, connection)
}
var enqueue = function(queueItem) {
var enqueue = function(queueItem, options) {
options = options || {}
if(this.activeQueue.length < this.maxConcurrentQueries) {
this.activeQueue.push(queueItem)
if (this.pool) {
......@@ -176,7 +253,7 @@ module.exports = (function() {
queueItem.client = client
execQueueItem.call(self, queueItem)
return
})
}, undefined, options.type)
}
else
{
......
......@@ -38,7 +38,9 @@ module.exports = (function() {
logging: console.log,
omitNull: false,
queue: true,
native: false
native: false,
replication: false,
pool: {}
}, options || {})
if(this.options.logging === true) {
......@@ -56,6 +58,7 @@ module.exports = (function() {
protocol: this.options.protocol,
queue : this.options.queue,
native : this.options.native,
replication: this.options.replication,
maxConcurrentQueries: this.options.maxConcurrentQueries
}
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!