不要怂,就是干,撸起袖子干!

Commit 2a78517d by Sushant Committed by GitHub

Generic Pool Update (#7109)

* generic pool update and other fixes

* connection internally removed

* doc entry

* review changes

* pooling changes
1 parent ee8194e5
......@@ -37,6 +37,7 @@
- [FIXED] Enforce unique association aliases [#7025](https://github.com/sequelize/sequelize/pull/7025)
- [FIXED] Information warnings when findAll is given incorrect inputs [#7047](https://github.com/sequelize/sequelize/pull/7047)
- [FIXED] scope method syntax loses parameters when used multiple times [#7058](https://github.com/sequelize/sequelize/issues/7058)
- [INTERNAL] Updated to `generic-pool@3.1.6` [#7109](https://github.com/sequelize/sequelize/issues/7109)
## BC breaks:
- `DATEONLY` now returns string in `YYYY-MM-DD` format rather than `Date` type
......
......@@ -6,15 +6,17 @@ const _ = require('lodash');
const Utils = require('../../utils');
const debug = Utils.getLogger().debugContext('pool');
const semver = require('semver');
const timers = require('timers');
const defaultPoolingConfig = {
max: 5,
min: 0,
idle: 10000,
acquire: 10000,
handleDisconnects: true
};
class ConnectionManager {
constructor(dialect, sequelize) {
const config = _.cloneDeep(sequelize.config);
......@@ -22,24 +24,28 @@ class ConnectionManager {
this.config = config;
this.dialect = dialect;
this.versionPromise = null;
this.poolError = null;
this.dialectName = this.sequelize.options.dialect;
if (config.pool !== false) {
config.pool =_.defaults(config.pool || {}, defaultPoolingConfig, {
validate: this._validate.bind(this)
}) ;
} else {
if (config.pool === false) {
throw new Error('Support for pool:false was removed in v4.0');
}
config.pool =_.defaults(config.pool || {}, defaultPoolingConfig, {
validate: this._validate.bind(this),
Promise
}) ;
// Save a reference to the bound version so we can remove it with removeListener
this.onProcessExit = this.onProcessExit.bind(this);
process.on('exit', this.onProcessExit);
this.initPools();
}
refreshTypeParser(dataTypes) {
_.each(dataTypes, (dataType, key) => {
_.each(dataTypes, (dataType) => {
if (dataType.hasOwnProperty('parse')) {
if (dataType.types[this.dialectName]) {
this._refreshTypeParser(dataType);
......@@ -51,47 +57,68 @@ class ConnectionManager {
}
onProcessExit() {
if (this.pool) {
this.pool.drain(() => {
if (!this.pool) {
return Promise.resolve();
}
return this.pool.drain(() => {
debug('connection drain due to process exit');
this.pool.destroyAllNow();
return this.pool.clear();
});
}
}
close() {
this.onProcessExit();
process.removeListener('exit', this.onProcessExit); // Remove the listener, so all references to this instance can be garbage collected.
// Remove the listener, so all references to this instance can be garbage collected.
process.removeListener('exit', this.onProcessExit);
// Mark close of pool
this.getConnection = function getConnection() {
return Promise.reject(new Error('ConnectionManager.getConnection was called after the connection manager was closed!'));
};
return this.onProcessExit();
}
// This cannot happen in the constructor because the user can specify a min. number of connections to have in the pool
// If he does this, generic-pool will try to call connect before the dialect-specific connection manager has been correctly set up
initPools() {
const config = this.config;
if (!config.replication) {
this.pool = Pooling.Pool({
name: 'sequelize-connection',
create: (callback) => {
this._connect(config).nodeify((err, connection) => {
debug(`pool created max/min: ${config.pool.max}/${config.pool.min} with no replication`);
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
this.pool = Pooling.createPool({
create: () => new Promise((resolve) => {
this
._connect(config)
.tap(() => {
this.poolError = null;
})
.then(resolve)
.catch(e => {
// dont throw otherwise pool will release _dispense call
// which will call _connect even if error is fatal
// https://github.com/coopernurse/node-pool/issues/161
this.poolError = e;
});
},
}),
destroy: (connection) => {
return this._disconnect(connection).tap(() => {
debug('connection destroy');
this._disconnect(connection);
return null;
});
},
validate: config.pool.validate
}, {
Promise: config.pool.Promise,
max: config.pool.max,
min: config.pool.min,
validate: config.pool.validate,
testOnBorrow: true,
autostart: false,
acquireTimeoutMillis: config.pool.acquire,
idleTimeoutMillis: config.pool.idle
});
this.pool.on('factoryCreateError', error => {
this.poolError = error;
});
debug(`pool created max/min: ${config.pool.max}/${config.pool.min} with no replication`);
return;
}
......@@ -109,7 +136,7 @@ class ConnectionManager {
_.defaults(readConfig, _.omit(this.config, 'replication'))
);
// I'll make my own pool, with blackjack and hookers! (original credit goes to @janzeh)
// custom pooling for replication (original author @janmeier)
this.pool = {
release: client => {
if (client.queryType === 'read') {
......@@ -118,67 +145,95 @@ class ConnectionManager {
return this.pool.write.release(client);
}
},
acquire: (callback, priority, queryType, useMaster) => {
acquire: (priority, queryType, useMaster) => {
useMaster = _.isUndefined(useMaster) ? false : useMaster;
if (queryType === 'SELECT' && !useMaster) {
this.pool.read.acquire(callback, priority);
return this.pool.read.acquire(priority);
} else {
this.pool.write.acquire(callback, priority);
return this.pool.write.acquire(priority);
}
},
destroy: (connection) => {
destroy: connection => {
debug('connection destroy');
return this.pool[connection.queryType].destroy(connection);
},
destroyAllNow: () => {
debug('all connection destroy');
this.pool.read.destroyAllNow();
this.pool.write.destroyAllNow();
clear: () => {
debug('all connection clear');
return Promise.join(
this.pool.read.clear(),
this.pool.write.clear()
);
},
drain: (cb) => {
this.pool.write.drain(() => {
this.pool.read.drain(cb);
});
drain: () => {
return Promise.join(
this.pool.write.drain(),
this.pool.read.drain()
);
},
read: Pooling.Pool({
name: 'sequelize-connection-read',
create: (callback) => {
// Simple round robin config
const nextRead = reads++ % config.replication.read.length;
this._connect(config.replication.read[nextRead]).tap(connection => {
read: Pooling.createPool({
create: () => {
const nextRead = reads++ % config.replication.read.length; // round robin config
return new Promise((resolve) => {
this
._connect(config.replication.read[nextRead])
.tap(connection => {
connection.queryType = 'read';
}).nodeify((err, connection) => {
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
this.poolError = null;
resolve(connection);
})
.catch(e => {
this.poolError = e;
});
});
},
destroy: connection => {
this._disconnect(connection);
return null;
return this._disconnect(connection);
},
validate: config.pool.validate,
validate: config.pool.validate
}, {
Promise: config.pool.Promise,
max: config.pool.max,
min: config.pool.min,
testOnBorrow: true,
autostart: false,
acquireTimeoutMillis: config.pool.acquire,
idleTimeoutMillis: config.pool.idle
}),
write: Pooling.Pool({
name: 'sequelize-connection-write',
create: callback => {
this._connect(config.replication.write).tap(connection => {
write: Pooling.createPool({
create: () => new Promise((resolve) => {
this
._connect(config.replication.write)
.then(connection => {
connection.queryType = 'write';
}).nodeify((err, connection) => {
callback(err, connection); // For some reason this is needed, else generic-pool things err is a connection or some shit
this.poolError = null;
return resolve(connection);
})
.catch(e => {
this.poolError = e;
});
},
}),
destroy: connection => {
this._disconnect(connection);
return null;
return this._disconnect(connection);
},
validate: config.pool.validate,
validate: config.pool.validate
}, {
Promise: config.pool.Promise,
max: config.pool.max,
min: config.pool.min,
testOnBorrow: true,
autostart: false,
acquireTimeoutMillis: config.pool.acquire,
idleTimeoutMillis: config.pool.idle
})
};
this.pool.read.on('factoryCreateError', error => {
this.poolError = error;
});
this.pool.write.on('factoryCreateError', error => {
this.poolError = error;
});
}
getConnection(options) {
......@@ -197,11 +252,9 @@ class ConnectionManager {
return this.sequelize.databaseVersion(_options).then(version => {
this.sequelize.options.databaseVersion = semver.valid(version) ? version : this.defaultVersion;
this.versionPromise = null;
this._disconnect(connection);
return null;
return this._disconnect(connection);
});
}).catch(err => {
this.versionPromise = null;
......@@ -212,20 +265,36 @@ class ConnectionManager {
promise = Promise.resolve();
}
return promise.then(() => new Promise((resolve, reject) => {
this.pool.acquire((err, connection) => {
if (err) return reject(err);
debug('connection acquired');
resolve(connection);
}, options.priority, options.type, options.useMaster);
}));
return promise.then(() =>
new Promise((resolve, reject) => {
const connectionPromise = this.pool.acquire(options.priority, options.type, options.useMaster);
const connectionTimer = timers.setInterval(() => {
let evictTimer = false;
if (connectionPromise.isFulfilled()) {
resolve(connectionPromise);
debug('connection acquire');
evictTimer = true;
} else if (this.poolError) {
reject(this.poolError);
this.poolError = null;
evictTimer = true;
} else if (connectionPromise.isRejected()) {
connectionPromise.catch(reject);
evictTimer = true;
}
if (evictTimer) {
timers.clearInterval(connectionTimer);
}
}, 0);
})
);
}
releaseConnection(connection) {
return new Promise((resolve, reject) => {
this.pool.release(connection);
return this.pool.release(connection).tap(() => {
debug('connection released');
resolve();
});
}
......
......@@ -130,7 +130,7 @@ class ConnectionManager extends AbstractConnectionManager {
const connection = connectionLock.unwrap();
// Dont disconnect a connection that is already disconnected
if (!!connection.closed) {
if (connection.closed) {
return Promise.resolve();
}
......@@ -143,7 +143,6 @@ class ConnectionManager extends AbstractConnectionManager {
validate(connectionLock) {
const connection = connectionLock.unwrap();
return connection && connection.loggedIn;
}
}
......
'use strict';
var _ = require('lodash')
, Abstract = require('../abstract')
, ConnectionManager = require('./connection-manager')
, Query = require('./query')
, QueryGenerator = require('./query-generator')
, DataTypes = require('../../data-types').mssql;
const _ = require('lodash');
const AbstractDialect = require('../abstract');
const ConnectionManager = require('./connection-manager');
const Query = require('./query');
const QueryGenerator = require('./query-generator');
const DataTypes = require('../../data-types').mssql;
var MssqlDialect = function(sequelize) {
class MssqlDialect extends AbstractDialect {
constructor(sequelize) {
super();
this.sequelize = sequelize;
this.connectionManager = new ConnectionManager(this, sequelize);
this.connectionManager.initPools();
this.QueryGenerator = _.extend({}, QueryGenerator, {
options: sequelize.options,
_dialect: this,
sequelize
});
};
}
}
MssqlDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supports), {
MssqlDialect.prototype.supports = _.merge(_.cloneDeep(AbstractDialect.prototype.supports), {
'DEFAULT': true,
'DEFAULT VALUES': true,
'LIMIT ON UPDATE': true,
......@@ -51,7 +53,7 @@ MssqlDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.support
tmpTableTrigger: true
});
MssqlDialect.prototype.defaultVersion = '12.0.2000'; // SQL Server 2014 Express
ConnectionManager.prototype.defaultVersion = '12.0.2000'; // SQL Server 2014 Express
MssqlDialect.prototype.Query = Query;
MssqlDialect.prototype.name = 'mssql';
MssqlDialect.prototype.TICK_CHAR = '"';
......
'use strict';
var _ = require('lodash')
, Abstract = require('../abstract')
, ConnectionManager = require('./connection-manager')
, Query = require('./query')
, QueryGenerator = require('./query-generator')
, DataTypes = require('../../data-types').mysql;
const _ = require('lodash');
const AbstractDialect = require('../abstract');
const ConnectionManager = require('./connection-manager');
const Query = require('./query');
const QueryGenerator = require('./query-generator');
const DataTypes = require('../../data-types').mysql;
var MysqlDialect = function(sequelize) {
class MysqlDialect extends AbstractDialect {
constructor(sequelize) {
super();
this.sequelize = sequelize;
this.connectionManager = new ConnectionManager(this, sequelize);
this.connectionManager.initPools();
this.QueryGenerator = _.extend({}, QueryGenerator, {
options: sequelize.options,
_dialect: this,
sequelize
});
};
}
}
MysqlDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supports), {
MysqlDialect.prototype.supports = _.merge(_.cloneDeep(AbstractDialect.prototype.supports), {
'VALUES ()': true,
'LIMIT ON UPDATE': true,
'IGNORE': ' IGNORE',
......
......@@ -45,8 +45,8 @@ class ConnectionManager extends AbstractConnectionManager {
if (dataType.types.postgres.array_oids) {
for (const oid of dataType.types.postgres.array_oids) {
this.lib.types.setTypeParser(oid, value =>
this.lib.types.arrayParser.create(value, value =>
dataType.parse(value, oid, this.lib.types.getTypeParser)
this.lib.types.arrayParser.create(value, (v) =>
dataType.parse(v, oid, this.lib.types.getTypeParser)
).parse()
);
}
......@@ -172,7 +172,7 @@ class ConnectionManager extends AbstractConnectionManager {
});
}
disconnect(connection) {
return new Promise((resolve, reject) => {
return new Promise(resolve => {
connection.end();
resolve();
});
......
......@@ -12,8 +12,6 @@ class PostgresDialect extends AbstractDialect {
super();
this.sequelize = sequelize;
this.connectionManager = new ConnectionManager(this, sequelize);
this.connectionManager.initPools();
this.QueryGenerator = _.extend({}, QueryGenerator, {
options: sequelize.options,
_dialect: this,
......
......@@ -84,7 +84,8 @@ const _ = require('lodash');
* @param {Integer} [options.pool.max] Maximum number of connection in pool. Default is 5
* @param {Integer} [options.pool.min] Minimum number of connection in pool. Default is 0
* @param {Integer} [options.pool.idle] The maximum time, in milliseconds, that a connection can be idle before being released
* @param {Function} [options.pool.validateConnection] A function that validates a connection. Called with client. The default function checks that client is an object, and that its state is not disconnected
* @param {Integer} [options.pool.acquire] The maximum time, in milliseconds, that pool will try to get connection before throwing error
* @param {Function} [options.pool.validate] A function that validates a connection. Called with client. The default function checks that client is an object, and that its state is not disconnected
* @param {Boolean} [options.quoteIdentifiers=true] Set to `false` to make table names and attributes case-insensitive on Postgres and skip double quoting of them.
* @param {String} [options.transactionType='DEFERRED'] Set the default transaction type. See `Sequelize.Transaction.TYPES` for possible options. Sqlite only.
* @param {String} [options.isolationLevel] Set the default transaction isolation level. See `Sequelize.Transaction.ISOLATION_LEVELS` for possible options.
......@@ -265,10 +266,6 @@ class Sequelize {
Sequelize.runHooks('afterInit', this);
}
get connectorManager() {
return this.transactionManager.getConnectorManager();
}
refreshTypes() {
this.connectionManager.refreshTypeParser(DataTypes);
}
......
......@@ -42,7 +42,7 @@
"depd": "^1.1.0",
"dottie": "^2.0.0",
"env-cmd": "^4.0.0",
"generic-pool": "^2.4.4",
"generic-pool": "^3.1.6",
"inflection": "1.10.0",
"lodash": "^4.17.1",
"moment": "^2.13.0",
......
......@@ -18,7 +18,7 @@ var poolEntry = {
pool: {}
};
describe('Connction Manager', function() {
describe('Connection Manager', function() {
var sandbox;
......@@ -37,7 +37,7 @@ describe('Connction Manager', function() {
var sequelize = Support.createSequelizeInstance(options);
var connectionManager = new ConnectionManager(Support.getTestDialect(), sequelize);
var poolSpy = sandbox.spy(Pooling, 'Pool');
var poolSpy = sandbox.spy(Pooling, 'createPool');
connectionManager.initPools();
expect(poolSpy.calledOnce).to.be.true;
});
......@@ -52,7 +52,7 @@ describe('Connction Manager', function() {
var sequelize = Support.createSequelizeInstance(options);
var connectionManager = new ConnectionManager(Support.getTestDialect(), sequelize);
var poolSpy = sandbox.spy(Pooling, 'Pool');
var poolSpy = sandbox.spy(Pooling, 'createPool');
connectionManager.initPools();
expect(poolSpy.calledTwice).to.be.true;
});
......
......@@ -22,10 +22,8 @@ if (dialect.match(/^mssql/)) {
conn = connection;
// simulate a unexpected end
connection.unwrap().emit('error', {code: 'ECONNRESET'});
})
.then(function() {
return cm.releaseConnection(conn);
// connection removed from pool by MSSQL Conn Manager
conn.unwrap().emit('error', {code: 'ECONNRESET'});
})
.then(function() {
// Get next available connection
......
......@@ -9,7 +9,7 @@ var chai = require('chai')
, DataTypes = require(__dirname + '/../../../../lib/data-types');
if (dialect === 'mysql') {
describe('[MYSQL Specific] Connector Manager', function() {
describe('[MYSQL Specific] Connection Manager', function() {
it('works correctly after being idle', function() {
var User = this.sequelize.define('User', { username: DataTypes.STRING })
, spy = sinon.spy()
......
'use strict';
/* jshint -W030 */
/* jshint -W110 */
const chai = require('chai');
const expect = chai.expect;
const Support = require(__dirname + '/support');
const dialect = Support.getTestDialect();
const sinon = require('sinon');
const Sequelize = Support.Sequelize;
describe(Support.getTestDialectTeaser('Pooling'), function() {
if (dialect === 'sqlite') return;
beforeEach(() => {
this.sinon = sinon.sandbox.create();
});
afterEach(() => {
this.sinon.restore();
});
it('should reject when unable to acquire connection in given time', () => {
this.testInstance = new Sequelize('localhost', 'ffd', 'dfdf', {
dialect,
databaseVersion: '1.2.3',
pool: {
acquire: 1000 //milliseconds
}
});
this.sinon.stub(this.testInstance.connectionManager, '_connect', () => new Sequelize.Promise(() => {}));
return expect(this.testInstance.authenticate())
.to.eventually.be.rejectedWith('TimeoutError: ResourceRequest timed out');
});
});
......@@ -2,16 +2,16 @@
/* jshint -W030 */
/* jshint -W110 */
var chai = require('chai')
, expect = chai.expect
, Support = require(__dirname + '/support')
, DataTypes = require(__dirname + '/../../lib/data-types')
, dialect = Support.getTestDialect();
const chai = require('chai');
const expect = chai.expect;
const Support = require(__dirname + '/support');
const DataTypes = require(__dirname + '/../../lib/data-types');
const dialect = Support.getTestDialect();
describe(Support.getTestDialectTeaser('Replication'), function() {
if (dialect === 'sqlite') return;
beforeEach(function () {
beforeEach(() => {
this.sequelize = Support.getSequelizeInstance(null, null, null, {
replication: {
write: Support.getConnectionOptions(),
......@@ -32,13 +32,13 @@ describe(Support.getTestDialectTeaser('Replication'), function() {
return this.User.sync({force: true});
});
it('should be able to make a write', function () {
it('should be able to make a write', () => {
return this.User.create({
firstName: Math.random().toString()
});
});
it('should be able to make a read', function () {
it('should be able to make a read', () => {
return this.User.findAll();
});
});
'use strict';
/* jshint -W030 */
var chai = require('chai')
, expect = chai.expect
, sinon = require('sinon')
, Support = require(__dirname + '/support')
, Sequelize = Support.Sequelize
, dialect = Support.getTestDialect()
, current = Support.sequelize
, Promise = Sequelize.Promise;
const chai = require('chai');
const expect = chai.expect;
const sinon = require('sinon');
const Support = require(__dirname + '/support');
const Sequelize = Support.Sequelize;
const dialect = Support.getTestDialect();
const current = Support.sequelize;
describe('Transaction', function() {
before(function () {
this.stub = sinon.stub(current, 'query').returns(Promise.resolve({}));
before(() => {
this.stub = sinon.stub(current, 'query').returns(Sequelize.Promise.resolve({}));
this.stubConnection = sinon.stub(current.connectionManager, 'getConnection')
.returns(Promise.resolve({ uuid: 'ssfdjd-434fd-43dfg23-2d', close : function() { }}));
.returns(Sequelize.Promise.resolve({
uuid: 'ssfdjd-434fd-43dfg23-2d',
close(){}
}));
this.stubRelease = sinon.stub(current.connectionManager, 'releaseConnection')
.returns(Sequelize.Promise.resolve());
});
beforeEach(function () {
beforeEach(() => {
this.stub.reset();
this.stubConnection.reset();
this.stubRelease.reset();
});
after(function () {
after(() => {
this.stub.restore();
this.stubConnection.restore();
});
it('should run auto commit query only when needed', function() {
var expectations = {
it('should run auto commit query only when needed', () => {
const expectations = {
all: [
'START TRANSACTION;'
],
......@@ -43,7 +48,7 @@ describe('Transaction', function() {
};
return current.transaction(() => {
expect(this.stub.args.map(arg => arg[0])).to.deep.equal(expectations[dialect] || expectations.all);
return Promise.resolve();
return Sequelize.Promise.resolve();
});
});
});
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!