不要怂,就是干,撸起袖子干!

Commit 54269128 by Sushant Committed by GitHub

refactor(abstract/connection-manager) (#8330)

Resolves issue with acquire call which never finishes, resolve errors and connections and handle factory level error ourselves. Fixed issue with MSSQL / MySQL error out with resources currently not part of pool.

Fixed evict setting so idle connections are properly removed on time and improved documentation around connection manager and various options 
1 parent e978e59b
......@@ -6,6 +6,7 @@ services:
links:
- mysql-57
- postgres-95
- mssql
volumes:
- .:/sequelize
environment:
......@@ -39,3 +40,13 @@ services:
ports:
- "127.0.0.1:8999:3306"
container_name: mysql-57
# MSSQL
mssql:
image: microsoft/mssql-server-linux:latest
environment:
ACCEPT_EULA: "Y"
SA_PASSWORD: yourStrong(!)Password
ports:
- "127.0.0.1:8997:1433"
container_name: mssql
......@@ -6,17 +6,24 @@ const _ = require('lodash');
const Utils = require('../../utils');
const debug = Utils.getLogger().debugContext('pool');
const semver = require('semver');
const timers = require('timers');
const defaultPoolingConfig = {
max: 5,
min: 0,
idle: 10000,
acquire: 10000,
evict: 60000,
evict: 10000,
handleDisconnects: true
};
/**
* Abstract Connection Manager
*
* Connection manager which handles pool, replication and determining database version
* Works with generic-pool to maintain connection pool
*
* @private
*/
class ConnectionManager {
constructor(dialect, sequelize) {
const config = _.cloneDeep(sequelize.config);
......@@ -25,22 +32,20 @@ class ConnectionManager {
this.config = config;
this.dialect = dialect;
this.versionPromise = null;
this.poolError = null;
this.dialectName = this.sequelize.options.dialect;
if (config.pool === false) {
throw new Error('Support for pool:false was removed in v4.0');
}
config.pool =_.defaults(config.pool || {}, defaultPoolingConfig, {
config.pool = _.defaults(config.pool || {}, defaultPoolingConfig, {
validate: this._validate.bind(this),
Promise
}) ;
});
// Save a reference to the bound version so we can remove it with removeListener
this.onProcessExit = this.onProcessExit.bind(this);
process.on('exit', this.onProcessExit);
this._onProcessExit = this._onProcessExit.bind(this);
process.on('exit', this._onProcessExit);
this.initPools();
}
......@@ -57,7 +62,13 @@ class ConnectionManager {
});
}
onProcessExit() {
/**
* Handler which executes on process exit or connection manager shutdown
*
* @private
* @return {Promise}
*/
_onProcessExit() {
if (!this.pool) {
return Promise.resolve();
}
......@@ -68,59 +79,55 @@ class ConnectionManager {
});
}
/**
* Drain the pool and close it permanently
*
* @return {Promise}
*/
close() {
// Remove the listener, so all references to this instance can be garbage collected.
process.removeListener('exit', this.onProcessExit);
process.removeListener('exit', this._onProcessExit);
// Mark close of pool
this.getConnection = function getConnection() {
return Promise.reject(new Error('ConnectionManager.getConnection was called after the connection manager was closed!'));
};
return this.onProcessExit();
return this._onProcessExit();
}
/**
* Initialize connection pool. By default pool autostart is set to false, so no connection will be
* be created unless `pool.acquire` is called.
*/
initPools() {
const config = this.config;
if (!config.replication) {
this.pool = Pooling.createPool({
create: () => new Promise(resolve => {
this
._connect(config)
.tap(() => {
this.poolError = null;
})
.then(resolve)
.catch(e => {
// dont throw otherwise pool will release _dispense call
// which will call _connect even if error is fatal
// https://github.com/coopernurse/node-pool/issues/161
this.poolError = e;
});
}),
destroy: connection => {
return this._disconnect(connection).tap(() => {
debug('connection destroy');
});
create: () => this._connect(config).catch(err => err),
destroy: mayBeConnection => {
if (mayBeConnection instanceof Error) {
return Promise.resolve();
}
return this._disconnect(mayBeConnection)
.tap(() => { debug('connection destroy'); });
},
validate: config.pool.validate
}, {
Promise: config.pool.Promise,
max: config.pool.max,
min: config.pool.min,
testOnBorrow: true,
autostart: false,
max: config.pool.max,
min: config.pool.min,
acquireTimeoutMillis: config.pool.acquire,
idleTimeoutMillis: config.pool.idle,
evictionRunIntervalMillis: config.pool.evict
});
this.pool.on('factoryCreateError', error => {
this.poolError = error;
});
debug(`pool created with max/min: ${config.pool.max}/${config.pool.min}, no replication`);
debug(`pool created max/min: ${config.pool.max}/${config.pool.min} with no replication`);
return;
}
......@@ -150,21 +157,26 @@ class ConnectionManager {
acquire: (priority, queryType, useMaster) => {
useMaster = _.isUndefined(useMaster) ? false : useMaster;
if (queryType === 'SELECT' && !useMaster) {
return this.pool.read.acquire(priority);
return this.pool.read.acquire(priority)
.then(mayBeConnection => this._determineConnection(mayBeConnection));
} else {
return this.pool.write.acquire(priority);
return this.pool.write.acquire(priority)
.then(mayBeConnection => this._determineConnection(mayBeConnection));
}
},
destroy: connection => {
debug('connection destroy');
return this.pool[connection.queryType].destroy(connection);
destroy: mayBeConnection => {
if (mayBeConnection instanceof Error) {
return Promise.resolve();
}
this.pool[mayBeConnection.queryType].destroy(mayBeConnection)
.tap(() => { debug('connection destroy'); });
},
clear: () => {
debug('all connection clear');
return Promise.join(
this.pool.read.clear(),
this.pool.write.clear()
);
).tap(() => { debug('all connection clear'); });
},
drain: () => {
return Promise.join(
......@@ -175,71 +187,62 @@ class ConnectionManager {
read: Pooling.createPool({
create: () => {
const nextRead = reads++ % config.replication.read.length; // round robin config
return new Promise(resolve => {
this
._connect(config.replication.read[nextRead])
.tap(connection => {
connection.queryType = 'read';
this.poolError = null;
resolve(connection);
})
.catch(e => {
this.poolError = e;
});
});
},
destroy: connection => {
return this._disconnect(connection);
return this
._connect(config.replication.read[nextRead])
.tap(connection => {
connection.queryType = 'read';
})
.catch(err => err);
},
destroy: connection => this._disconnect(connection),
validate: config.pool.validate
}, {
Promise: config.pool.Promise,
max: config.pool.max,
min: config.pool.min,
testOnBorrow: true,
autostart: false,
max: config.pool.max,
min: config.pool.min,
acquireTimeoutMillis: config.pool.acquire,
idleTimeoutMillis: config.pool.idle,
evictionRunIntervalMillis: config.pool.evict
}),
write: Pooling.createPool({
create: () => new Promise(resolve => {
this
create: () => {
return this
._connect(config.replication.write)
.then(connection => {
.tap(connection => {
connection.queryType = 'write';
this.poolError = null;
return resolve(connection);
})
.catch(e => {
this.poolError = e;
});
}),
destroy: connection => {
return this._disconnect(connection);
.catch(err => err);
},
destroy: connection => this._disconnect(connection),
validate: config.pool.validate
}, {
Promise: config.pool.Promise,
max: config.pool.max,
min: config.pool.min,
testOnBorrow: true,
autostart: false,
max: config.pool.max,
min: config.pool.min,
acquireTimeoutMillis: config.pool.acquire,
idleTimeoutMillis: config.pool.idle,
evictionRunIntervalMillis: config.pool.evict
})
};
this.pool.read.on('factoryCreateError', error => {
this.poolError = error;
});
this.pool.write.on('factoryCreateError', error => {
this.poolError = error;
});
debug(`pool created with max/min: ${config.pool.max}/${config.pool.min}, with replication`);
}
/**
* Get connection from pool. It set database version if its not already set.
* Call pool.acquire to get a connection
*
* @param {Object} [options] Pool options
* @param {Integer} [options.priority] Set priority for this call. Read more at https://github.com/coopernurse/node-pool#priority-queueing
* @param {String} [options.type] Set which replica to use. Available options are `read` and `write`
* @param {Boolean} [options.useMaster=false] Force master or write replica to get connection from
*
* @return {Promise<Connection>}
*/
getConnection(options) {
options = options || {};
......@@ -248,64 +251,103 @@ class ConnectionManager {
if (this.versionPromise) {
promise = this.versionPromise;
} else {
promise = this.versionPromise = this._connect(this.config.replication.write || this.config).then(connection => {
const _options = {};
_options.transaction = {connection}; // Cheat .query to use our private connection
_options.logging = () => {};
_options.logging.__testLoggingFn = true;
return this.sequelize.databaseVersion(_options).then(version => {
this.sequelize.options.databaseVersion = semver.valid(version) ? version : this.defaultVersion;
this.versionPromise = null;
promise = this.versionPromise = this._connect(this.config.replication.write || this.config)
.then(connection => {
const _options = {};
_options.transaction = {connection}; // Cheat .query to use our private connection
_options.logging = () => {};
_options.logging.__testLoggingFn = true;
return this._disconnect(connection);
return this.sequelize.databaseVersion(_options).then(version => {
this.sequelize.options.databaseVersion = semver.valid(version) ? version : this.defaultVersion;
this.versionPromise = null;
return this._disconnect(connection);
});
}).catch(err => {
this.versionPromise = null;
throw err;
});
}).catch(err => {
this.versionPromise = null;
throw err;
});
}
} else {
promise = Promise.resolve();
}
return promise.then(() => {
return Promise.race([
this.pool.acquire(options.priority, options.type, options.useMaster),
new Promise((resolve, reject) =>
timers.setTimeout(() => {
if (this.poolError) {
reject(this.poolError);
}
}, 0))
])
.tap(() => { debug('connection acquired'); })
.catch(e => {
e = this.poolError || e;
this.poolError = null;
throw e;
});
return this.pool.acquire(options.priority, options.type, options.useMaster)
.then(mayBeConnection => this._determineConnection(mayBeConnection))
.tap(() => { debug('connection acquired'); });
});
}
/**
* Release a pooled connection so it can be utilized by other connection requests
*
* @param {Connection} connection
*
* @return {Promise}
*/
releaseConnection(connection) {
return this.pool.release(connection).tap(() => {
debug('connection released');
});
return this.pool.release(connection)
.tap(() => { debug('connection released'); })
.catch(/Resource not currently part of this pool/, () => {});
}
/**
* Check if something acquired by pool is indeed a connection but not an Error instance
* Why we need to do this https://github.com/sequelize/sequelize/pull/8330
*
* @param {Object|Error} mayBeConnection Object which can be either connection or error
*
* @retun {Promise<Connection>}
*/
_determineConnection(mayBeConnection) {
if (mayBeConnection instanceof Error) {
return Promise.resolve(this.pool.destroy(mayBeConnection))
.catch(/Resource not currently part of this pool/, () => {})
.then(() => { throw mayBeConnection; });
}
return Promise.resolve(mayBeConnection);
}
/**
* Call dialect library to get connection
*
* @param {*} config Connection config
* @private
* @return {Promise<Connection>}
*/
_connect(config) {
return this.sequelize.runHooks('beforeConnect', config)
.then(() => this.dialect.connectionManager.connect(config))
.then(connection => this.sequelize.runHooks('afterConnect', connection, config).return(connection));
}
/**
* Call dialect library to disconnect a connection
*
* @param {Connection} connection
* @private
* @return {Promise}
*/
_disconnect(connection) {
return this.dialect.connectionManager.disconnect(connection);
}
/**
* Determine if a connection is still valid or not
*
* @param {Connection} connection
*
* @return {Boolean}
*/
_validate(connection) {
if (!this.dialect.connectionManager.validate) return true;
if (!this.dialect.connectionManager.validate) {
return true;
}
return this.dialect.connectionManager.validate(connection);
}
}
......
......@@ -111,9 +111,9 @@ class ConnectionManager extends AbstractConnectionManager {
break;
}
});
if (config.dialectOptions && config.dialectOptions.debug) {
connection.on('debug', debugTedious);
connection.on('debug', debugTedious);
}
if (config.pool.handleDisconnects) {
......@@ -121,7 +121,8 @@ class ConnectionManager extends AbstractConnectionManager {
switch (err.code) {
case 'ESOCKET':
case 'ECONNRESET':
this.pool.destroy(connectionLock);
this.pool.destroy(connectionLock)
.catch(/Resource not currently part of this pool/, () => {});
}
});
}
......
......@@ -99,6 +99,8 @@ class ConnectionManager extends AbstractConnectionManager {
debug(`connection error ${e.code}`);
if (e.code === 'PROTOCOL_CONNECTION_LOST') {
this.pool.destroy(connection)
.catch(/Resource not currently part of this pool/, () => {});
return;
}
}
......
......@@ -35,7 +35,6 @@ class ConnectionManager extends AbstractConnectionManager {
// Expose this as a method so that the parsing may be updated when the user has added additional, custom types
_refreshTypeParser(dataType) {
if (dataType.types.postgres.oids) {
for (const oid of dataType.types.postgres.oids) {
this.lib.types.setTypeParser(oid, value => dataType.parse(value, oid, this.lib.types.getTypeParser));
......@@ -54,7 +53,6 @@ class ConnectionManager extends AbstractConnectionManager {
}
connect(config) {
config.user = config.username;
const connectionConfig = Utils._.pick(config, [
'user', 'password', 'host', 'database', 'port'
......@@ -155,7 +153,7 @@ class ConnectionManager extends AbstractConnectionManager {
return new Promise((resolve, reject) => connection.query(query, (error, result) => error ? reject(error) : resolve(result))).then(results => {
const result = Array.isArray(results) ? results.pop() : results;
for (const row of result.rows) {
let type;
if (row.typname === 'geometry') {
......@@ -174,6 +172,7 @@ class ConnectionManager extends AbstractConnectionManager {
});
});
}
disconnect(connection) {
return new Promise(resolve => {
connection.end();
......
......@@ -82,12 +82,12 @@ class Sequelize {
* @param {Object} [options.pool] sequelize connection pool configuration
* @param {Integer} [options.pool.max=5] Maximum number of connection in pool
* @param {Integer} [options.pool.min=0] Minimum number of connection in pool
* @param {Integer} [options.pool.idle=10000] The maximum time, in milliseconds, that a connection can be idle before being released
* @param {Integer} [options.pool.idle=10000] The maximum time, in milliseconds, that a connection can be idle before being released. Use with combination of evict for proper working, for more details read https://github.com/coopernurse/node-pool/issues/178#issuecomment-327110870
* @param {Integer} [options.pool.acquire=10000] The maximum time, in milliseconds, that pool will try to get connection before throwing error
* @param {Integer} [options.pool.evict=60000] The time interval, in milliseconds, for evicting stale connections. Set it to 0 to disable this feature.
* @param {Integer} [options.pool.evict=10000] The time interval, in milliseconds, for evicting stale connections. Set it to 0 to disable this feature.
* @param {Boolean} [options.pool.handleDisconnects=true] Controls if pool should handle connection disconnect automatically without throwing errors
* @param {Function} [options.pool.validate] A function that validates a connection. Called with client. The default function checks that client is an object, and that its state is not disconnected
* @param {Boolean} [options.quoteIdentifiers=true] Set to `false` to make table names and attributes case-insensitive on Postgres and skip double quoting of them. WARNING: Setting this to false may expose vulnerabilities and is not reccomended!
* @param {Boolean} [options.quoteIdentifiers=true] Set to `false` to make table names and attributes case-insensitive on Postgres and skip double quoting of them. WARNING: Setting this to false may expose vulnerabilities and is not recommended!
* @param {String} [options.transactionType='DEFERRED'] Set the default transaction type. See `Sequelize.Transaction.TYPES` for possible options. Sqlite only.
* @param {String} [options.isolationLevel] Set the default transaction isolation level. See `Sequelize.Transaction.ISOLATION_LEVELS` for possible options.
* @param {Object} [options.retry] Set of flags that control when a query is automatically retried.
......@@ -315,7 +315,7 @@ class Sequelize {
*
* sequelize.models.modelName // The model will now be available in models under the name given to define
*/
define(modelName, attributes, options) { // testhint options:none
define(modelName, attributes, options) {
options = options || {};
options.modelName = modelName;
......@@ -325,7 +325,6 @@ class Sequelize {
model.init(attributes, options);
return model;
}
......@@ -377,7 +376,7 @@ class Sequelize {
if (!this.importCache[path]) {
let defineCall = arguments.length > 1 ? arguments[1] : require(path);
if (typeof defineCall === 'object') {
// ES6 module compatability
// ES6 module compatibility
defineCall = defineCall.default;
}
this.importCache[path] = defineCall(this, DataTypes);
......@@ -655,8 +654,8 @@ class Sequelize {
* @param {RegExp} [options.match] Match a regex against the database name before syncing, a safety check for cases where force: true is used in tests but not live code
* @param {Boolean|function} [options.logging=console.log] A function that logs sql queries, or false for no logging
* @param {String} [options.schema='public'] The schema that the tables should be created in. This can be overriden for each table in sequelize.define
* @param {String} [options.searchPath=DEFAULT] An optional parameter to specify the schema search_path (Postgres only)
* @param {Boolean} [options.hooks=true] If hooks is true then beforeSync, afterSync, beforBulkSync, afterBulkSync hooks will be called
* @param {String} [options.searchPath=DEFAULT] An optional parameter to specify the schema search_path (Postgres only)
* @param {Boolean} [options.hooks=true] If hooks is true then beforeSync, afterSync, beforeBulkSync, afterBulkSync hooks will be called
* @param {Boolean} [options.alter=false] Alters tables to fit models. Not recommended for production use. Deletes data in columns that were removed or had their type changed in the model.
* @return {Promise}
*/
......@@ -950,7 +949,6 @@ class Sequelize {
autoCallback = options;
options = undefined;
}
// testhint argsConform.end
const transaction = new Transaction(this, options);
......@@ -1061,9 +1059,11 @@ class Sequelize {
*
* Normally this is done on process exit, so you only need to call this method if you are creating multiple instances, and want
* to garbage collect some of them.
*
* @return {Promise}
*/
close() {
this.connectionManager.close();
return this.connectionManager.close();
}
normalizeDataType(Type) {
......
......@@ -5,3 +5,7 @@ SEQ_MYSQL_PW=sequelize_test
SEQ_PG_PORT=8998
SEQ_PG_USER=sequelize_test
SEQ_PG_PW=sequelize_test
SEQ_MSSQL_PORT=8997
SEQ_MSSQL_DB=master
SEQ_MSSQL_USER=sa
SEQ_MSSQL_PW=yourStrong(!)Password
......@@ -26,7 +26,7 @@ module.exports = {
database: process.env.SEQ_MSSQL_DB || process.env.SEQ_DB || 'sequelize_test',
username: process.env.SEQ_MSSQL_USER || process.env.SEQ_USER || 'sequelize',
password: process.env.SEQ_MSSQL_PW || process.env.SEQ_PW || 'nEGkLma26gXVHFUAHJxcmsrK',
host: process.env.SEQ_MSSQL_HOST || process.env.SEQ_HOST || 'mssql.sequelizejs.com',
host: process.env.SEQ_MSSQL_HOST || process.env.SEQ_HOST || '127.0.0.1',
port: process.env.SEQ_MSSQL_PORT || process.env.SEQ_PORT || 1433,
dialectOptions: {
// big insert queries need a while
......
......@@ -18,7 +18,6 @@ const poolEntry = {
};
describe('Connection Manager', () => {
let sandbox;
beforeEach(() => {
......@@ -162,7 +161,6 @@ describe('Connection Manager', () => {
expect(poolDrainSpy.calledOnce).to.be.true;
expect(poolClearSpy.calledOnce).to.be.true;
});
});
});
......@@ -56,6 +56,30 @@ if (dialect.match(/^mssql/)) {
});
});
it('should not throw when non pooled connection is unexpectedly closed', () => {
const sequelize = Support.createSequelizeInstance({ pool: { min: 1, max: 1, idle: 5000 } });
const cm = sequelize.connectionManager;
let conn;
return sequelize
.sync()
.then(() => cm.getConnection())
.then(connection => {
conn = connection;
// remove from pool
return cm.pool.destroy(connection);
})
.then(() => {
// unexpected disconnect
const unwrapConn = conn.unwrap();
unwrapConn.emit('error', {
code: 'ESOCKET'
});
});
});
describe('Errors', () => {
it('ECONNREFUSED', () => {
const sequelize = Support.createSequelizeInstance({ port: 34237 });
......
......@@ -34,18 +34,20 @@ if (dialect === 'mysql') {
it('accepts new queries after shutting down a connection', () => {
// Create a sequelize instance with fast disconnecting connection
const sequelize = Support.createSequelizeInstance({ pool: { idle: 50, max: 1 }});
const sequelize = Support.createSequelizeInstance({ pool: { idle: 50, max: 1, evict: 10 }});
const User = sequelize.define('User', { username: DataTypes.STRING });
return User
.sync({force: true})
.then(() => User.create({username: 'user1'}))
.then(() => User.create({ username: 'user1' }))
.then(() => sequelize.Promise.delay(100))
.then(() => {
// This query will be queued just after the `client.end` is executed and before its callback is called
expect(sequelize.connectionManager.pool.size).to.equal(0);
//This query will be queued just after the `client.end` is executed and before its callback is called
return sequelize.query('SELECT COUNT(*) AS count FROM Users', { type: sequelize.QueryTypes.SELECT });
})
.then(count => {
expect(sequelize.connectionManager.pool.size).to.equal(1);
expect(count[0].count).to.equal(1);
});
});
......@@ -68,7 +70,7 @@ if (dialect === 'mysql') {
return cm.getConnection();
})
.then(connection => {
// Old threadId should be different from current new one
// Old threadId should be same as current connection
expect(conn.threadId).to.be.equal(connection.threadId);
expect(cm.validate(conn)).to.be.ok;
......@@ -77,7 +79,7 @@ if (dialect === 'mysql') {
});
it('should work with handleDisconnects before release', () => {
const sequelize = Support.createSequelizeInstance({pool: {min: 1, max: 1, handleDisconnects: true, idle: 5000}});
const sequelize = Support.createSequelizeInstance({pool: { max: 1, min: 1, handleDisconnects: true, idle: 5000 }});
const cm = sequelize.connectionManager;
let conn;
......@@ -89,8 +91,9 @@ if (dialect === 'mysql') {
conn = connection;
// simulate a unexpected end from MySQL2
conn.stream.emit('end');
return cm.releaseConnection(connection);
})
.then(() => cm.releaseConnection(conn))
.then(() => {
// Get next available connection
return cm.getConnection();
......@@ -98,7 +101,9 @@ if (dialect === 'mysql') {
.then(connection => {
// Old threadId should be different from current new one
expect(conn.threadId).to.not.be.equal(connection.threadId);
expect(cm.validate(conn)).to.not.be.ok;
expect(sequelize.connectionManager.pool.size).to.equal(1);
expect(cm.validate(conn)).to.be.not.ok;
return cm.releaseConnection(connection);
});
});
......
......@@ -57,26 +57,22 @@ describe(Support.getTestDialectTeaser('Replication'), function() {
it('should be able to make a write', () => {
return this.User.create({
firstName: Math.random().toString()
})
.then(expectWriteCalls);
}).then(expectWriteCalls);
});
it('should be able to make a read', () => {
return this.User.findAll()
.then(expectReadCalls);
return this.User.findAll().then(expectReadCalls);
});
it('should run read-only transactions on the replica', () => {
return this.sequelize.transaction({readOnly: true}, transaction => {
return this.User.findAll({transaction});
})
.then(expectReadCalls);
}).then(expectReadCalls);
});
it('should run non-read-only transactions on the primary', () => {
return this.sequelize.transaction(transaction => {
return this.User.findAll({transaction});
})
.then(expectWriteCalls);
}).then(expectWriteCalls);
});
});
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!