不要怂,就是干,撸起袖子干!

Commit 7d5e5210 by Dmitry Chornyi

Fixed whitespace

1 parent e8cbc7de
Showing with 444 additions and 440 deletions
var mysql var mysql
, Pooling = require('generic-pool') , Pooling = require('generic-pool')
, Query = require("./query") , Query = require("./query")
, Utils = require("../../utils") , Utils = require("../../utils")
, without = function(arr, elem) { return arr.filter(function(e) { return e != elem }) } , without = function(arr, elem) { return arr.filter(function(e) { return e != elem }) }
module.exports = (function() { module.exports = (function() {
var ConnectorManager = function(sequelize, config) { var ConnectorManager = function(sequelize, config) {
try { try {
if (config.dialectModulePath) { if (config.dialectModulePath) {
mysql = require(config.dialectModulePath) mysql = require(config.dialectModulePath)
} else { } else {
mysql = require('mysql') mysql = require('mysql')
} }
} catch (err) { } catch (err) {
console.log('You need to install mysql package manually') console.log('You need to install mysql package manually')
} }
this.sequelize = sequelize this.sequelize = sequelize
this.client = null this.client = null
this.config = config || {} this.config = config || {}
this.config.port = this.config.port || 3306 this.config.port = this.config.port || 3306
this.disconnectTimeoutId = null this.disconnectTimeoutId = null
this.queue = [] this.queue = []
this.activeQueue = [] this.activeQueue = []
this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50) this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
this.poolCfg = Utils._.defaults(this.config.pool, { this.poolCfg = Utils._.defaults(this.config.pool, {
maxConnections: 10, maxConnections: 10,
minConnections: 0, minConnections: 0,
maxIdleTime: 1000, maxIdleTime: 1000,
handleDisconnects: false, handleDisconnects: false,
validate: validateConnection validate: validateConnection
}); });
this.pendingQueries = 0; this.pendingQueries = 0;
this.useReplicaton = !!config.replication; this.useReplicaton = !!config.replication;
this.useQueue = config.queue !== undefined ? config.queue : true; this.useQueue = config.queue !== undefined ? config.queue : true;
var self = this var self = this
if (this.useReplicaton) { if (this.useReplicaton) {
var reads = 0 var reads = 0
, writes = 0; , writes = 0;
// Init configs with options from config if not present // Init configs with options from config if not present
for (var i in config.replication.read) { for (var i in config.replication.read) {
config.replication.read[i] = Utils._.defaults(config.replication.read[i], { config.replication.read[i] = Utils._.defaults(config.replication.read[i], {
host: this.config.host, host: this.config.host,
port: this.config.port, port: this.config.port,
username: this.config.username, username: this.config.username,
password: this.config.password, password: this.config.password,
database: this.config.database database: this.config.database
}); });
} }
config.replication.write = Utils._.defaults(config.replication.write, { config.replication.write = Utils._.defaults(config.replication.write, {
host: this.config.host, host: this.config.host,
port: this.config.port, port: this.config.port,
username: this.config.username, username: this.config.username,
password: this.config.password, password: this.config.password,
database: this.config.database database: this.config.database
}); });
// I'll make my own pool, with blackjack and hookers! // I'll make my own pool, with blackjack and hookers!
this.pool = { this.pool = {
release: function (client) { release: function (client) {
if (client.queryType == 'read') { if (client.queryType == 'read') {
return this.read.release(client); return this.read.release(client);
} else { } else {
return this.write.release(client); return this.write.release(client);
} }
}, },
destroy: function (client) { destroy: function (client) {
if (client.queryType == 'read') { if (client.queryType == 'read') {
return this.read.destroy(client); return this.read.destroy(client);
} else { } else {
return this.write.destroy(client); return this.write.destroy(client);
} }
}, },
acquire: function (callback, priority, queryType) { acquire: function (callback, priority, queryType) {
if (queryType == 'SELECT') { if (queryType == 'SELECT') {
this.read.acquire(callback, priority); this.read.acquire(callback, priority);
} else { } else {
this.write.acquire(callback, priority); this.write.acquire(callback, priority);
} }
}, },
drain: function () { drain: function () {
this.read.drain(); this.read.drain();
this.write.drain(); this.write.drain();
}, },
read: Pooling.Pool({ read: Pooling.Pool({
name: 'sequelize-read', name: 'sequelize-read',
create: function (done) { create: function (done) {
if (reads >= self.config.replication.read.length) { if (reads >= self.config.replication.read.length) {
reads = 0 reads = 0
} }
var config = self.config.replication.read[reads++]; var config = self.config.replication.read[reads++];
connect.call(self, function (err, connection) { connect.call(self, function (err, connection) {
if (connection) { if (connection) {
connection.queryType = 'read' connection.queryType = 'read'
} }
done(err, connection) done(err, connection)
}, config); }, config);
}, },
destroy: function(client) { destroy: function(client) {
disconnect.call(self, client) disconnect.call(self, client)
}, },
validate: self.poolCfg.validate, validate: self.poolCfg.validate,
max: self.poolCfg.maxConnections, max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections, min: self.poolCfg.minConnections,
idleTimeoutMillis: self.poolCfg.maxIdleTime idleTimeoutMillis: self.poolCfg.maxIdleTime
}), }),
write: Pooling.Pool({ write: Pooling.Pool({
name: 'sequelize-write', name: 'sequelize-write',
create: function (done) { create: function (done) {
connect.call(self, function (err, connection) { connect.call(self, function (err, connection) {
if (connection) { if (connection) {
connection.queryType = 'write' connection.queryType = 'write'
} }
done(err, connection) done(err, connection)
}, self.config.replication.write); }, self.config.replication.write);
}, },
destroy: function(client) { destroy: function(client) {
disconnect.call(self, client) disconnect.call(self, client)
}, },
validate: self.poolCfg.validate, validate: self.poolCfg.validate,
max: self.poolCfg.maxConnections, max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections, min: self.poolCfg.minConnections,
idleTimeoutMillis: self.poolCfg.maxIdleTime idleTimeoutMillis: self.poolCfg.maxIdleTime
}) })
}; };
} else if (this.poolCfg) { } else if (this.poolCfg) {
//the user has requested pooling, so create our connection pool //the user has requested pooling, so create our connection pool
this.pool = Pooling.Pool({ this.pool = Pooling.Pool({
name: 'sequelize-mysql', name: 'sequelize-mysql',
create: function (done) { create: function (done) {
connect.call(self, function (err, connection) { connect.call(self, function (err, connection) {
// This has to be nested for some reason, else the error won't propagate correctly // This has to be nested for some reason, else the error won't propagate correctly
done(err, connection); done(err, connection);
}) })
}, },
destroy: function(client) { destroy: function(client) {
disconnect.call(self, client) disconnect.call(self, client)
}, },
max: self.poolCfg.maxConnections, max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections, min: self.poolCfg.minConnections,
validate: self.poolCfg.validate, validate: self.poolCfg.validate,
idleTimeoutMillis: self.poolCfg.maxIdleTime idleTimeoutMillis: self.poolCfg.maxIdleTime
}) })
} }
this.onProcessExit = function () { this.onProcessExit = function () {
//be nice & close our connections on exit //be nice & close our connections on exit
if (self.pool) { if (self.pool) {
self.pool.drain() self.pool.drain()
} else if (self.client) { } else if (self.client) {
disconnect(self.client) disconnect(self.client)
} }
return return
}.bind(this); }.bind(this);
process.on('exit', this.onProcessExit) process.on('exit', this.onProcessExit)
} }
Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype); Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype);
ConnectorManager.prototype.query = function(sql, callee, options) { ConnectorManager.prototype.query = function(sql, callee, options) {
if (this.useQueue) { if (this.useQueue) {
// If queueing we'll let the execQueueItem method handle connecting // If queueing we'll let the execQueueItem method handle connecting
var queueItem = { var queueItem = {
query: new Query(null, this.sequelize, callee, options || {}), query: new Query(null, this.sequelize, callee, options || {}),
sql: sql sql: sql
}; };
queueItem.query.options.uuid = this.config.uuid queueItem.query.options.uuid = this.config.uuid
enqueue.call(this, queueItem, options); enqueue.call(this, queueItem, options);
return queueItem.query; return queueItem.query;
} }
var self = this, query = new Query(null, this.sequelize, callee, options || {}); var self = this, query = new Query(null, this.sequelize, callee, options || {});
this.pendingQueries++; this.pendingQueries++;
query.options.uuid = this.config.uuid query.options.uuid = this.config.uuid
query.done(function() { query.done(function() {
self.pendingQueries--; self.pendingQueries--;
if (self.pool) { if (self.pool) {
self.pool.release(query.client); self.pool.release(query.client);
} else { } else {
if (self.pendingQueries === 0) { if (self.pendingQueries === 0) {
setTimeout(function() { setTimeout(function() {
if (self.pendingQueries === 0){ if (self.pendingQueries === 0){
self.disconnect.call(self); self.disconnect.call(self);
} }
}, 100); }, 100);
} }
} }
}); });
this.getConnection(options, function (err, client) { this.getConnection(options, function (err, client) {
if (err) { if (err) {
return query.emit('error', err) return query.emit('error', err)
} }
query.client = client query.client = client
query.run(sql) query.run(sql)
}); });
return query; return query;
}; };
ConnectorManager.prototype.getConnection = function(options, callback) { ConnectorManager.prototype.getConnection = function(options, callback) {
var self = this; var self = this;
if (typeof options === "function") { if (typeof options === "function") {
callback = options; callback = options;
options = {}; options = {};
} }
return new Utils.CustomEventEmitter(function (emitter) { return new Utils.CustomEventEmitter(function (emitter) {
if (!self.pool) { if (!self.pool) {
// Regular client caching // Regular client caching
if (self.client) { if (self.client) {
return emitter.emit('success', self.client); return emitter.emit('success', self.client);
} else { } else {
// Cache for concurrent queries // Cache for concurrent queries
if (self._getConnection) { if (self._getConnection) {
self._getConnection.proxy(emitter); self._getConnection.proxy(emitter);
return; return;
} }
// Set cache and acquire connection // Set cache and acquire connection
self._getConnection = emitter; self._getConnection = emitter;
connect.call(self, function(err, client) { connect.call(self, function(err, client) {
if (err) { if (err) {
return emitter.emit('error', err); return emitter.emit('error', err);
} }
// Unset caching, should now be caught by the self.client check // Unset caching, should now be caught by the self.client check
self._getConnection = null; self._getConnection = null;
self.client = client; self.client = client;
emitter.emit('success', client); emitter.emit('success', client);
}); });
} }
} }
if (self.pool) { if (self.pool) {
// Acquire from pool // Acquire from pool
self.pool.acquire(function(err, client) { self.pool.acquire(function(err, client) {
if (err) { if (err) {
return emitter.emit('error', err); return emitter.emit('error', err);
} }
emitter.emit('success', client); emitter.emit('success', client);
}, options.priority, options.type); }, options.priority, options.type);
} }
}).run().done(callback); }).run().done(callback);
}; };
ConnectorManager.prototype.disconnect = function() { ConnectorManager.prototype.disconnect = function() {
if (this.client) { if (this.client) {
disconnect.call(this, this.client) disconnect.call(this, this.client)
} }
return return
}; };
// private // private
var disconnect = function(client) { var disconnect = function(client) {
var self = this; var self = this;
this.client = null; this.client = null;
client.end(function() { client.end(function() {
if (!self.useQueue) { if (!self.useQueue) {
return client.destroy(); return client.destroy();
} }
var intervalObj = null var intervalObj = null
var cleanup = function () { var cleanup = function () {
var retryCt = 0 var retryCt = 0
// make sure to let client finish before calling destroy // make sure to let client finish before calling destroy
if (client._queue && (client._queue.length > 0)) { if (client._queue && (client._queue.length > 0)) {
return return
} }
// needed to prevent mysql connection leak // needed to prevent mysql connection leak
client.destroy() client.destroy()
clearInterval(intervalObj) clearInterval(intervalObj)
} }
intervalObj = setInterval(cleanup, 10) intervalObj = setInterval(cleanup, 10)
cleanup() cleanup()
return return
}) })
} }
var connect = function(done, config) { var connect = function(done, config) {
config = config || this.config config = config || this.config
var emitter = new (require('events').EventEmitter)() var emitter = new (require('events').EventEmitter)()
var connectionConfig = { var connectionConfig = {
host: config.host, host: config.host,
port: config.port, port: config.port,
user: config.username, user: config.username,
password: config.password, password: config.password,
database: config.database, database: config.database,
timezone: 'Z' timezone: 'Z'
}; };
if (config.dialectOptions) { if (config.dialectOptions) {
Object.keys(config.dialectOptions).forEach(function (key) { Object.keys(config.dialectOptions).forEach(function (key) {
connectionConfig[key] = config.dialectOptions[key]; connectionConfig[key] = config.dialectOptions[key];
}); });
} }
var connection = mysql.createConnection(connectionConfig); var connection = mysql.createConnection(connectionConfig);
connection.connect(function(err) { connection.connect(function(err) {
if (err) { if (err) {
switch(err.code) { switch(err.code) {
case 'ECONNREFUSED': case 'ECONNREFUSED':
case 'ER_ACCESS_D2ENIED_ERROR': case 'ER_ACCESS_D2ENIED_ERROR':
emitter.emit('error', 'Failed to authenticate for MySQL. Please double check your settings.') emitter.emit('error', 'Failed to authenticate for MySQL. Please double check your settings.')
break break
case 'ENOTFOUND': case 'ENOTFOUND':
case 'EHOSTUNREACH': case 'EHOSTUNREACH':
case 'EINVAL': case 'EINVAL':
emitter.emit('error', 'Failed to find MySQL server. Please double check your settings.') emitter.emit('error', 'Failed to find MySQL server. Please double check your settings.')
break break
default: default:
emitter.emit('error', err); emitter.emit('error', err);
break; break;
} }
return; return;
} }
emitter.emit('success', connection); emitter.emit('success', connection);
}) })
connection.query("SET time_zone = '+0:00'"); connection.query("SET time_zone = '+0:00'");
// client.setMaxListeners(self.maxConcurrentQueries) // client.setMaxListeners(self.maxConcurrentQueries)
this.isConnecting = false this.isConnecting = false
if (config.pool !== null && config.pool.handleDisconnects) { if (config.pool !== null && config.pool.handleDisconnects) {
handleDisconnect(this.pool, connection) handleDisconnect(this.pool, connection)
} }
emitter.on('error', function (err) { emitter.on('error', function (err) {
done(err); done(err);
}); });
emitter.on('success', function (connection) { emitter.on('success', function (connection) {
done(null, connection); done(null, connection);
}); });
} }
var handleDisconnect = function(pool, client) { var handleDisconnect = function(pool, client) {
client.on('error', function(err) { client.on('error', function(err) {
if (err.code !== 'PROTOCOL_CONNECTION_LOST') { if (err.code !== 'PROTOCOL_CONNECTION_LOST') {
throw err throw err
} }
pool.destroy(client) pool.destroy(client)
}) })
} }
var validateConnection = function(client) { var validateConnection = function(client) {
return client && client.state !== 'disconnected' return client && client.state !== 'disconnected'
} }
var enqueue = function(queueItem, options) { var enqueue = function(queueItem, options) {
options = options || {} options = options || {}
if (this.activeQueue.length < this.maxConcurrentQueries) { if (this.activeQueue.length < this.maxConcurrentQueries) {
this.activeQueue.push(queueItem) this.activeQueue.push(queueItem)
execQueueItem.call(this, queueItem) execQueueItem.call(this, queueItem)
} else { } else {
this.queue.push(queueItem) this.queue.push(queueItem)
} }
} }
var dequeue = function(queueItem) { var dequeue = function(queueItem) {
//return the item's connection to the pool //return the item's connection to the pool
if (this.pool) { if (this.pool) {
this.pool.release(queueItem.client) this.pool.release(queueItem.client)
} }
this.activeQueue = without(this.activeQueue, queueItem) this.activeQueue = without(this.activeQueue, queueItem)
} }
var transferQueuedItems = function(count) { var transferQueuedItems = function(count) {
for(var i = 0; i < count; i++) { for(var i = 0; i < count; i++) {
var queueItem = this.queue.shift(); var queueItem = this.queue.shift();
if (queueItem) { if (queueItem) {
enqueue.call(this, queueItem) enqueue.call(this, queueItem)
} }
} }
} }
var afterQuery = function(queueItem) { var afterQuery = function(queueItem) {
dequeue.call(this, queueItem) dequeue.call(this, queueItem)
transferQueuedItems.call(this, this.maxConcurrentQueries - this.activeQueue.length) transferQueuedItems.call(this, this.maxConcurrentQueries - this.activeQueue.length)
disconnectIfNoConnections.call(this) disconnectIfNoConnections.call(this)
} }
var execQueueItem = function(queueItem) { var execQueueItem = function(queueItem) {
var self = this var self = this
self.getConnection({ self.getConnection({
priority: queueItem.query.options.priority, priority: queueItem.query.options.priority,
type: queueItem.query.options.type type: queueItem.query.options.type
}, function (err, connection) { }, function (err, connection) {
if (err) { if (err) {
queueItem.query.emit('error', err) queueItem.query.emit('error', err)
return return
} }
queueItem.query.client = connection queueItem.query.client = connection
queueItem.client = connection queueItem.client = connection
queueItem.query queueItem.query
.success(function(){ afterQuery.call(self, queueItem) }) .success(function () {
.error(function(){ afterQuery.call(self, queueItem) }) afterQuery.call(self, queueItem)
})
queueItem.query.run(queueItem.sql, queueItem.client) .error(function () {
}) afterQuery.call(self, queueItem)
} })
ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function() { queueItem.query.run(queueItem.sql, queueItem.client)
return (this.queue.length > 0) || (this.activeQueue.length > 0) || (this.client && this.client._queue && (this.client._queue.length > 0)) })
}) }
// legacy ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function () {
ConnectorManager.prototype.__defineGetter__('hasNoConnections', function() { return (this.queue.length > 0) || (this.activeQueue.length > 0) || (this.client && this.client._queue && (this.client._queue.length > 0))
return !this.hasQueuedItems })
})
// legacy
ConnectorManager.prototype.__defineGetter__('isConnected', function() { ConnectorManager.prototype.__defineGetter__('hasNoConnections', function () {
return this.client != null return !this.hasQueuedItems
}) })
var disconnectIfNoConnections = function() { ConnectorManager.prototype.__defineGetter__('isConnected', function () {
var self = this return this.client != null
})
this.disconnectTimeoutId && clearTimeout(this.disconnectTimeoutId)
this.disconnectTimeoutId = setTimeout(function() { var disconnectIfNoConnections = function () {
self.isConnected && !self.hasQueuedItems && self.disconnect() var self = this
}, 100)
} this.disconnectTimeoutId && clearTimeout(this.disconnectTimeoutId)
this.disconnectTimeoutId = setTimeout(function () {
return ConnectorManager self.isConnected && !self.hasQueuedItems && self.disconnect()
}, 100)
}
return ConnectorManager
})() })()
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!