不要怂,就是干,撸起袖子干!

Commit e8cbc7de by Dmitry Chornyi

Refactored pool.destroy

1 parent 63ac4704
Showing with 440 additions and 443 deletions
var mysql var mysql
, Pooling = require('generic-pool') , Pooling = require('generic-pool')
, Query = require("./query") , Query = require("./query")
, Utils = require("../../utils") , Utils = require("../../utils")
, without = function(arr, elem) { return arr.filter(function(e) { return e != elem }) } , without = function(arr, elem) { return arr.filter(function(e) { return e != elem }) }
module.exports = (function() { module.exports = (function() {
var ConnectorManager = function(sequelize, config) { var ConnectorManager = function(sequelize, config) {
try { try {
if (config.dialectModulePath) { if (config.dialectModulePath) {
mysql = require(config.dialectModulePath) mysql = require(config.dialectModulePath)
} else { } else {
mysql = require('mysql') mysql = require('mysql')
} }
} catch (err) { } catch (err) {
console.log('You need to install mysql package manually') console.log('You need to install mysql package manually')
} }
this.sequelize = sequelize this.sequelize = sequelize
this.client = null this.client = null
this.config = config || {} this.config = config || {}
this.config.port = this.config.port || 3306 this.config.port = this.config.port || 3306
this.disconnectTimeoutId = null this.disconnectTimeoutId = null
this.queue = [] this.queue = []
this.activeQueue = [] this.activeQueue = []
this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50) this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
this.poolCfg = Utils._.defaults(this.config.pool, { this.poolCfg = Utils._.defaults(this.config.pool, {
maxConnections: 10, maxConnections: 10,
minConnections: 0, minConnections: 0,
maxIdleTime: 1000, maxIdleTime: 1000,
handleDisconnects: false, handleDisconnects: false,
validate: validateConnection validate: validateConnection
}); });
this.pendingQueries = 0; this.pendingQueries = 0;
this.useReplicaton = !!config.replication; this.useReplicaton = !!config.replication;
this.useQueue = config.queue !== undefined ? config.queue : true; this.useQueue = config.queue !== undefined ? config.queue : true;
var self = this var self = this
if (this.useReplicaton) { if (this.useReplicaton) {
var reads = 0 var reads = 0
, writes = 0; , writes = 0;
// Init configs with options from config if not present // Init configs with options from config if not present
for (var i in config.replication.read) { for (var i in config.replication.read) {
config.replication.read[i] = Utils._.defaults(config.replication.read[i], { config.replication.read[i] = Utils._.defaults(config.replication.read[i], {
host: this.config.host, host: this.config.host,
port: this.config.port, port: this.config.port,
username: this.config.username, username: this.config.username,
password: this.config.password, password: this.config.password,
database: this.config.database database: this.config.database
}); });
} }
config.replication.write = Utils._.defaults(config.replication.write, { config.replication.write = Utils._.defaults(config.replication.write, {
host: this.config.host, host: this.config.host,
port: this.config.port, port: this.config.port,
username: this.config.username, username: this.config.username,
password: this.config.password, password: this.config.password,
database: this.config.database database: this.config.database
}); });
// I'll make my own pool, with blackjack and hookers! // I'll make my own pool, with blackjack and hookers!
this.pool = { this.pool = {
release: function (client) { release: function (client) {
if (client.queryType == 'read') { if (client.queryType == 'read') {
return this.read.release(client); return this.read.release(client);
} else { } else {
return this.write.release(client); return this.write.release(client);
} }
}, },
acquire: function (callback, priority, queryType) { destroy: function (client) {
if (queryType == 'SELECT') { if (client.queryType == 'read') {
this.read.acquire(callback, priority); return this.read.destroy(client);
} else { } else {
this.write.acquire(callback, priority); return this.write.destroy(client);
} }
}, },
drain: function () { acquire: function (callback, priority, queryType) {
this.read.drain(); if (queryType == 'SELECT') {
this.write.drain(); this.read.acquire(callback, priority);
}, } else {
read: Pooling.Pool({ this.write.acquire(callback, priority);
name: 'sequelize-read', }
create: function (done) { },
if (reads >= self.config.replication.read.length) { drain: function () {
reads = 0 this.read.drain();
} this.write.drain();
var config = self.config.replication.read[reads++]; },
read: Pooling.Pool({
connect.call(self, function (err, connection) { name: 'sequelize-read',
if (connection) { create: function (done) {
connection.queryType = 'read' if (reads >= self.config.replication.read.length) {
} reads = 0
}
done(err, connection) var config = self.config.replication.read[reads++];
}, config);
}, connect.call(self, function (err, connection) {
destroy: function(client) { if (connection) {
disconnect.call(self, client) connection.queryType = 'read'
}, }
validate: self.poolCfg.validate,
max: self.poolCfg.maxConnections, done(err, connection)
min: self.poolCfg.minConnections, }, config);
idleTimeoutMillis: self.poolCfg.maxIdleTime },
}), destroy: function(client) {
write: Pooling.Pool({ disconnect.call(self, client)
name: 'sequelize-write', },
create: function (done) { validate: self.poolCfg.validate,
connect.call(self, function (err, connection) { max: self.poolCfg.maxConnections,
if (connection) { min: self.poolCfg.minConnections,
connection.queryType = 'write' idleTimeoutMillis: self.poolCfg.maxIdleTime
} }),
write: Pooling.Pool({
done(err, connection) name: 'sequelize-write',
}, self.config.replication.write); create: function (done) {
}, connect.call(self, function (err, connection) {
destroy: function(client) { if (connection) {
disconnect.call(self, client) connection.queryType = 'write'
}, }
validate: self.poolCfg.validate,
max: self.poolCfg.maxConnections, done(err, connection)
min: self.poolCfg.minConnections, }, self.config.replication.write);
idleTimeoutMillis: self.poolCfg.maxIdleTime },
}) destroy: function(client) {
}; disconnect.call(self, client)
} else if (this.poolCfg) { },
//the user has requested pooling, so create our connection pool validate: self.poolCfg.validate,
this.pool = Pooling.Pool({ max: self.poolCfg.maxConnections,
name: 'sequelize-mysql', min: self.poolCfg.minConnections,
create: function (done) { idleTimeoutMillis: self.poolCfg.maxIdleTime
connect.call(self, function (err, connection) { })
// This has to be nested for some reason, else the error won't propagate correctly };
done(err, connection); } else if (this.poolCfg) {
}) //the user has requested pooling, so create our connection pool
}, this.pool = Pooling.Pool({
destroy: function(client) { name: 'sequelize-mysql',
disconnect.call(self, client) create: function (done) {
}, connect.call(self, function (err, connection) {
max: self.poolCfg.maxConnections, // This has to be nested for some reason, else the error won't propagate correctly
min: self.poolCfg.minConnections, done(err, connection);
validate: self.poolCfg.validate, })
idleTimeoutMillis: self.poolCfg.maxIdleTime },
}) destroy: function(client) {
} disconnect.call(self, client)
},
this.onProcessExit = function () { max: self.poolCfg.maxConnections,
//be nice & close our connections on exit min: self.poolCfg.minConnections,
if (self.pool) { validate: self.poolCfg.validate,
self.pool.drain() idleTimeoutMillis: self.poolCfg.maxIdleTime
} else if (self.client) { })
disconnect(self.client) }
}
this.onProcessExit = function () {
return //be nice & close our connections on exit
}.bind(this); if (self.pool) {
self.pool.drain()
process.on('exit', this.onProcessExit) } else if (self.client) {
} disconnect(self.client)
}
Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype);
return
ConnectorManager.prototype.query = function(sql, callee, options) { }.bind(this);
if (this.useQueue) {
// If queueing we'll let the execQueueItem method handle connecting process.on('exit', this.onProcessExit)
var queueItem = { }
query: new Query(null, this.sequelize, callee, options || {}),
sql: sql Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype);
};
ConnectorManager.prototype.query = function(sql, callee, options) {
queueItem.query.options.uuid = this.config.uuid if (this.useQueue) {
enqueue.call(this, queueItem, options); // If queueing we'll let the execQueueItem method handle connecting
return queueItem.query; var queueItem = {
} query: new Query(null, this.sequelize, callee, options || {}),
sql: sql
var self = this, query = new Query(null, this.sequelize, callee, options || {}); };
this.pendingQueries++;
queueItem.query.options.uuid = this.config.uuid
query.options.uuid = this.config.uuid enqueue.call(this, queueItem, options);
query.done(function() { return queueItem.query;
self.pendingQueries--; }
if (self.pool) {
self.pool.release(query.client); var self = this, query = new Query(null, this.sequelize, callee, options || {});
} else { this.pendingQueries++;
if (self.pendingQueries === 0) {
setTimeout(function() { query.options.uuid = this.config.uuid
if (self.pendingQueries === 0){ query.done(function() {
self.disconnect.call(self); self.pendingQueries--;
} if (self.pool) {
}, 100); self.pool.release(query.client);
} } else {
} if (self.pendingQueries === 0) {
}); setTimeout(function() {
if (self.pendingQueries === 0){
this.getConnection(options, function (err, client) { self.disconnect.call(self);
if (err) { }
return query.emit('error', err) }, 100);
} }
query.client = client }
query.run(sql) });
});
this.getConnection(options, function (err, client) {
return query; if (err) {
}; return query.emit('error', err)
}
ConnectorManager.prototype.getConnection = function(options, callback) { query.client = client
var self = this; query.run(sql)
});
if (typeof options === "function") {
callback = options; return query;
options = {}; };
}
ConnectorManager.prototype.getConnection = function(options, callback) {
return new Utils.CustomEventEmitter(function (emitter) { var self = this;
if (!self.pool) {
// Regular client caching if (typeof options === "function") {
if (self.client) { callback = options;
return emitter.emit('success', self.client); options = {};
} else { }
// Cache for concurrent queries
if (self._getConnection) { return new Utils.CustomEventEmitter(function (emitter) {
self._getConnection.proxy(emitter); if (!self.pool) {
return; // Regular client caching
} if (self.client) {
return emitter.emit('success', self.client);
// Set cache and acquire connection } else {
self._getConnection = emitter; // Cache for concurrent queries
connect.call(self, function(err, client) { if (self._getConnection) {
if (err) { self._getConnection.proxy(emitter);
return emitter.emit('error', err); return;
} }
// Unset caching, should now be caught by the self.client check // Set cache and acquire connection
self._getConnection = null; self._getConnection = emitter;
self.client = client; connect.call(self, function(err, client) {
emitter.emit('success', client); if (err) {
}); return emitter.emit('error', err);
} }
}
if (self.pool) { // Unset caching, should now be caught by the self.client check
// Acquire from pool self._getConnection = null;
self.pool.acquire(function(err, client) { self.client = client;
if (err) { emitter.emit('success', client);
return emitter.emit('error', err); });
} }
emitter.emit('success', client); }
}, options.priority, options.type); if (self.pool) {
} // Acquire from pool
}).run().done(callback); self.pool.acquire(function(err, client) {
}; if (err) {
return emitter.emit('error', err);
ConnectorManager.prototype.disconnect = function() { }
if (this.client) { emitter.emit('success', client);
disconnect.call(this, this.client) }, options.priority, options.type);
} }
return }).run().done(callback);
}; };
ConnectorManager.prototype.disconnect = function() {
// private if (this.client) {
disconnect.call(this, this.client)
var disconnect = function(client) { }
var self = this; return
this.client = null; };
client.end(function() {
if (!self.useQueue) { // private
return client.destroy();
} var disconnect = function(client) {
var self = this;
var intervalObj = null this.client = null;
var cleanup = function () {
var retryCt = 0 client.end(function() {
// make sure to let client finish before calling destroy if (!self.useQueue) {
if (client._queue && (client._queue.length > 0)) { return client.destroy();
return }
}
// needed to prevent mysql connection leak var intervalObj = null
client.destroy() var cleanup = function () {
clearInterval(intervalObj) var retryCt = 0
} // make sure to let client finish before calling destroy
intervalObj = setInterval(cleanup, 10) if (client._queue && (client._queue.length > 0)) {
cleanup() return
return }
}) // needed to prevent mysql connection leak
} client.destroy()
clearInterval(intervalObj)
var connect = function(done, config) { }
config = config || this.config intervalObj = setInterval(cleanup, 10)
cleanup()
var emitter = new (require('events').EventEmitter)() return
var connectionConfig = { })
host: config.host, }
port: config.port,
user: config.username, var connect = function(done, config) {
password: config.password, config = config || this.config
database: config.database,
timezone: 'Z' var emitter = new (require('events').EventEmitter)()
}; var connectionConfig = {
host: config.host,
if (config.dialectOptions) { port: config.port,
Object.keys(config.dialectOptions).forEach(function (key) { user: config.username,
connectionConfig[key] = config.dialectOptions[key]; password: config.password,
}); database: config.database,
} timezone: 'Z'
};
var connection = mysql.createConnection(connectionConfig);
connection.connect(function(err) { if (config.dialectOptions) {
if (err) { Object.keys(config.dialectOptions).forEach(function (key) {
switch(err.code) { connectionConfig[key] = config.dialectOptions[key];
case 'ECONNREFUSED': });
case 'ER_ACCESS_D2ENIED_ERROR': }
emitter.emit('error', 'Failed to authenticate for MySQL. Please double check your settings.')
break var connection = mysql.createConnection(connectionConfig);
case 'ENOTFOUND': connection.connect(function(err) {
case 'EHOSTUNREACH': if (err) {
case 'EINVAL': switch(err.code) {
emitter.emit('error', 'Failed to find MySQL server. Please double check your settings.') case 'ECONNREFUSED':
break case 'ER_ACCESS_D2ENIED_ERROR':
default: emitter.emit('error', 'Failed to authenticate for MySQL. Please double check your settings.')
emitter.emit('error', err); break
break; case 'ENOTFOUND':
} case 'EHOSTUNREACH':
case 'EINVAL':
return; emitter.emit('error', 'Failed to find MySQL server. Please double check your settings.')
} break
default:
emitter.emit('success', connection); emitter.emit('error', err);
}) break;
}
connection.query("SET time_zone = '+0:00'");
// client.setMaxListeners(self.maxConcurrentQueries) return;
this.isConnecting = false }
if (config.pool !== null && config.pool.handleDisconnects) {
handleDisconnect(this.pool, connection) emitter.emit('success', connection);
} })
emitter.on('error', function (err) { connection.query("SET time_zone = '+0:00'");
done(err); // client.setMaxListeners(self.maxConcurrentQueries)
}); this.isConnecting = false
emitter.on('success', function (connection) { if (config.pool !== null && config.pool.handleDisconnects) {
done(null, connection); handleDisconnect(this.pool, connection)
}); }
}
emitter.on('error', function (err) {
var handleDisconnect = function(pool, client) { done(err);
client.on('error', function(err) { });
if (err.code !== 'PROTOCOL_CONNECTION_LOST') { emitter.on('success', function (connection) {
throw err done(null, connection);
} });
}
if (typeof(pool.destroy) == 'function') {
pool.destroy(client) var handleDisconnect = function(pool, client) {
} else { client.on('error', function(err) {
if (client.queryType == 'read' && pool.read && typeof(pool.read.destroy) == 'function') { if (err.code !== 'PROTOCOL_CONNECTION_LOST') {
pool.read.destroy(client) throw err
} }
if (client.queryType == 'write' && pool.write && typeof(pool.write.destroy) == 'function') { pool.destroy(client)
pool.write.destroy(client) })
} }
}
}) var validateConnection = function(client) {
} return client && client.state !== 'disconnected'
}
var validateConnection = function(client) {
return client && client.state !== 'disconnected' var enqueue = function(queueItem, options) {
} options = options || {}
if (this.activeQueue.length < this.maxConcurrentQueries) {
var enqueue = function(queueItem, options) { this.activeQueue.push(queueItem)
options = options || {} execQueueItem.call(this, queueItem)
if (this.activeQueue.length < this.maxConcurrentQueries) { } else {
this.activeQueue.push(queueItem) this.queue.push(queueItem)
execQueueItem.call(this, queueItem) }
} else { }
this.queue.push(queueItem)
} var dequeue = function(queueItem) {
} //return the item's connection to the pool
if (this.pool) {
var dequeue = function(queueItem) { this.pool.release(queueItem.client)
//return the item's connection to the pool }
if (this.pool) { this.activeQueue = without(this.activeQueue, queueItem)
this.pool.release(queueItem.client) }
}
this.activeQueue = without(this.activeQueue, queueItem) var transferQueuedItems = function(count) {
} for(var i = 0; i < count; i++) {
var queueItem = this.queue.shift();
var transferQueuedItems = function(count) { if (queueItem) {
for(var i = 0; i < count; i++) { enqueue.call(this, queueItem)
var queueItem = this.queue.shift(); }
if (queueItem) { }
enqueue.call(this, queueItem) }
}
} var afterQuery = function(queueItem) {
} dequeue.call(this, queueItem)
transferQueuedItems.call(this, this.maxConcurrentQueries - this.activeQueue.length)
var afterQuery = function(queueItem) { disconnectIfNoConnections.call(this)
dequeue.call(this, queueItem) }
transferQueuedItems.call(this, this.maxConcurrentQueries - this.activeQueue.length)
disconnectIfNoConnections.call(this)
} var execQueueItem = function(queueItem) {
var self = this
var execQueueItem = function(queueItem) { self.getConnection({
var self = this priority: queueItem.query.options.priority,
type: queueItem.query.options.type
self.getConnection({ }, function (err, connection) {
priority: queueItem.query.options.priority, if (err) {
type: queueItem.query.options.type queueItem.query.emit('error', err)
}, function (err, connection) { return
if (err) { }
queueItem.query.emit('error', err)
return queueItem.query.client = connection
} queueItem.client = connection
queueItem.query
queueItem.query.client = connection .success(function(){ afterQuery.call(self, queueItem) })
queueItem.client = connection .error(function(){ afterQuery.call(self, queueItem) })
queueItem.query
.success(function(){ afterQuery.call(self, queueItem) }) queueItem.query.run(queueItem.sql, queueItem.client)
.error(function(){ afterQuery.call(self, queueItem) }) })
}
queueItem.query.run(queueItem.sql, queueItem.client)
}) ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function() {
} return (this.queue.length > 0) || (this.activeQueue.length > 0) || (this.client && this.client._queue && (this.client._queue.length > 0))
})
ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function() {
return (this.queue.length > 0) || (this.activeQueue.length > 0) || (this.client && this.client._queue && (this.client._queue.length > 0)) // legacy
}) ConnectorManager.prototype.__defineGetter__('hasNoConnections', function() {
return !this.hasQueuedItems
// legacy })
ConnectorManager.prototype.__defineGetter__('hasNoConnections', function() {
return !this.hasQueuedItems ConnectorManager.prototype.__defineGetter__('isConnected', function() {
}) return this.client != null
})
ConnectorManager.prototype.__defineGetter__('isConnected', function() {
return this.client != null var disconnectIfNoConnections = function() {
}) var self = this
var disconnectIfNoConnections = function() { this.disconnectTimeoutId && clearTimeout(this.disconnectTimeoutId)
var self = this this.disconnectTimeoutId = setTimeout(function() {
self.isConnected && !self.hasQueuedItems && self.disconnect()
this.disconnectTimeoutId && clearTimeout(this.disconnectTimeoutId) }, 100)
this.disconnectTimeoutId = setTimeout(function() { }
self.isConnected && !self.hasQueuedItems && self.disconnect()
}, 100) return ConnectorManager
}
return ConnectorManager
})() })()
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!