不要怂,就是干,撸起袖子干!

Commit 971c1ab0 by Willyham

Support forceWrite option to force query write pool

1 parent 7790c4fa
Showing with 101 additions and 102 deletions
......@@ -12,8 +12,7 @@ var Pooling = require('generic-pool')
, ConnectionManager;
ConnectionManager = function(dialect, sequelize) {
var config = sequelize.config
, self = this;
var config = sequelize.config;
this.sequelize = sequelize;
this.config = config;
......@@ -28,7 +27,7 @@ ConnectionManager = function(dialect, sequelize) {
// If the user has turned off pooling we provide a 0/1 pool for backwards compat
config.pool = _.defaults({
max: 1,
min: 0,
min: 0
}, defaultPoolingConfig, {
validate: this.$validate.bind(this)
});
......@@ -68,123 +67,123 @@ ConnectionManager.prototype.initPools = function () {
var self = this
, config = this.config;
if (config.replication) {
var reads = 0
, writes = 0;
if (!config.replication) {
this.pool = Pooling.Pool({
name: 'sequelize-connection',
create: function(callback) {
self.$connect(config).nodeify(function (err, connection) {
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
});
},
destroy: function(connection) {
self.$disconnect(connection);
},
max: config.pool.max,
min: config.pool.min,
validate: config.pool.validate,
idleTimeoutMillis: config.pool.idle
});
return;
}
if (!Array.isArray(config.replication.read)) {
config.replication.read = [config.replication.read];
}
var reads = 0;
// Make sure we don't modify the existing config object (user might re-use it)
config.replication.write = _.extend({}, config.replication.write);
config.replication.read = config.replication.read.map(function (read) {
return _.extend({}, read);
});
if (!Array.isArray(config.replication.read)) {
config.replication.read = [config.replication.read];
}
// Make sure we don't modify the existing config object (user might re-use it)
config.replication.write = _.clone(config.replication.write);
config.replication.read = config.replication.read.map(function (read) {
return _.clone(read);
});
// Map main connection config
config.replication.write = _.defaults(config.replication.write, {
host: config.host,
port: config.port,
username: config.username,
password: config.password,
database: config.database
});
// Map main connection config
config.replication.write = _.defaults(config.replication.write, {
host: config.host,
port: config.port,
username: config.username,
password: config.password,
database: config.database
// Apply defaults to each read config
config.replication.read = _.map(config.replication.read, function(config) {
return _.defaults(config, {
host: self.config.host,
port: self.config.port,
username: self.config.username,
password: self.config.password,
database: self.config.database
});
});
for (var i = 0; i < config.replication.read.length; i++) {
config.replication.read[i] = _.defaults(config.replication.read[i], {
host: this.config.host,
port: this.config.port,
username: this.config.username,
password: this.config.password,
database: this.config.database
// I'll make my own pool, with blackjack and hookers! (original credit goes to @janzeh)
this.pool = {
release: function(client) {
if (client.queryType === 'read') {
return self.pool.read.release(client);
} else {
return self.pool.write.release(client);
}
},
acquire: function(callback, priority, queryType, forceWrite) {
forceWrite = _.isUndefined(forceWrite) ? false : forceWrite;
if (queryType === 'SELECT' && !forceWrite) {
self.pool.read.acquire(callback, priority);
} else {
self.pool.write.acquire(callback, priority);
}
},
destroy: function(connection) {
return self.pool[connection.queryType].destroy(connection);
},
destroyAllNow: function() {
self.pool.read.destroyAllNow();
self.pool.write.destroyAllNow();
},
drain: function(cb) {
self.pool.write.drain(function() {
self.pool.read.drain(cb);
});
}
// I'll make my own pool, with blackjack and hookers! (original credit goes to @janzeh)
this.pool = {
release: function(client) {
if (client.queryType === 'read') {
return self.pool.read.release(client);
} else {
return self.pool.write.release(client);
}
},
acquire: function(callback, priority, queryType) {
if (queryType === 'SELECT') {
self.pool.read.acquire(callback, priority);
} else {
self.pool.write.acquire(callback, priority);
}
},
read: Pooling.Pool({
name: 'sequelize-connection-read',
create: function(callback) {
// Simple round robin config
var nextRead = reads++ % config.replication.read.length;
self.$connect(config.replication.read[nextRead]).tap(function (connection) {
connection.queryType = 'read';
}).nodeify(function (err, connection) {
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
});
},
destroy: function(connection) {
return self.pool[connection.queryType].destroy(connection);
},
destroyAllNow: function() {
self.pool.read.destroyAllNow();
self.pool.write.destroyAllNow();
},
drain: function(cb) {
self.pool.write.drain(function() {
self.pool.read.drain(cb);
});
self.$disconnect(connection);
},
read: Pooling.Pool({
name: 'sequelize-connection-read',
create: function(callback) {
if (reads >= config.replication.read.length) {
reads = 0;
}
// Simple round robin config
self.$connect(config.replication.read[reads++]).tap(function (connection) {
connection.queryType = 'read';
}).nodeify(function (err, connection) {
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
});
},
destroy: function(connection) {
self.$disconnect(connection);
},
validate: config.pool.validate,
max: config.pool.max,
min: config.pool.min,
idleTimeoutMillis: config.pool.idle
}),
write: Pooling.Pool({
name: 'sequelize-connection-write',
create: function(callback) {
self.$connect(config.replication.write).tap(function (connection) {
connection.queryType = 'write';
}).nodeify(function (err, connection) {
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
});
},
destroy: function(connection) {
self.$disconnect(connection);
},
validate: config.pool.validate,
max: config.pool.max,
min: config.pool.min,
idleTimeoutMillis: config.pool.idle
})
};
} else {
this.pool = Pooling.Pool({
name: 'sequelize-connection',
validate: config.pool.validate,
max: config.pool.max,
min: config.pool.min,
idleTimeoutMillis: config.pool.idle
}),
write: Pooling.Pool({
name: 'sequelize-connection-write',
create: function(callback) {
self.$connect(config).nodeify(function (err, connection) {
self.$connect(config.replication.write).tap(function (connection) {
connection.queryType = 'write';
}).nodeify(function (err, connection) {
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
});
},
destroy: function(connection) {
self.$disconnect(connection);
},
validate: config.pool.validate,
max: config.pool.max,
min: config.pool.min,
validate: config.pool.validate,
idleTimeoutMillis: config.pool.idle
});
}
})
};
};
ConnectionManager.prototype.getConnection = function(options) {
......@@ -195,7 +194,7 @@ ConnectionManager.prototype.getConnection = function(options) {
self.pool.acquire(function(err, connection) {
if (err) return reject(err);
resolve(connection);
}, options.priority, options.type);
}, options.priority, options.type, options.forceWrite);
});
};
ConnectionManager.prototype.releaseConnection = function(connection) {
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!