不要怂,就是干,撸起袖子干!

Commit bfee712a by Sushant Committed by GitHub

fix: pooling fails to handle disconnection (#7698)

* fix: pooling fails to handle disconnection

* fix: memory outage due to leak

* sinon dont like delay, use setTimeout

* review: code refactor

* review: code refactor, get rid of _acquire

* refactor: use race with timeout
1 parent 41408df5
...@@ -265,31 +265,23 @@ class ConnectionManager { ...@@ -265,31 +265,23 @@ class ConnectionManager {
promise = Promise.resolve(); promise = Promise.resolve();
} }
return promise.then(() => return promise.then(() => {
new Promise((resolve, reject) => { return Promise.race([
const connectionPromise = this.pool.acquire(options.priority, options.type, options.useMaster); this.pool.acquire(options.priority, options.type, options.useMaster),
const connectionTimer = timers.setInterval(() => { new Promise((resolve, reject) =>
let evictTimer = false; timers.setTimeout(() => {
if (this.poolError) {
if (connectionPromise.isFulfilled()) {
resolve(connectionPromise);
debug('connection acquire');
evictTimer = true;
} else if (this.poolError) {
reject(this.poolError); reject(this.poolError);
this.poolError = null;
evictTimer = true;
} else if (connectionPromise.isRejected()) {
connectionPromise.catch(reject);
evictTimer = true;
} }
}, 0))
if (evictTimer) { ])
timers.clearInterval(connectionTimer); .tap(() => { debug('connection acquired'); })
} .catch(e => {
}, 0); e = this.poolError || e;
}) this.poolError = null;
); throw e;
});
});
} }
releaseConnection(connection) { releaseConnection(connection) {
......
...@@ -92,77 +92,63 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -92,77 +92,63 @@ class ConnectionManager extends AbstractConnectionManager {
const connection = this.lib.createConnection(connectionConfig); const connection = this.lib.createConnection(connectionConfig);
const errorHandler = e => { const errorHandler = e => {
// clean up connect event if there is error // clean up connect event if there is error
connection.removeListener('connect', connectHandler); connection.removeListener('connect', connectHandler);
if (config.pool.handleDisconnects) {
debug(`connection error ${e.code}`);
if (e.code === 'PROTOCOL_CONNECTION_LOST') {
return;
}
}
connection.removeListener('error', errorHandler);
reject(e); reject(e);
}; };
const connectHandler = () => { const connectHandler = () => {
if (!config.pool.handleDisconnects) {
// clean up error event if connected // clean up error event if connected
connection.removeListener('error', errorHandler); connection.removeListener('error', errorHandler);
}
resolve(connection); resolve(connection);
}; };
connection.once('error', errorHandler); connection.on('error', errorHandler);
connection.once('connect', connectHandler); connection.once('connect', connectHandler);
}) })
.then(connection => { .tap (() => { debug('connection acquired'); })
.then(connection => {
if (config.pool.handleDisconnects) { return new Utils.Promise((resolve, reject) => {
// Connection to the MySQL server is usually // set timezone for this connection
// lost due to either server restart, or a // but named timezone are not directly supported in mysql, so get its offset first
// connection idle timeout (the wait_timeout let tzOffset = this.sequelize.options.timezone;
// server variable configures this) tzOffset = /\//.test(tzOffset) ? momentTz.tz(tzOffset).format('Z') : tzOffset;
// connection.query(`SET time_zone = '${tzOffset}'`, err => {
// See [stackoverflow answer](http://stackoverflow.com/questions/20210522/nodejs-mysql-error-connection-lost-the-server-closed-the-connection) if (err) { reject(err); } else { resolve(connection); }
connection.on('error', err => {
if (err.code === 'PROTOCOL_CONNECTION_LOST') {
// Remove it from read/write pool
this.pool.destroy(connection);
}
debug(`connection error ${err.code}`);
});
}
debug('connection acquired');
return connection;
})
.then(connection => {
return new Utils.Promise((resolve, reject) => {
// set timezone for this connection
// but named timezone are not directly supported in mysql, so get its offset first
let tzOffset = this.sequelize.options.timezone;
tzOffset = /\//.test(tzOffset) ? momentTz.tz(tzOffset).format('Z') : tzOffset;
connection.query(`SET time_zone = '${tzOffset}'`, err => {
if (err) { reject(err); } else { resolve(connection); }
});
}); });
})
.catch(err => {
if (err.code) {
switch (err.code) {
case 'ECONNREFUSED':
throw new SequelizeErrors.ConnectionRefusedError(err);
case 'ER_ACCESS_DENIED_ERROR':
throw new SequelizeErrors.AccessDeniedError(err);
case 'ENOTFOUND':
throw new SequelizeErrors.HostNotFoundError(err);
case 'EHOSTUNREACH':
throw new SequelizeErrors.HostNotReachableError(err);
case 'EINVAL':
throw new SequelizeErrors.InvalidConnectionError(err);
default:
throw new SequelizeErrors.ConnectionError(err);
}
} else {
throw new SequelizeErrors.ConnectionError(err);
}
}); });
})
.catch(err => {
switch (err.code) {
case 'ECONNREFUSED':
throw new SequelizeErrors.ConnectionRefusedError(err);
case 'ER_ACCESS_DENIED_ERROR':
throw new SequelizeErrors.AccessDeniedError(err);
case 'ENOTFOUND':
throw new SequelizeErrors.HostNotFoundError(err);
case 'EHOSTUNREACH':
throw new SequelizeErrors.HostNotReachableError(err);
case 'EINVAL':
throw new SequelizeErrors.InvalidConnectionError(err);
default:
throw new SequelizeErrors.ConnectionError(err);
}
});
} }
disconnect(connection) { disconnect(connection) {
// Dont disconnect connections with CLOSED state // Dont disconnect connections with CLOSED state
if (connection._closing) { if (connection._closing) {
debug('connection tried to disconnect but was already at CLOSED state'); debug('connection tried to disconnect but was already at CLOSED state');
......
...@@ -76,7 +76,7 @@ if (dialect === 'mysql') { ...@@ -76,7 +76,7 @@ if (dialect === 'mysql') {
}); });
}); });
it('should work with handleDisconnects', () => { it('should work with handleDisconnects before release', () => {
const sequelize = Support.createSequelizeInstance({pool: {min: 1, max: 1, handleDisconnects: true, idle: 5000}}); const sequelize = Support.createSequelizeInstance({pool: {min: 1, max: 1, handleDisconnects: true, idle: 5000}});
const cm = sequelize.connectionManager; const cm = sequelize.connectionManager;
let conn; let conn;
...@@ -87,8 +87,8 @@ if (dialect === 'mysql') { ...@@ -87,8 +87,8 @@ if (dialect === 'mysql') {
.then(connection => { .then(connection => {
// Save current connection // Save current connection
conn = connection; conn = connection;
// simulate a unexpected end // simulate a unexpected end from MySQL2
connection.close(); conn.stream.emit('end');
}) })
.then(() => cm.releaseConnection(conn)) .then(() => cm.releaseConnection(conn))
.then(() => { .then(() => {
...@@ -117,5 +117,33 @@ if (dialect === 'mysql') { ...@@ -117,5 +117,33 @@ if (dialect === 'mysql') {
// https://github.com/sequelize/sequelize/issues/7184 // https://github.com/sequelize/sequelize/issues/7184
.spread(affectedCount => affectedCount.should.equal(1)); .spread(affectedCount => affectedCount.should.equal(1));
}); });
it('should work with handleDisconnects', () => {
const sequelize = Support.createSequelizeInstance({pool: {min: 1, max: 1, handleDisconnects: true, idle: 5000}});
const cm = sequelize.connectionManager;
let conn;
return sequelize
.sync()
.then(() => cm.getConnection())
.then(connection => {
// Save current connection
conn = connection;
return cm.releaseConnection(conn);
})
.then(() => {
// simulate a unexpected end from MySQL2 AFTER releasing the connection
conn.stream.emit('end');
// Get next available connection
return cm.getConnection();
})
.then(connection => {
// Old threadId should be different from current new one
expect(conn.threadId).to.not.be.equal(connection.threadId);
expect(cm.validate(conn)).to.not.be.ok;
return cm.releaseConnection(connection);
});
});
}); });
} }
...@@ -8,7 +8,6 @@ const chai = require('chai'), ...@@ -8,7 +8,6 @@ const chai = require('chai'),
Promise = require('bluebird'); Promise = require('bluebird');
describe(Support.getTestDialectTeaser('Include'), () => { describe(Support.getTestDialectTeaser('Include'), () => {
before(function() { before(function() {
this.clock = sinon.useFakeTimers(); this.clock = sinon.useFakeTimers();
}); });
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!