不要怂,就是干,撸起袖子干!

Commit 81583874 by Mick Hansen

Merge pull request #1282 from mickhansen/overlookmotel-test_bad_db_conn_failure

Fix for MySQL connection errors
2 parents ad04b3fb ce49ac06
...@@ -38,8 +38,8 @@ module.exports = (function() { ...@@ -38,8 +38,8 @@ module.exports = (function() {
var self = this var self = this
if (this.useReplicaton) { if (this.useReplicaton) {
var reads = 0, var reads = 0
writes = 0; , writes = 0;
// Init configs with options from config if not present // Init configs with options from config if not present
for (var i in config.replication.read) { for (var i in config.replication.read) {
...@@ -122,7 +122,10 @@ module.exports = (function() { ...@@ -122,7 +122,10 @@ module.exports = (function() {
this.pool = Pooling.Pool({ this.pool = Pooling.Pool({
name: 'sequelize-mysql', name: 'sequelize-mysql',
create: function (done) { create: function (done) {
connect.call(self, done) connect.call(self, function (err, connection) {
// This has to be nested for some reason, else the error won't propagate correctly
done(err, connection);
})
}, },
destroy: function(client) { destroy: function(client) {
disconnect.call(self, client) disconnect.call(self, client)
...@@ -148,25 +151,20 @@ module.exports = (function() { ...@@ -148,25 +151,20 @@ module.exports = (function() {
Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype); Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype);
var isConnecting = false;
ConnectorManager.prototype.query = function(sql, callee, options) { ConnectorManager.prototype.query = function(sql, callee, options) {
if (!this.isConnected && !this.pool) {
this.connect()
}
if (this.useQueue) { if (this.useQueue) {
// If queueing we'll let the execQueueItem method handle connecting
var queueItem = { var queueItem = {
query: new Query(this.client, this.sequelize, callee, options || {}), query: new Query(null, this.sequelize, callee, options || {}),
sql: sql sql: sql
}; };
queueItem.query.options.uuid = this.config.uuid queueItem.query.options.uuid = this.config.uuid
enqueue.call(this, queueItem, options) enqueue.call(this, queueItem, options);
return queueItem.query return queueItem.query;
} }
var self = this, query = new Query(this.client, this.sequelize, callee, options || {}); var self = this, query = new Query(null, this.sequelize, callee, options || {});
this.pendingQueries++; this.pendingQueries++;
query.options.uuid = this.config.uuid query.options.uuid = this.config.uuid
...@@ -177,40 +175,66 @@ module.exports = (function() { ...@@ -177,40 +175,66 @@ module.exports = (function() {
} else { } else {
if (self.pendingQueries === 0) { if (self.pendingQueries === 0) {
setTimeout(function() { setTimeout(function() {
self.pendingQueries === 0 && self.disconnect.call(self) if (self.pendingQueries === 0){
self.disconnect.call(self);
}
}, 100); }, 100);
} }
} }
}); });
if (!this.pool) { this.getConnection(options, function (err, client) {
query.run(sql);
} else {
this.pool.acquire(function(err, client) {
if (err) {
return query.emit('error', err)
}
query.client = client query.client = client
query.run(sql) query.run(sql)
return; });
}, undefined, options.type)
}
return query; return query;
}; };
ConnectorManager.prototype.connect = function() { ConnectorManager.prototype.getConnection = function(options, callback) {
var self = this; var self = this;
// in case database is slow to connect, prevent orphaning the client
if (this.isConnecting || this.pool) { if (typeof options === "function") {
callback = options;
options = {};
}
return new Utils.CustomEventEmitter(function (emitter) {
if (!self.pool) {
// Regular client caching
if (self.client) {
return emitter.emit('success', self.client);
} else {
// Cache for concurrent queries
if (self._getConnection) {
self._getConnection.proxy(emitter);
return; return;
} }
// Set cache and acquire connection
self._getConnection = emitter;
connect.call(self, function(err, client) { connect.call(self, function(err, client) {
if (err) {
return emitter.emit('error', err);
}
// Unset caching, should now be caught by the self.client check
self._getConnection = null;
self.client = client; self.client = client;
return; emitter.emit('success', client);
}); });
return; }
}
if (self.pool) {
// Acquire from pool
self.pool.acquire(function(err, client) {
if (err) {
return emitter.emit('error', err);
}
emitter.emit('success', client);
}, options.priority, options.type);
}
}).run().done(callback);
}; };
ConnectorManager.prototype.disconnect = function() { ConnectorManager.prototype.disconnect = function() {
...@@ -282,17 +306,30 @@ module.exports = (function() { ...@@ -282,17 +306,30 @@ module.exports = (function() {
case 'EINVAL': case 'EINVAL':
emitter.emit('error', 'Failed to find MySQL server. Please double check your settings.') emitter.emit('error', 'Failed to find MySQL server. Please double check your settings.')
break break
default:
emitter.emit('error', err);
break;
} }
return;
} }
emitter.emit('success', connection);
}) })
connection.query("SET time_zone = '+0:00'"); connection.query("SET time_zone = '+0:00'");
// client.setMaxListeners(self.maxConcurrentQueries) // client.setMaxListeners(self.maxConcurrentQueries)
this.isConnecting = false this.isConnecting = false
if (config.pool != null && config.pool.handleDisconnects) { if (config.pool !== null && config.pool.handleDisconnects) {
handleDisconnect(this.pool, connection) handleDisconnect(this.pool, connection)
} }
done(null, connection)
emitter.on('error', function (err) {
done(err);
});
emitter.on('success', function (connection) {
done(null, connection);
});
} }
var handleDisconnect = function(pool, client) { var handleDisconnect = function(pool, client) {
...@@ -312,23 +349,7 @@ module.exports = (function() { ...@@ -312,23 +349,7 @@ module.exports = (function() {
options = options || {} options = options || {}
if (this.activeQueue.length < this.maxConcurrentQueries) { if (this.activeQueue.length < this.maxConcurrentQueries) {
this.activeQueue.push(queueItem) this.activeQueue.push(queueItem)
if (this.pool) {
var self = this
this.pool.acquire(function(err, client) {
if (err) {
queueItem.query.emit('error', err)
return
}
//we set the client here, asynchronously, when getting a pooled connection
//allowing the ConnectorManager.query method to remain synchronous
queueItem.query.client = client
queueItem.client = client
execQueueItem.call(self, queueItem)
return
}, undefined, options.type)
} else {
execQueueItem.call(this, queueItem) execQueueItem.call(this, queueItem)
}
} else { } else {
this.queue.push(queueItem) this.queue.push(queueItem)
} }
...@@ -361,11 +382,23 @@ module.exports = (function() { ...@@ -361,11 +382,23 @@ module.exports = (function() {
var execQueueItem = function(queueItem) { var execQueueItem = function(queueItem) {
var self = this var self = this
self.getConnection({
priority: queueItem.query.options.priority,
type: queueItem.query.options.type
}, function (err, connection) {
if (err) {
queueItem.query.emit('error', err)
return
}
queueItem.query.client = connection
queueItem.client = connection
queueItem.query queueItem.query
.success(function(){ afterQuery.call(self, queueItem) }) .success(function(){ afterQuery.call(self, queueItem) })
.error(function(){ afterQuery.call(self, queueItem) }) .error(function(){ afterQuery.call(self, queueItem) })
queueItem.query.run(queueItem.sql, queueItem.client) queueItem.query.run(queueItem.sql, queueItem.client)
})
} }
ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function() { ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function() {
......
...@@ -430,6 +430,37 @@ describe(Support.getTestDialectTeaser("Sequelize"), function () { ...@@ -430,6 +430,37 @@ describe(Support.getTestDialectTeaser("Sequelize"), function () {
done() done()
}) })
}) })
it('fails with incorrect database credentials (3)', function (done) {
var sequelize = new Sequelize('db', 'user', 'pass', {
dialect: this.sequelize.options.dialect,
port: 99999
});
var Project = sequelize.define('Project', {title: Sequelize.STRING})
var Task = sequelize.define('Task', {title: Sequelize.STRING})
sequelize.sync({force: true}).done(function (err) {
expect(err).to.be.ok
done()
})
})
it('fails with incorrect database credentials (4)', function (done) {
var sequelize = new Sequelize('db', 'user', 'pass', {
dialect: this.sequelize.options.dialect,
port: 99999,
pool: {}
});
var Project = sequelize.define('Project', {title: Sequelize.STRING})
var Task = sequelize.define('Task', {title: Sequelize.STRING})
sequelize.sync({force: true}).done(function (err) {
expect(err).to.be.ok
done()
})
})
} }
describe("doesn't emit logging when explicitly saying not to", function() { describe("doesn't emit logging when explicitly saying not to", function() {
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!