不要怂,就是干,撸起袖子干!

Commit fa2f3733 by Sushant Committed by GitHub

refactor: pooling (#9968)

1 parent 71e3b59f
...@@ -12,9 +12,8 @@ const defaultPoolingConfig = { ...@@ -12,9 +12,8 @@ const defaultPoolingConfig = {
max: 5, max: 5,
min: 0, min: 0,
idle: 10000, idle: 10000,
acquire: 10000, acquire: 60000,
evict: 10000, evict: 1000
handleDisconnects: true
}; };
/** /**
...@@ -112,7 +111,6 @@ class ConnectionManager { ...@@ -112,7 +111,6 @@ class ConnectionManager {
}, { }, {
Promise: config.pool.Promise, Promise: config.pool.Promise,
testOnBorrow: true, testOnBorrow: true,
returnToHead: true,
autostart: false, autostart: false,
max: config.pool.max, max: config.pool.max,
min: config.pool.min, min: config.pool.min,
...@@ -192,7 +190,6 @@ class ConnectionManager { ...@@ -192,7 +190,6 @@ class ConnectionManager {
}, { }, {
Promise: config.pool.Promise, Promise: config.pool.Promise,
testOnBorrow: true, testOnBorrow: true,
returnToHead: true,
autostart: false, autostart: false,
max: config.pool.max, max: config.pool.max,
min: config.pool.min, min: config.pool.min,
...@@ -214,7 +211,6 @@ class ConnectionManager { ...@@ -214,7 +211,6 @@ class ConnectionManager {
}, { }, {
Promise: config.pool.Promise, Promise: config.pool.Promise,
testOnBorrow: true, testOnBorrow: true,
returnToHead: true,
autostart: false, autostart: false,
max: config.pool.max, max: config.pool.max,
min: config.pool.min, min: config.pool.min,
......
...@@ -73,63 +73,72 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -73,63 +73,72 @@ class ConnectionManager extends AbstractConnectionManager {
connection.lib = this.lib; connection.lib = this.lib;
const resourceLock = new ResourceLock(connection); const resourceLock = new ResourceLock(connection);
connection.on('end', () => { const connectHandler = error => {
reject(new sequelizeErrors.ConnectionError('Connection was closed by remote server')); connection.removeListener('end', endHandler);
}); connection.removeListener('error', errorHandler);
if (error) return reject(error);
connection.on('connect', err => {
if (!err) {
debug('connection acquired'); debug('connection acquired');
return resolve(resourceLock); resolve(resourceLock);
} };
if (!err.code) { const endHandler = () => {
reject(new sequelizeErrors.ConnectionError(err)); connection.removeListener('connect', connectHandler);
return; connection.removeListener('error', errorHandler);
} reject(new Error('Connection was closed by remote server'));
};
switch (err.code) { const errorHandler = error => {
case 'ESOCKET': connection.removeListener('connect', connectHandler);
if (err.message.includes('connect EHOSTUNREACH')) { connection.removeListener('end', endHandler);
reject(new sequelizeErrors.HostNotReachableError(err)); reject(error);
} else if (err.message.includes('connect ENETUNREACH')) { };
reject(new sequelizeErrors.HostNotReachableError(err));
} else if (err.message.includes('connect EADDRNOTAVAIL')) { connection.once('error', errorHandler);
reject(new sequelizeErrors.HostNotReachableError(err)); connection.once('end', endHandler);
} else if (err.message.includes('getaddrinfo ENOTFOUND')) { connection.once('connect', connectHandler);
reject(new sequelizeErrors.HostNotFoundError(err));
} else if (err.message.includes('connect ECONNREFUSED')) {
reject(new sequelizeErrors.ConnectionRefusedError(err));
} else {
reject(new sequelizeErrors.ConnectionError(err));
}
break;
case 'ER_ACCESS_DENIED_ERROR':
case 'ELOGIN':
reject(new sequelizeErrors.AccessDeniedError(err));
break;
case 'EINVAL':
reject(new sequelizeErrors.InvalidConnectionError(err));
break;
default:
reject(new sequelizeErrors.ConnectionError(err));
break;
}
});
if (config.dialectOptions && config.dialectOptions.debug) { if (config.dialectOptions && config.dialectOptions.debug) {
connection.on('debug', debugTedious); connection.on('debug', debugTedious);
} }
}).tap(resourceLock => {
if (config.pool.handleDisconnects) { const connection = resourceLock.unwrap();
connection.on('error', err => { connection.on('error', error => {
switch (err.code) { switch (error.code) {
case 'ESOCKET': case 'ESOCKET':
case 'ECONNRESET': case 'ECONNRESET':
this.pool.destroy(resourceLock) this.pool.destroy(resourceLock)
.catch(/Resource not currently part of this pool/, () => {}); .catch(/Resource not currently part of this pool/, () => {});
} }
}); });
}).catch(error => {
if (!error.code) {
throw new sequelizeErrors.ConnectionError(error);
}
switch (error.code) {
case 'ESOCKET':
if (error.message.includes('connect EHOSTUNREACH')) {
throw new sequelizeErrors.HostNotReachableError(error);
} else if (error.message.includes('connect ENETUNREACH')) {
throw new sequelizeErrors.HostNotReachableError(error);
} else if (error.message.includes('connect EADDRNOTAVAIL')) {
throw new sequelizeErrors.HostNotReachableError(error);
} else if (error.message.includes('getaddrinfo ENOTFOUND')) {
throw new sequelizeErrors.HostNotFoundError(error);
} else if (error.message.includes('connect ECONNREFUSED')) {
throw new sequelizeErrors.ConnectionRefusedError(error);
} else {
throw new sequelizeErrors.ConnectionError(error);
}
case 'ER_ACCESS_DENIED_ERROR':
case 'ELOGIN':
throw new sequelizeErrors.AccessDeniedError(error);
case 'EINVAL':
throw new sequelizeErrors.InvalidConnectionError(error);
default:
throw new sequelizeErrors.ConnectionError(error);
} }
}); });
} }
......
...@@ -61,7 +61,7 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -61,7 +61,7 @@ class ConnectionManager extends AbstractConnectionManager {
/** /**
* Connect with MySQL database based on config, Handle any errors in connection * Connect with MySQL database based on config, Handle any errors in connection
* Set the pool handlers on connection.error * Set the pool handlers on connection.error
* Also set proper timezone once conection is connected * Also set proper timezone once connection is connected
* *
* @returns Promise<Connection> * @returns Promise<Connection>
* @private * @private
...@@ -90,36 +90,38 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -90,36 +90,38 @@ class ConnectionManager extends AbstractConnectionManager {
const connection = this.lib.createConnection(connectionConfig); const connection = this.lib.createConnection(connectionConfig);
const errorHandler = e => { const errorHandler = e => {
// clean up connect event if there is error // clean up connect & error event if there is error
connection.removeListener('connect', connectHandler); connection.removeListener('connect', connectHandler);
connection.removeListener('error', connectHandler);
if (config.pool.handleDisconnects) {
debug(`connection error ${e.code}`);
if (e.code === 'PROTOCOL_CONNECTION_LOST') {
this.pool.destroy(connection)
.catch(/Resource not currently part of this pool/, () => {});
return;
}
}
connection.removeListener('error', errorHandler);
reject(e); reject(e);
}; };
const connectHandler = () => { const connectHandler = () => {
if (!config.pool.handleDisconnects) {
// clean up error event if connected // clean up error event if connected
connection.removeListener('error', errorHandler); connection.removeListener('error', errorHandler);
}
resolve(connection); resolve(connection);
}; };
// dont use connection.once for error event handling here
// mysql2 emit error two times in case handshake was failed
// first error is protocol_lost and second is timeout
// if we will use `once.error` node process will crash on 2nd error emit
connection.on('error', errorHandler); connection.on('error', errorHandler);
connection.once('connect', connectHandler); connection.once('connect', connectHandler);
}) })
.tap (() => { debug('connection acquired'); }) .tap (() => { debug('connection acquired'); })
.then(connection => { .then(connection => {
connection.on('error', error => {
switch (error.code) {
case 'ESOCKET':
case 'ECONNRESET':
case 'EPIPE':
case 'PROTOCOL_CONNECTION_LOST':
this.pool.destroy(connection)
.catch(/Resource not currently part of this pool/, () => {});
}
});
return new Utils.Promise((resolve, reject) => { return new Utils.Promise((resolve, reject) => {
if (!this.sequelize.config.keepDefaultTimezone) { if (!this.sequelize.config.keepDefaultTimezone) {
// set timezone for this connection // set timezone for this connection
...@@ -130,6 +132,7 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -130,6 +132,7 @@ class ConnectionManager extends AbstractConnectionManager {
if (err) { reject(err); } else { resolve(connection); } if (err) { reject(err); } else { resolve(connection); }
}); });
} }
// return connection without executing SET time_zone query // return connection without executing SET time_zone query
resolve(connection); resolve(connection);
}); });
...@@ -172,8 +175,11 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -172,8 +175,11 @@ class ConnectionManager extends AbstractConnectionManager {
} }
validate(connection) { validate(connection) {
return connection && connection._fatalError === null && connection._protocolError === null && !connection._closing && return connection
!connection.stream.destroyed; && !connection._fatalError
&& !connection._protocolError
&& !connection._closing
&& !connection.stream.destroyed;
} }
} }
......
...@@ -105,10 +105,22 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -105,10 +105,22 @@ class ConnectionManager extends AbstractConnectionManager {
} }
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const connection = new this.lib.Client(connectionConfig);
let responded = false; let responded = false;
const connection = new this.lib.Client(connectionConfig);
const endHandler = () => {
debug('connection timeout');
if (!responded) {
reject(new sequelizeErrors.ConnectionTimedOutError(new Error('Connection timed out')));
}
};
// If we didn't ever hear from the client.connect() callback the connection timeout
// node-postgres does not treat this as an error since no active query was ever emitted
connection.once('end', endHandler);
connection.connect(err => { connection.connect(err => {
responded = true;
if (err) { if (err) {
if (err.code) { if (err.code) {
switch (err.code) { switch (err.code) {
...@@ -131,31 +143,23 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -131,31 +143,23 @@ class ConnectionManager extends AbstractConnectionManager {
} else { } else {
reject(new sequelizeErrors.ConnectionError(err)); reject(new sequelizeErrors.ConnectionError(err));
} }
return; } else {
}
responded = true;
debug('connection acquired'); debug('connection acquired');
connection.removeListener('end', endHandler);
resolve(connection); resolve(connection);
});
// If we didn't ever hear from the client.connect() callback the connection timeout, node-postgres does not treat this as an error since no active query was ever emitted
connection.on('end', () => {
debug('connection timeout');
if (!responded) {
reject(new sequelizeErrors.ConnectionTimedOutError(new Error('Connection timed out')));
} }
}); });
// Don't let a Postgres restart (or error) to take down the whole app
connection.on('error', err => {
debug(`connection error ${err.code}`);
connection._invalid = true;
});
}).tap(connection => { }).tap(connection => {
// Disable escape characters in strings, see https://github.com/sequelize/sequelize/issues/3545
let query = ''; let query = '';
if (this.sequelize.options.databaseVersion !== 0 && semver.gte(this.sequelize.options.databaseVersion, '8.2.0')) { if (
this.sequelize.options.databaseVersion !== 0
&& semver.gte(this.sequelize.options.databaseVersion, '8.2.0')
&& semver.lt(this.sequelize.options.databaseVersion, '9.1.0')
) {
// Disable escape characters in strings
// see https://github.com/sequelize/sequelize/issues/3545 (security issue)
// see https://www.postgresql.org/docs/current/static/runtime-config-compatible.html#GUC-STANDARD-CONFORMING-STRINGS
query += 'SET standard_conforming_strings=on;'; query += 'SET standard_conforming_strings=on;';
} }
...@@ -180,6 +184,15 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -180,6 +184,15 @@ class ConnectionManager extends AbstractConnectionManager {
) { ) {
return this._refreshDynamicOIDs(connection); return this._refreshDynamicOIDs(connection);
} }
}).tap(connection => {
// Don't let a Postgres restart (or error) to take down the whole app
connection.on('error', error => {
connection._invalid = true;
debug(`connection error ${error.code}`);
this.pool.destroy(connection)
.catch(/Resource not currently part of this pool/, () => {});
});
}); });
} }
...@@ -188,7 +201,7 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -188,7 +201,7 @@ class ConnectionManager extends AbstractConnectionManager {
} }
validate(connection) { validate(connection) {
return connection._invalid === undefined; return !connection._invalid && !connection._ending;
} }
_refreshDynamicOIDs(connection) { _refreshDynamicOIDs(connection) {
......
...@@ -78,8 +78,8 @@ class Sequelize { ...@@ -78,8 +78,8 @@ class Sequelize {
* @param {number} [options.pool.max=5] Maximum number of connection in pool * @param {number} [options.pool.max=5] Maximum number of connection in pool
* @param {number} [options.pool.min=0] Minimum number of connection in pool * @param {number} [options.pool.min=0] Minimum number of connection in pool
* @param {number} [options.pool.idle=10000] The maximum time, in milliseconds, that a connection can be idle before being released. Use with combination of evict for proper working, for more details read https://github.com/coopernurse/node-pool/issues/178#issuecomment-327110870 * @param {number} [options.pool.idle=10000] The maximum time, in milliseconds, that a connection can be idle before being released. Use with combination of evict for proper working, for more details read https://github.com/coopernurse/node-pool/issues/178#issuecomment-327110870
* @param {number} [options.pool.acquire=10000] The maximum time, in milliseconds, that pool will try to get connection before throwing error * @param {number} [options.pool.acquire=60000] The maximum time, in milliseconds, that pool will try to get connection before throwing error
* @param {number} [options.pool.evict=10000] The time interval, in milliseconds, for evicting stale connections. Set it to 0 to disable this feature. * @param {number} [options.pool.evict=1000] The time interval, in milliseconds, for evicting stale connections. Set it to 0 to disable this feature.
* @param {boolean} [options.pool.handleDisconnects=true] Controls if pool should handle connection disconnect automatically without throwing errors * @param {boolean} [options.pool.handleDisconnects=true] Controls if pool should handle connection disconnect automatically without throwing errors
* @param {Function} [options.pool.validate] A function that validates a connection. Called with client. The default function checks that client is an object, and that its state is not disconnected * @param {Function} [options.pool.validate] A function that validates a connection. Called with client. The default function checks that client is an object, and that its state is not disconnected
* @param {boolean} [options.quoteIdentifiers=true] Set to `false` to make table names and attributes case-insensitive on Postgres and skip double quoting of them. WARNING: Setting this to false may expose vulnerabilities and is not recommended! * @param {boolean} [options.quoteIdentifiers=true] Set to `false` to make table names and attributes case-insensitive on Postgres and skip double quoting of them. WARNING: Setting this to false may expose vulnerabilities and is not recommended!
......
...@@ -7,80 +7,7 @@ const Sequelize = Support.Sequelize; ...@@ -7,80 +7,7 @@ const Sequelize = Support.Sequelize;
const dialect = Support.getTestDialect(); const dialect = Support.getTestDialect();
if (dialect.match(/^mssql/)) { if (dialect.match(/^mssql/)) {
describe('[MSSQL Specific] Query Queue', () => { describe('[MSSQL Specific] Connection Manager', () => {
it('should work with handleDisconnects', () => {
const sequelize = Support.createSequelizeInstance({ pool: { min: 1, max: 1, idle: 5000 } });
const cm = sequelize.connectionManager;
let conn;
return sequelize
.sync()
.then(() => cm.getConnection())
.then(connection => {
// Save current connection
conn = connection;
// simulate a unexpected end
// connection removed from pool by MSSQL Conn Manager
conn.unwrap().emit('error', {code: 'ECONNRESET'});
})
.then(() => cm.getConnection())
.then(connection => {
expect(conn).to.not.be.equal(connection);
expect(cm.validate(conn)).to.not.be.ok;
return cm.releaseConnection(connection);
});
});
it('should handle double disconnect', () => {
const sequelize = Support.createSequelizeInstance({ pool: { min: 1, max: 1, idle: 5000 } });
const cm = sequelize.connectionManager;
let count = 0;
let conn = null;
return sequelize
.sync()
.then(() => cm.getConnection())
.then(connection => {
conn = connection;
const unwrapConn = conn.unwrap();
unwrapConn.on('end', () => {
count++;
});
return cm.disconnect(conn);
})
.then(() => cm.disconnect(conn))
.then(() => {
expect(count).to.be.eql(1);
});
});
it('should not throw when non pooled connection is unexpectedly closed', () => {
const sequelize = Support.createSequelizeInstance({ pool: { min: 1, max: 1, idle: 5000 } });
const cm = sequelize.connectionManager;
let conn;
return sequelize
.sync()
.then(() => cm.getConnection())
.then(connection => {
conn = connection;
// remove from pool
return cm.pool.destroy(connection);
})
.then(() => {
// unexpected disconnect
const unwrapConn = conn.unwrap();
unwrapConn.emit('error', {
code: 'ESOCKET'
});
});
});
describe('Errors', () => { describe('Errors', () => {
it('ECONNREFUSED', () => { it('ECONNREFUSED', () => {
const sequelize = Support.createSequelizeInstance({ host: '127.0.0.1', port: 34237 }); const sequelize = Support.createSequelizeInstance({ host: '127.0.0.1', port: 34237 });
......
...@@ -2,113 +2,12 @@ ...@@ -2,113 +2,12 @@
const chai = require('chai'); const chai = require('chai');
const expect = chai.expect; const expect = chai.expect;
const Sequelize = require('../../../../index');
const Support = require('../../support'); const Support = require('../../support');
const dialect = Support.getTestDialect(); const dialect = Support.getTestDialect();
const sinon = require('sinon');
const DataTypes = require('../../../../lib/data-types'); const DataTypes = require('../../../../lib/data-types');
if (dialect === 'mysql') { if (dialect === 'mysql') {
describe('[MYSQL Specific] Connection Manager', () => { describe('[MYSQL Specific] Connection Manager', () => {
it('works correctly after being idle', function() {
const User = this.sequelize.define('User', { username: DataTypes.STRING });
const spy = sinon.spy();
return User
.sync({force: true})
.then(() => User.create({username: 'user1'}))
.then(() => User.count())
.then(count => {
expect(count).to.equal(1);
spy();
return Sequelize.Promise.delay(1000);
})
.then(() => User.count())
.then(count => {
expect(count).to.equal(1);
spy();
if (!spy.calledTwice) {
throw new Error('Spy was not called twice');
}
});
});
it('accepts new queries after shutting down a connection', () => {
// Create a sequelize instance with fast disconnecting connection
const sequelize = Support.createSequelizeInstance({ pool: { idle: 50, max: 1, evict: 10 }});
const User = sequelize.define('User', { username: DataTypes.STRING });
return User
.sync({force: true})
.then(() => User.create({ username: 'user1' }))
.then(() => Sequelize.Promise.delay(100))
.then(() => {
expect(sequelize.connectionManager.pool.size).to.equal(0);
//This query will be queued just after the `client.end` is executed and before its callback is called
return sequelize.query('SELECT COUNT(*) AS count FROM Users', { type: sequelize.QueryTypes.SELECT });
})
.then(count => {
expect(sequelize.connectionManager.pool.size).to.equal(1);
expect(count[0].count).to.equal(1);
});
});
it('should maintain connection', () => {
const sequelize = Support.createSequelizeInstance({ pool: {min: 1, max: 1, handleDisconnects: true, idle: 5000 }});
const cm = sequelize.connectionManager;
let conn;
return sequelize.sync()
.then(() => cm.getConnection())
.then(connection => {
// Save current connection
conn = connection;
return cm.releaseConnection(conn);
})
.then(() => {
// Get next available connection
return cm.getConnection();
})
.then(connection => {
// Old threadId should be same as current connection
expect(conn.threadId).to.be.equal(connection.threadId);
expect(cm.validate(conn)).to.be.ok;
return cm.releaseConnection(connection);
});
});
it('should work with handleDisconnects before release', () => {
const sequelize = Support.createSequelizeInstance({pool: { max: 1, min: 1, handleDisconnects: true, idle: 5000 }});
const cm = sequelize.connectionManager;
let conn;
return sequelize
.sync()
.then(() => cm.getConnection())
.then(connection => {
// Save current connection
conn = connection;
// simulate a unexpected end from MySQL2
conn.stream.emit('end');
return cm.releaseConnection(connection);
})
.then(() => {
// Get next available connection
return cm.getConnection();
})
.then(connection => {
// Old threadId should be different from current new one
expect(conn.threadId).to.not.be.equal(connection.threadId);
expect(sequelize.connectionManager.pool.size).to.equal(1);
expect(cm.validate(conn)).to.be.not.ok;
return cm.releaseConnection(connection);
});
});
it('-FOUND_ROWS can be suppressed to get back legacy behavior', () => { it('-FOUND_ROWS can be suppressed to get back legacy behavior', () => {
const sequelize = Support.createSequelizeInstance({ dialectOptions: { flags: '' }}); const sequelize = Support.createSequelizeInstance({ dialectOptions: { flags: '' }});
const User = sequelize.define('User', { username: DataTypes.STRING }); const User = sequelize.define('User', { username: DataTypes.STRING });
...@@ -124,34 +23,6 @@ if (dialect === 'mysql') { ...@@ -124,34 +23,6 @@ if (dialect === 'mysql') {
.spread(affectedCount => affectedCount.should.equal(1)); .spread(affectedCount => affectedCount.should.equal(1));
}); });
it('should work with handleDisconnects', () => {
const sequelize = Support.createSequelizeInstance({pool: {min: 1, max: 1, handleDisconnects: true, idle: 5000}});
const cm = sequelize.connectionManager;
let conn;
return sequelize
.sync()
.then(() => cm.getConnection())
.then(connection => {
// Save current connection
conn = connection;
return cm.releaseConnection(conn);
})
.then(() => {
// simulate a unexpected end from MySQL2 AFTER releasing the connection
conn.stream.emit('end');
// Get next available connection
return cm.getConnection();
})
.then(connection => {
// Old threadId should be different from current new one
expect(conn.threadId).to.not.be.equal(connection.threadId);
expect(cm.validate(conn)).to.not.be.ok;
return cm.releaseConnection(connection);
});
});
it('should acquire a valid connection when keepDefaultTimezone is true', () => { it('should acquire a valid connection when keepDefaultTimezone is true', () => {
const sequelize = Support.createSequelizeInstance({keepDefaultTimezone: true, pool: {min: 1, max: 1, handleDisconnects: true, idle: 5000}}); const sequelize = Support.createSequelizeInstance({keepDefaultTimezone: true, pool: {min: 1, max: 1, handleDisconnects: true, idle: 5000}});
const cm = sequelize.connectionManager; const cm = sequelize.connectionManager;
......
...@@ -7,8 +7,56 @@ const dialect = Support.getTestDialect(); ...@@ -7,8 +7,56 @@ const dialect = Support.getTestDialect();
const sinon = require('sinon'); const sinon = require('sinon');
const Sequelize = Support.Sequelize; const Sequelize = Support.Sequelize;
function assertSameConnection(newConnection, oldConnection) {
switch (dialect) {
case 'postgres':
expect(oldConnection.processID).to.be.equal(newConnection.processID).and.to.be.ok;
break;
case 'mysql':
expect(oldConnection.threadId).to.be.equal(newConnection.threadId).and.to.be.ok;
break;
case 'mssql':
expect(newConnection.unwrap().dummyId).to.equal(oldConnection.unwrap().dummyId).and.to.be.ok;
break;
default:
throw new Error('Unsupported dialect');
}
};
function assertNewConnection(newConnection, oldConnection) {
switch (dialect) {
case 'postgres':
expect(oldConnection.processID).to.not.be.equal(newConnection.processID);
break;
case 'mysql':
expect(oldConnection.threadId).to.not.be.equal(newConnection.threadId);
break;
case 'mssql':
expect(newConnection.unwrap().dummyId).to.not.be.ok;
expect(oldConnection.unwrap().dummyId).to.be.ok;
break;
default:
throw new Error('Unsupported dialect');
}
};
function unwrapAndAttachMSSQLUniqueId(connection) {
if (dialect === 'mssql') {
connection = connection.unwrap();
connection.dummyId = Math.random();
}
return connection;
}
describe(Support.getTestDialectTeaser('Pooling'), function() { describe(Support.getTestDialectTeaser('Pooling'), function() {
if (dialect === 'sqlite') return; if (dialect === 'sqlite' || process.env.DIALECT === 'postgres-native') return;
beforeEach(() => { beforeEach(() => {
this.sinon = sinon.createSandbox(); this.sinon = sinon.createSandbox();
...@@ -18,12 +66,185 @@ describe(Support.getTestDialectTeaser('Pooling'), function() { ...@@ -18,12 +66,185 @@ describe(Support.getTestDialectTeaser('Pooling'), function() {
this.sinon.restore(); this.sinon.restore();
}); });
it('should reject with ConnectionAcquireTimeoutError when unable to acquire connection in given time', () => { describe('network / connection errors', () => {
it('should obtain new connection when old connection is abruptly closed', () => {
const sequelize = Support.createSequelizeInstance({
pool: {
max: 1,
min: 1,
idle: 5000
}
});
const cm = sequelize.connectionManager;
let conn;
return sequelize
.sync()
.then(() => cm.getConnection())
.then(connection => {
// Save current connection
conn = connection;
if (dialect === 'mssql') {
connection = unwrapAndAttachMSSQLUniqueId(connection);
}
// simulate an unexpected error
// should never be returned again
connection.emit('error', {
code: 'ECONNRESET'
});
})
.then(() => {
// Get next available connection
return cm.getConnection();
})
.then(connection => {
assertNewConnection(connection, conn);
expect(sequelize.connectionManager.pool.size).to.equal(1);
expect(cm.validate(conn)).to.be.not.ok;
return cm.releaseConnection(connection);
});
});
it('should obtain new connection when released connection dies inside pool', () => {
const sequelize = Support.createSequelizeInstance({
pool: {
max: 1,
min: 1,
idle: 5000
}
});
const cm = sequelize.connectionManager;
let oldConnection;
return sequelize
.sync()
.then(() => cm.getConnection())
.then(connection => {
// Save current connection
oldConnection = connection;
return cm.releaseConnection(connection);
})
.then(() => {
let connection = oldConnection;
if (dialect === 'mssql') {
connection = unwrapAndAttachMSSQLUniqueId(connection);
}
// simulate an unexpected error
// should never be returned again
if (dialect.match(/postgres/)) {
connection.end();
} else {
connection.close();
}
})
.then(() => {
// Get next available connection
return cm.getConnection();
})
.then(connection => {
assertNewConnection(connection, oldConnection);
expect(sequelize.connectionManager.pool.size).to.equal(1);
expect(cm.validate(oldConnection)).to.be.not.ok;
return cm.releaseConnection(connection);
});
});
});
describe('idle', () => {
it('should maintain connection within idle range', () => {
const sequelize = Support.createSequelizeInstance({
pool: {
min: 1,
max: 1,
idle: 10
}
});
const cm = sequelize.connectionManager;
let conn;
return sequelize.sync()
.then(() => cm.getConnection())
.then(connection => {
// Save current connection
conn = connection;
if (dialect === 'mssql') {
connection = unwrapAndAttachMSSQLUniqueId(connection);
}
// returning connection back to pool
return cm.releaseConnection(conn);
})
.then(() => {
// Get next available connection
return Sequelize.Promise.delay(9).then(() => cm.getConnection());
})
.then(connection => {
assertSameConnection(connection, conn);
expect(cm.validate(conn)).to.be.ok;
return cm.releaseConnection(connection);
});
});
it('should get new connection beyond idle range', () => {
const sequelize = Support.createSequelizeInstance({
pool: {
min: 1,
max: 1,
idle: 1,
evict: 10
}
});
const cm = sequelize.connectionManager;
let conn;
return sequelize.sync()
.then(() => cm.getConnection())
.then(connection => {
// Save current connection
conn = connection;
if (dialect === 'mssql') {
connection = unwrapAndAttachMSSQLUniqueId(connection);
}
// returning connection back to pool
return cm.releaseConnection(conn);
})
.then(() => {
// Get next available connection
return Sequelize.Promise.delay(30).then(() => cm.getConnection());
})
.then(connection => {
assertNewConnection(connection, conn);
expect(cm.validate(conn)).not.to.be.ok;
return cm.releaseConnection(connection);
});
});
});
describe('acquire', () => {
it('should reject with ConnectionAcquireTimeoutError when unable to acquire connection', () => {
this.testInstance = new Sequelize('localhost', 'ffd', 'dfdf', { this.testInstance = new Sequelize('localhost', 'ffd', 'dfdf', {
dialect, dialect,
databaseVersion: '1.2.3', databaseVersion: '1.2.3',
pool: { pool: {
acquire: 1000 //milliseconds acquire: 10
} }
}); });
...@@ -34,12 +255,12 @@ describe(Support.getTestDialectTeaser('Pooling'), function() { ...@@ -34,12 +255,12 @@ describe(Support.getTestDialectTeaser('Pooling'), function() {
.to.eventually.be.rejectedWith(Sequelize.ConnectionAcquireTimeoutError); .to.eventually.be.rejectedWith(Sequelize.ConnectionAcquireTimeoutError);
}); });
it('should not result in unhandled promise rejection when unable to acquire connection', () => { it('should reject with ConnectionAcquireTimeoutError when unable to acquire connection for transaction', () => {
this.testInstance = new Sequelize('localhost', 'ffd', 'dfdf', { this.testInstance = new Sequelize('localhost', 'ffd', 'dfdf', {
dialect, dialect,
databaseVersion: '1.2.3', databaseVersion: '1.2.3',
pool: { pool: {
acquire: 1000, acquire: 10,
max: 1 max: 1
} }
}); });
...@@ -51,4 +272,5 @@ describe(Support.getTestDialectTeaser('Pooling'), function() { ...@@ -51,4 +272,5 @@ describe(Support.getTestDialectTeaser('Pooling'), function() {
return this.testInstance.transaction(() => {}); return this.testInstance.transaction(() => {});
})).to.eventually.be.rejectedWith(Sequelize.ConnectionAcquireTimeoutError); })).to.eventually.be.rejectedWith(Sequelize.ConnectionAcquireTimeoutError);
}); });
});
}); });
...@@ -33,20 +33,6 @@ describe(Support.getTestDialectTeaser('Sequelize'), () => { ...@@ -33,20 +33,6 @@ describe(Support.getTestDialectTeaser('Sequelize'), () => {
logger.deprecate.restore && logger.deprecate.restore(); logger.deprecate.restore && logger.deprecate.restore();
}); });
if (dialect !== 'sqlite') {
it.skip('should work with min connections', () => {
const ConnectionManager = current.dialect.connectionManager,
connectionSpy = ConnectionManager.connect = chai.spy(ConnectionManager.connect);
Support.createSequelizeInstance({
pool: {
min: 2
}
});
expect(connectionSpy).to.have.been.called.twice;
});
}
it('should pass the global options correctly', () => { it('should pass the global options correctly', () => {
const sequelize = Support.createSequelizeInstance({ logging: false, define: { underscored: true } }), const sequelize = Support.createSequelizeInstance({ logging: false, define: { underscored: true } }),
DAO = sequelize.define('dao', {name: DataTypes.STRING}); DAO = sequelize.define('dao', {name: DataTypes.STRING});
......
...@@ -21,11 +21,11 @@ chai.should(); ...@@ -21,11 +21,11 @@ chai.should();
// Make sure errors get thrown when testing // Make sure errors get thrown when testing
process.on('uncaughtException', e => { process.on('uncaughtException', e => {
console.error('An unhandled exception occured:'); console.error('An unhandled exception occurred:');
throw e; throw e;
}); });
Sequelize.Promise.onPossiblyUnhandledRejection(e => { Sequelize.Promise.onPossiblyUnhandledRejection(e => {
console.error('An unhandled rejection occured:'); console.error('An unhandled rejection occurred:');
throw e; throw e;
}); });
Sequelize.Promise.longStackTraces(); Sequelize.Promise.longStackTraces();
......
...@@ -38,13 +38,17 @@ if (dialect === 'mssql') { ...@@ -38,13 +38,17 @@ if (dialect === 'mssql') {
}); });
it('connectionManager._connect() does not delete `domain` from config.dialectOptions', function() { it('connectionManager._connect() does not delete `domain` from config.dialectOptions', function() {
this.connectionStub.returns({on(event, cb) { this.connectionStub.returns({
once(event, cb) {
if (event === 'connect') { if (event === 'connect') {
setTimeout(() => { setTimeout(() => {
cb(); cb();
}, 500); }, 500);
} }
}}); },
removeListener: () => {},
on: () => {}
});
expect(this.config.dialectOptions.domain).to.equal('TEST.COM'); expect(this.config.dialectOptions.domain).to.equal('TEST.COM');
return this.instance.dialect.connectionManager._connect(this.config).then(() => { return this.instance.dialect.connectionManager._connect(this.config).then(() => {
...@@ -53,18 +57,22 @@ if (dialect === 'mssql') { ...@@ -53,18 +57,22 @@ if (dialect === 'mssql') {
}); });
it('connectionManager._connect() should reject if end was called and connect was not', function() { it('connectionManager._connect() should reject if end was called and connect was not', function() {
this.connectionStub.returns({ on(event, cb) { this.connectionStub.returns({
once(event, cb) {
if (event === 'end') { if (event === 'end') {
setTimeout(() => { setTimeout(() => {
cb(); cb();
}, 500); }, 500);
} }
} }); },
removeListener: () => {},
on: () => {}
});
return this.instance.dialect.connectionManager._connect(this.config) return this.instance.dialect.connectionManager._connect(this.config)
.catch(err => { .catch(err => {
expect(err.name).to.equal('SequelizeConnectionError'); expect(err.name).to.equal('SequelizeConnectionError');
expect(err.parent).to.equal('Connection was closed by remote server'); expect(err.parent.message).to.equal('Connection was closed by remote server');
}); });
}); });
}); });
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!