不要怂,就是干,撸起袖子干!

Commit 567c0198 by Sushant Committed by GitHub

refactor: use sequelize-pool for pooling (#10051)

1 parent 88a340da
......@@ -9,6 +9,7 @@ changelog.md
Makefile
coverage*
.github
.vscode
appveyor-setup.ps1
appveyor.yml
......
'use strict';
const Pooling = require('generic-pool');
const { Pool } = require('sequelize-pool');
const _ = require('lodash');
const semver = require('semver');
const Promise = require('../../promise');
......@@ -8,19 +8,11 @@ const errors = require('../../errors');
const logger = require('../../utils/logger');
const debug = logger.getLogger().debugContext('pool');
const defaultPoolingConfig = {
max: 5,
min: 0,
idle: 10000,
acquire: 60000,
evict: 1000
};
/**
* Abstract Connection Manager
*
* Connection manager which handles pool, replication and determining database version
* Works with generic-pool to maintain connection pool
* Connection manager which handles pooling & replication.
* Uses sequelize-pool for pooling
*
* @private
*/
......@@ -38,9 +30,13 @@ class ConnectionManager {
throw new Error('Support for pool:false was removed in v4.0');
}
config.pool = _.defaults(config.pool || {}, defaultPoolingConfig, {
validate: this._validate.bind(this),
Promise
config.pool = _.defaults(config.pool || {}, {
max: 5,
min: 0,
idle: 10000,
acquire: 60000,
evict: 1000,
validate: this._validate.bind(this)
});
this.initPools();
......@@ -102,7 +98,7 @@ class ConnectionManager {
return this.pool.drain().then(() => {
debug('connection drain due to process exit');
return this.pool.clear();
return this.pool.destroyAllNow();
});
}
......@@ -128,26 +124,19 @@ class ConnectionManager {
const config = this.config;
if (!config.replication) {
this.pool = Pooling.createPool({
create: () => this._connect(config).catch(err => err),
destroy: mayBeConnection => {
if (mayBeConnection instanceof Error) {
return Promise.resolve();
}
return this._disconnect(mayBeConnection)
this.pool = new Pool({
name: 'sequelize',
create: () => this._connect(config),
destroy: connection => {
return this._disconnect(connection)
.tap(() => { debug('connection destroy'); });
},
validate: config.pool.validate
}, {
Promise: config.pool.Promise,
testOnBorrow: true,
autostart: false,
validate: config.pool.validate,
max: config.pool.max,
min: config.pool.min,
acquireTimeoutMillis: config.pool.acquire,
idleTimeoutMillis: config.pool.idle,
evictionRunIntervalMillis: config.pool.evict
reapIntervalMillis: config.pool.evict
});
debug(`pool created with max/min: ${config.pool.max}/${config.pool.min}, no replication`);
......@@ -155,8 +144,6 @@ class ConnectionManager {
return;
}
let reads = 0;
if (!Array.isArray(config.replication.read)) {
config.replication.read = [config.replication.read];
}
......@@ -170,35 +157,32 @@ class ConnectionManager {
);
// custom pooling for replication (original author @janmeier)
let reads = 0;
this.pool = {
release: client => {
if (client.queryType === 'read') {
return this.pool.read.release(client);
this.pool.read.release(client);
} else {
return this.pool.write.release(client);
this.pool.write.release(client);
}
},
acquire: (priority, queryType, useMaster) => {
acquire: (queryType, useMaster) => {
useMaster = _.isUndefined(useMaster) ? false : useMaster;
if (queryType === 'SELECT' && !useMaster) {
return this.pool.read.acquire(priority);
return this.pool.read.acquire();
} else {
return this.pool.write.acquire(priority);
return this.pool.write.acquire();
}
},
destroy: mayBeConnection => {
if (mayBeConnection instanceof Error) {
return Promise.resolve();
}
return this.pool[mayBeConnection.queryType].destroy(mayBeConnection)
.tap(() => { debug('connection destroy'); });
destroy: connection => {
this.pool[connection.queryType].destroy(connection);
debug('connection destroy');
},
clear: () => {
destroyAllNow: () => {
return Promise.join(
this.pool.read.clear(),
this.pool.write.clear()
).tap(() => { debug('all connection clear'); });
this.pool.read.destroyAllNow(),
this.pool.write.destroyAllNow()
).tap(() => { debug('all connections destroyed'); });
},
drain: () => {
return Promise.join(
......@@ -206,48 +190,37 @@ class ConnectionManager {
this.pool.read.drain()
);
},
read: Pooling.createPool({
read: new Pool({
name: 'sequelize:read',
create: () => {
const nextRead = reads++ % config.replication.read.length; // round robin config
return this
._connect(config.replication.read[nextRead])
.tap(connection => {
// round robin config
const nextRead = reads++ % config.replication.read.length;
return this._connect(config.replication.read[nextRead]).tap(connection => {
connection.queryType = 'read';
})
.catch(err => err);
});
},
destroy: connection => this._disconnect(connection),
validate: config.pool.validate
}, {
Promise: config.pool.Promise,
testOnBorrow: true,
autostart: false,
validate: config.pool.validate,
max: config.pool.max,
min: config.pool.min,
acquireTimeoutMillis: config.pool.acquire,
idleTimeoutMillis: config.pool.idle,
evictionRunIntervalMillis: config.pool.evict
reapIntervalMillis: config.pool.evict
}),
write: Pooling.createPool({
write: new Pool({
name: 'sequelize:write',
create: () => {
return this
._connect(config.replication.write)
.tap(connection => {
return this._connect(config.replication.write).tap(connection => {
connection.queryType = 'write';
})
.catch(err => err);
});
},
destroy: connection => this._disconnect(connection),
validate: config.pool.validate
}, {
Promise: config.pool.Promise,
testOnBorrow: true,
autostart: false,
validate: config.pool.validate,
max: config.pool.max,
min: config.pool.min,
acquireTimeoutMillis: config.pool.acquire,
idleTimeoutMillis: config.pool.idle,
evictionRunIntervalMillis: config.pool.evict
reapIntervalMillis: config.pool.evict
})
};
......@@ -259,7 +232,6 @@ class ConnectionManager {
* Call pool.acquire to get a connection
*
* @param {Object} [options] Pool options
* @param {Integer} [options.priority] Set priority for this call. Read more at https://github.com/coopernurse/node-pool#priority-queueing
* @param {string} [options.type] Set which replica to use. Available options are `read` and `write`
* @param {boolean} [options.useMaster=false] Force master or write replica to get connection from
*
......@@ -297,9 +269,8 @@ class ConnectionManager {
}
return promise.then(() => {
return this.pool.acquire(options.priority, options.type, options.useMaster)
.then(mayBeConnection => this._determineConnection(mayBeConnection))
.catch({name: 'TimeoutError'}, err => { throw new errors.ConnectionAcquireTimeoutError(err); })
return this.pool.acquire(options.type, options.useMaster)
.catch(Promise.TimeoutError, err => { throw new errors.ConnectionAcquireTimeoutError(err); })
.tap(() => { debug('connection acquired'); });
});
}
......@@ -312,27 +283,10 @@ class ConnectionManager {
* @returns {Promise}
*/
releaseConnection(connection) {
return this.pool.release(connection)
.tap(() => { debug('connection released'); })
.catch(/Resource not currently part of this pool/, () => {});
}
/**
* Check if something acquired by pool is indeed a connection but not an Error instance
* Why we need to do this https://github.com/sequelize/sequelize/pull/8330
*
* @param {Object|Error} mayBeConnection Object which can be either connection or error
*
* @returns {Promise<Connection>}
*/
_determineConnection(mayBeConnection) {
if (mayBeConnection instanceof Error) {
return Promise.resolve(this.pool.destroy(mayBeConnection))
.catch(/Resource not currently part of this pool/, () => {})
.then(() => { throw mayBeConnection; });
}
return Promise.resolve(mayBeConnection);
return Promise.try(() => {
this.pool.release(connection);
debug('connection released');
});
}
/**
......
......@@ -94,8 +94,7 @@ class ConnectionManager extends AbstractConnectionManager {
switch (error.code) {
case 'ESOCKET':
case 'ECONNRESET':
this.pool.destroy(resourceLock)
.catch(/Resource not currently part of this pool/, () => {});
this.pool.destroy(resourceLock);
}
});
}).catch(error => {
......
......@@ -2,7 +2,7 @@
const AbstractConnectionManager = require('../abstract/connection-manager');
const SequelizeErrors = require('../../errors');
const Utils = require('../../utils');
const Promise = require('../../promise');
const logger = require('../../utils/logger');
const DataTypes = require('../../data-types').mysql;
const momentTz = require('moment-timezone');
......@@ -72,7 +72,7 @@ class ConnectionManager extends AbstractConnectionManager {
}
}
return new Utils.Promise((resolve, reject) => {
return new Promise((resolve, reject) => {
const connection = this.lib.createConnection(connectionConfig);
const errorHandler = e => {
......@@ -103,12 +103,11 @@ class ConnectionManager extends AbstractConnectionManager {
case 'ECONNRESET':
case 'EPIPE':
case 'PROTOCOL_CONNECTION_LOST':
this.pool.destroy(connection)
.catch(/Resource not currently part of this pool/, () => {});
this.pool.destroy(connection);
}
});
return new Utils.Promise((resolve, reject) => {
return new Promise((resolve, reject) => {
if (!this.sequelize.config.keepDefaultTimezone) {
// set timezone for this connection
// but named timezone are not directly supported in mysql, so get its offset first
......@@ -145,19 +144,10 @@ class ConnectionManager extends AbstractConnectionManager {
// Don't disconnect connections with CLOSED state
if (connection._closing) {
debug('connection tried to disconnect but was already at CLOSED state');
return Utils.Promise.resolve();
return Promise.resolve();
}
return new Utils.Promise((resolve, reject) => {
connection.end(err => {
if (err) {
reject(new SequelizeErrors.ConnectionError(err));
} else {
debug('connection disconnected');
resolve();
}
});
});
return Promise.fromCallback(callback => connection.end(callback));
}
validate(connection) {
......
......@@ -176,15 +176,18 @@ class ConnectionManager extends AbstractConnectionManager {
// Don't let a Postgres restart (or error) to take down the whole app
connection.on('error', error => {
connection._invalid = true;
debug(`connection error ${error.code}`);
this.pool.destroy(connection)
.catch(/Resource not currently part of this pool/, () => {});
debug(`connection error ${error.code || error.message}`);
this.pool.destroy(connection);
});
});
}
disconnect(connection) {
if (connection._ending) {
debug('connection tried to disconnect but was already at ENDING state');
return Promise.resolve();
}
return Promise.fromCallback(callback => connection.end(callback));
}
......
......@@ -78,10 +78,9 @@ class Sequelize {
* @param {Object} [options.pool] sequelize connection pool configuration
* @param {number} [options.pool.max=5] Maximum number of connection in pool
* @param {number} [options.pool.min=0] Minimum number of connection in pool
* @param {number} [options.pool.idle=10000] The maximum time, in milliseconds, that a connection can be idle before being released. Use with combination of evict for proper working, for more details read https://github.com/coopernurse/node-pool/issues/178#issuecomment-327110870
* @param {number} [options.pool.idle=10000] The maximum time, in milliseconds, that a connection can be idle before being released.
* @param {number} [options.pool.acquire=60000] The maximum time, in milliseconds, that pool will try to get connection before throwing error
* @param {number} [options.pool.evict=1000] The time interval, in milliseconds, for evicting stale connections. Set it to 0 to disable this feature.
* @param {boolean} [options.pool.handleDisconnects=true] Controls if pool should handle connection disconnect automatically without throwing errors
* @param {number} [options.pool.evict=1000] The time interval, in milliseconds, after which sequelize-pool will remove idle connections.
* @param {Function} [options.pool.validate] A function that validates a connection. Called with client. The default function checks that client is an object, and that its state is not disconnected
* @param {boolean} [options.quoteIdentifiers=true] Set to `false` to make table names and attributes case-insensitive on Postgres and skip double quoting of them. WARNING: Setting this to false may expose vulnerabilities and is not recommended!
* @param {string} [options.transactionType='DEFERRED'] Set the default transaction type. See `Sequelize.Transaction.TYPES` for possible options. Sqlite only.
......
......@@ -28,13 +28,13 @@
"debug": "^4.1.0",
"depd": "^2.0.0",
"dottie": "^2.0.0",
"generic-pool": "^3.4.0",
"inflection": "1.12.0",
"lodash": "^4.17.11",
"moment": "^2.22.2",
"moment-timezone": "^0.5.21",
"retry-as-promised": "^3.1.0",
"semver": "^5.6.0",
"sequelize-pool": "^1.0.0",
"toposort-class": "^1.0.1",
"uuid": "^3.2.1",
"validator": "^10.4.0",
......
......@@ -6,7 +6,7 @@ const chai = require('chai'),
sinon = require('sinon'),
Config = require('../../../config/config'),
ConnectionManager = require('../../../../lib/dialects/abstract/connection-manager'),
Pooling = require('generic-pool'),
Pool = require('sequelize-pool').Pool,
_ = require('lodash'),
Promise = require('../../../../lib/promise');
......@@ -35,9 +35,10 @@ describe('Connection Manager', () => {
const sequelize = Support.createSequelizeInstance(options);
const connectionManager = new ConnectionManager(Support.getTestDialect(), sequelize);
const poolSpy = sandbox.spy(Pooling, 'createPool');
connectionManager.initPools();
expect(poolSpy.calledOnce).to.be.true;
expect(connectionManager.pool).to.be.instanceOf(Pool);
expect(connectionManager.pool.read).to.be.undefined;
expect(connectionManager.pool.write).to.be.undefined;
});
it('should initialize a multiple pools with replication', () => {
......@@ -50,9 +51,9 @@ describe('Connection Manager', () => {
const sequelize = Support.createSequelizeInstance(options);
const connectionManager = new ConnectionManager(Support.getTestDialect(), sequelize);
const poolSpy = sandbox.spy(Pooling, 'createPool');
connectionManager.initPools();
expect(poolSpy.calledTwice).to.be.true;
expect(connectionManager.pool.read).to.be.instanceOf(Pool);
expect(connectionManager.pool.write).to.be.instanceOf(Pool);
});
it('should round robin calls to the read pool', () => {
......@@ -155,7 +156,7 @@ describe('Connection Manager', () => {
connectionManager.initPools();
const poolDrainSpy = sandbox.spy(connectionManager.pool, 'drain');
const poolClearSpy = sandbox.spy(connectionManager.pool, 'clear');
const poolClearSpy = sandbox.spy(connectionManager.pool, 'destroyAllNow');
return connectionManager.close().then(() => {
expect(poolDrainSpy.calledOnce).to.be.true;
......
......@@ -71,7 +71,6 @@ describe(Support.getTestDialectTeaser('Pooling'), () => {
const sequelize = Support.createSequelizeInstance({
pool: {
max: 1,
min: 1,
idle: 5000
}
});
......@@ -114,7 +113,6 @@ describe(Support.getTestDialectTeaser('Pooling'), () => {
const sequelize = Support.createSequelizeInstance({
pool: {
max: 1,
min: 1,
idle: 5000
}
});
......@@ -165,7 +163,6 @@ describe(Support.getTestDialectTeaser('Pooling'), () => {
it('should maintain connection within idle range', () => {
const sequelize = Support.createSequelizeInstance({
pool: {
min: 1,
max: 1,
idle: 10
}
......@@ -202,9 +199,8 @@ describe(Support.getTestDialectTeaser('Pooling'), () => {
it('should get new connection beyond idle range', () => {
const sequelize = Support.createSequelizeInstance({
pool: {
min: 1,
max: 1,
idle: 1,
idle: 100,
evict: 10
}
});
......@@ -227,7 +223,7 @@ describe(Support.getTestDialectTeaser('Pooling'), () => {
})
.then(() => {
// Get next available connection
return Sequelize.Promise.delay(30).then(() => cm.getConnection());
return Sequelize.Promise.delay(110).then(() => cm.getConnection());
})
.then(connection => {
assertNewConnection(connection, conn);
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!