不要怂,就是干,撸起袖子干!

Commit ce5bc374 by Andy Edwards Committed by GitHub

refactor(mssql/connection-manager): eliminate using/disposer calls (#12136)

1 parent e59b3ff2
'use strict';
const BaseError = require('../../errors/base-error');
const ConnectionError = require('../../errors/connection-error');
/**
* Thrown when a connection to a database is closed while an operation is in progress
*/
class AsyncQueueError extends BaseError {
constructor(message) {
super(message);
this.name = 'SequelizeAsyncQueueError';
}
}
exports.AsyncQueueError = AsyncQueueError;
class AsyncQueue {
constructor() {
this.previous = Promise.resolve();
this.closed = false;
this.rejectCurrent = () => {};
}
close() {
this.closed = true;
this.rejectCurrent(new ConnectionError(new AsyncQueueError('the connection was closed before this query could finish executing')));
}
enqueue(asyncFunction) {
// This outer promise might seems superflous since down below we return asyncFunction().then(resolve, reject).
// However, this ensures that this.previous will never be a rejected promise so the queue will
// always keep going, while still communicating rejection from asyncFunction to the user.
return new Promise((resolve, reject) => {
this.previous = this.previous.then(
() => {
this.rejectCurrent = reject;
if (this.closed) {
return reject(new ConnectionError(new AsyncQueueError('the connection was closed before this query could be executed')));
}
return asyncFunction().then(resolve, reject);
}
);
});
}
}
exports.default = AsyncQueue;
'use strict'; 'use strict';
const AbstractConnectionManager = require('../abstract/connection-manager'); const AbstractConnectionManager = require('../abstract/connection-manager');
const ResourceLock = require('./resource-lock'); const AsyncQueue = require('./async-queue').default;
const Promise = require('../../promise'); const Promise = require('../../promise');
const { logger } = require('../../utils/logger'); const { logger } = require('../../utils/logger');
const sequelizeErrors = require('../../errors'); const sequelizeErrors = require('../../errors');
...@@ -61,8 +61,8 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -61,8 +61,8 @@ class ConnectionManager extends AbstractConnectionManager {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const connection = new this.lib.Connection(connectionConfig); const connection = new this.lib.Connection(connectionConfig);
connection.queue = new AsyncQueue();
connection.lib = this.lib; connection.lib = this.lib;
const resourceLock = new ResourceLock(connection);
const connectHandler = error => { const connectHandler = error => {
connection.removeListener('end', endHandler); connection.removeListener('end', endHandler);
...@@ -71,7 +71,7 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -71,7 +71,7 @@ class ConnectionManager extends AbstractConnectionManager {
if (error) return reject(error); if (error) return reject(error);
debug('connection acquired'); debug('connection acquired');
resolve(resourceLock); resolve(connection);
}; };
const endHandler = () => { const endHandler = () => {
...@@ -102,7 +102,7 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -102,7 +102,7 @@ class ConnectionManager extends AbstractConnectionManager {
switch (error.code) { switch (error.code) {
case 'ESOCKET': case 'ESOCKET':
case 'ECONNRESET': case 'ECONNRESET':
this.pool.destroy(resourceLock); this.pool.destroy(connection);
} }
}); });
...@@ -143,19 +143,14 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -143,19 +143,14 @@ class ConnectionManager extends AbstractConnectionManager {
}); });
} }
disconnect(connectionLock) { disconnect(connection) {
/**
* Abstract connection may try to disconnect raw connection used for fetching version
*/
const connection = connectionLock.unwrap
? connectionLock.unwrap()
: connectionLock;
// Don't disconnect a connection that is already disconnected // Don't disconnect a connection that is already disconnected
if (connection.closed) { if (connection.closed) {
return Promise.resolve(); return Promise.resolve();
} }
connection.queue.close();
return new Promise(resolve => { return new Promise(resolve => {
connection.on('end', resolve); connection.on('end', resolve);
connection.close(); connection.close();
...@@ -163,14 +158,7 @@ class ConnectionManager extends AbstractConnectionManager { ...@@ -163,14 +158,7 @@ class ConnectionManager extends AbstractConnectionManager {
}); });
} }
validate(connectionLock) { validate(connection) {
/**
* Abstract connection may try to validate raw connection used for fetching version
*/
const connection = connectionLock.unwrap
? connectionLock.unwrap()
: connectionLock;
return connection && connection.loggedIn; return connection && connection.loggedIn;
} }
} }
......
...@@ -114,7 +114,7 @@ class Query extends AbstractQuery { ...@@ -114,7 +114,7 @@ class Query extends AbstractQuery {
} }
run(sql, parameters) { run(sql, parameters) {
return Promise.using(this.connection.lock(), connection => this._run(connection, sql, parameters)); return this.connection.queue.enqueue(() => this._run(this.connection, sql, parameters));
} }
static formatBindParameters(sql, values, dialect) { static formatBindParameters(sql, values, dialect) {
......
'use strict';
const Promise = require('../../promise');
class ResourceLock {
constructor(resource) {
this.resource = resource;
this.previous = Promise.resolve(resource);
}
unwrap() {
return this.resource;
}
lock() {
const lock = this.previous;
let resolve;
this.previous = new Promise(r => {
resolve = r;
});
return lock.disposer(resolve);
}
}
module.exports = ResourceLock;
...@@ -11928,15 +11928,6 @@ ...@@ -11928,15 +11928,6 @@
"integrity": "sha512-Y3W0wlRPK8ZMRbNq97l4M5otioeA5lm1z7bkNkxCka8HSPjR0xRWmpCmc9utiaLP9Jb1eD8BgeIxTW4AIF45Pg==", "integrity": "sha512-Y3W0wlRPK8ZMRbNq97l4M5otioeA5lm1z7bkNkxCka8HSPjR0xRWmpCmc9utiaLP9Jb1eD8BgeIxTW4AIF45Pg==",
"dev": true "dev": true
}, },
"p-limit": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/p-limit/-/p-limit-1.3.0.tgz",
"integrity": "sha512-vvcXsLAJ9Dr5rQOPk7toZQZJApBl2K4J6dANSsEuh6QI41JYcsS/qhTGa9ErIUUgK3WNQoJYvylxvjqmiqEA9Q==",
"dev": true,
"requires": {
"p-try": "^1.0.0"
}
},
"p-locate": { "p-locate": {
"version": "2.0.0", "version": "2.0.0",
"resolved": "https://registry.npmjs.org/p-locate/-/p-locate-2.0.0.tgz", "resolved": "https://registry.npmjs.org/p-locate/-/p-locate-2.0.0.tgz",
...@@ -11944,6 +11935,17 @@ ...@@ -11944,6 +11935,17 @@
"dev": true, "dev": true,
"requires": { "requires": {
"p-limit": "^1.1.0" "p-limit": "^1.1.0"
},
"dependencies": {
"p-limit": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/p-limit/-/p-limit-1.3.0.tgz",
"integrity": "sha512-vvcXsLAJ9Dr5rQOPk7toZQZJApBl2K4J6dANSsEuh6QI41JYcsS/qhTGa9ErIUUgK3WNQoJYvylxvjqmiqEA9Q==",
"dev": true,
"requires": {
"p-try": "^1.0.0"
}
}
} }
}, },
"p-map": { "p-map": {
......
...@@ -5,6 +5,9 @@ const chai = require('chai'), ...@@ -5,6 +5,9 @@ const chai = require('chai'),
Promise = require('../../../../lib/promise'), Promise = require('../../../../lib/promise'),
DataTypes = require('../../../../lib/data-types'), DataTypes = require('../../../../lib/data-types'),
Support = require('../../support'), Support = require('../../support'),
Sequelize = require('../../../../lib/sequelize'),
ConnectionError = require('../../../../lib/errors/connection-error'),
{ AsyncQueueError } = require('../../../../lib/dialects/mssql/async-queue'),
dialect = Support.getTestDialect(); dialect = Support.getTestDialect();
if (dialect.match(/^mssql/)) { if (dialect.match(/^mssql/)) {
...@@ -33,5 +36,93 @@ if (dialect.match(/^mssql/)) { ...@@ -33,5 +36,93 @@ if (dialect.match(/^mssql/)) {
]); ]);
})).not.to.be.rejected; })).not.to.be.rejected;
}); });
it('requests that reject should not affect future requests', async function() {
const User = this.User;
await expect(this.sequelize.transaction(async t => {
await expect(User.create({
username: new Date()
})).to.be.rejected;
await expect(User.findOne({
transaction: t
})).not.to.be.rejected;
})).not.to.be.rejected;
});
it('closing the connection should reject pending requests', async function() {
const User = this.User;
let promise;
await expect(this.sequelize.transaction(t =>
promise = Promise.all([
expect(this.sequelize.dialect.connectionManager.disconnect(t.connection)).to.be.fulfilled,
expect(User.findOne({
transaction: t
})).to.be.eventually.rejectedWith(ConnectionError, 'the connection was closed before this query could be executed')
.and.have.property('parent').that.instanceOf(AsyncQueueError),
expect(User.findOne({
transaction: t
})).to.be.eventually.rejectedWith(ConnectionError, 'the connection was closed before this query could be executed')
.and.have.property('parent').that.instanceOf(AsyncQueueError)
])
)).to.be.rejectedWith(ConnectionError, 'the connection was closed before this query could be executed');
await expect(promise).not.to.be.rejected;
});
it('closing the connection should reject in-progress requests', async function() {
const User = this.User;
let promise;
await expect(this.sequelize.transaction(async t => {
const wrappedExecSql = t.connection.execSql;
t.connection.execSql = (...args) => {
this.sequelize.dialect.connectionManager.disconnect(t.connection);
return wrappedExecSql(...args);
};
return promise = expect(User.findOne({
transaction: t
})).to.be.eventually.rejectedWith(ConnectionError, 'the connection was closed before this query could finish executing')
.and.have.property('parent').that.instanceOf(AsyncQueueError);
})).to.be.eventually.rejectedWith(ConnectionError, 'the connection was closed before this query could be executed')
.and.have.property('parent').that.instanceOf(AsyncQueueError);
await expect(promise).not.to.be.rejected;
});
describe('unhandled rejections', () => {
let onUnhandledRejection;
afterEach(() => {
process.removeListener('unhandledRejection', onUnhandledRejection);
});
it("unhandled rejection should occur if user doesn't catch promise returned from query", async function() {
const User = this.User;
const rejectionPromise = new Promise((resolve, reject) => {
onUnhandledRejection = reject;
});
process.on('unhandledRejection', onUnhandledRejection);
User.create({
username: new Date()
});
await expect(rejectionPromise).to.be.rejectedWith(
Sequelize.ValidationError, 'string violation: username cannot be an array or an object');
});
it('no unhandled rejections should occur as long as user catches promise returned from query', async function() {
const User = this.User;
const unhandledRejections = [];
onUnhandledRejection = error => unhandledRejections.push(error);
process.on('unhandledRejection', onUnhandledRejection);
await expect(User.create({
username: new Date()
})).to.be.rejectedWith(Sequelize.ValidationError);
expect(unhandledRejections).to.deep.equal([]);
});
});
}); });
} }
...@@ -20,7 +20,7 @@ function assertSameConnection(newConnection, oldConnection) { ...@@ -20,7 +20,7 @@ function assertSameConnection(newConnection, oldConnection) {
break; break;
case 'mssql': case 'mssql':
expect(newConnection.unwrap().dummyId).to.equal(oldConnection.unwrap().dummyId).and.to.be.ok; expect(newConnection.dummyId).to.equal(oldConnection.dummyId).and.to.be.ok;
break; break;
default: default:
...@@ -40,8 +40,8 @@ function assertNewConnection(newConnection, oldConnection) { ...@@ -40,8 +40,8 @@ function assertNewConnection(newConnection, oldConnection) {
break; break;
case 'mssql': case 'mssql':
expect(newConnection.unwrap().dummyId).to.not.be.ok; expect(newConnection.dummyId).to.not.be.ok;
expect(oldConnection.unwrap().dummyId).to.be.ok; expect(oldConnection.dummyId).to.be.ok;
break; break;
default: default:
...@@ -49,9 +49,8 @@ function assertNewConnection(newConnection, oldConnection) { ...@@ -49,9 +49,8 @@ function assertNewConnection(newConnection, oldConnection) {
} }
} }
function unwrapAndAttachMSSQLUniqueId(connection) { function attachMSSQLUniqueId(connection) {
if (dialect === 'mssql') { if (dialect === 'mssql') {
connection = connection.unwrap();
connection.dummyId = Math.random(); connection.dummyId = Math.random();
} }
...@@ -74,7 +73,7 @@ describe(Support.getTestDialectTeaser('Pooling'), () => { ...@@ -74,7 +73,7 @@ describe(Support.getTestDialectTeaser('Pooling'), () => {
function simulateUnexpectedError(connection) { function simulateUnexpectedError(connection) {
// should never be returned again // should never be returned again
if (dialect === 'mssql') { if (dialect === 'mssql') {
connection = unwrapAndAttachMSSQLUniqueId(connection); connection = attachMSSQLUniqueId(connection);
} }
connection.emit('error', { code: 'ECONNRESET' }); connection.emit('error', { code: 'ECONNRESET' });
} }
...@@ -100,7 +99,7 @@ describe(Support.getTestDialectTeaser('Pooling'), () => { ...@@ -100,7 +99,7 @@ describe(Support.getTestDialectTeaser('Pooling'), () => {
function simulateUnexpectedError(connection) { function simulateUnexpectedError(connection) {
// should never be returned again // should never be returned again
if (dialect === 'mssql') { if (dialect === 'mssql') {
unwrapAndAttachMSSQLUniqueId(connection).close(); attachMSSQLUniqueId(connection).close();
} else if (dialect === 'postgres') { } else if (dialect === 'postgres') {
connection.end(); connection.end();
} else { } else {
...@@ -138,7 +137,7 @@ describe(Support.getTestDialectTeaser('Pooling'), () => { ...@@ -138,7 +137,7 @@ describe(Support.getTestDialectTeaser('Pooling'), () => {
const firstConnection = await cm.getConnection(); const firstConnection = await cm.getConnection();
// TODO - Do we really need this call? // TODO - Do we really need this call?
unwrapAndAttachMSSQLUniqueId(firstConnection); attachMSSQLUniqueId(firstConnection);
// returning connection back to pool // returning connection back to pool
await cm.releaseConnection(firstConnection); await cm.releaseConnection(firstConnection);
...@@ -163,7 +162,7 @@ describe(Support.getTestDialectTeaser('Pooling'), () => { ...@@ -163,7 +162,7 @@ describe(Support.getTestDialectTeaser('Pooling'), () => {
const firstConnection = await cm.getConnection(); const firstConnection = await cm.getConnection();
// TODO - Do we really need this call? // TODO - Do we really need this call?
unwrapAndAttachMSSQLUniqueId(firstConnection); attachMSSQLUniqueId(firstConnection);
// returning connection back to pool // returning connection back to pool
await cm.releaseConnection(firstConnection); await cm.releaseConnection(firstConnection);
......
'use strict';
const ResourceLock = require('../../../../lib/dialects/mssql/resource-lock'),
Promise = require('../../../../lib/promise'),
assert = require('assert'),
Support = require('../../support'),
dialect = Support.getTestDialect(),
delay = require('delay');
if (dialect === 'mssql') {
describe('[MSSQL Specific] ResourceLock', () => {
it('should process requests serially', () => {
const expected = {};
const lock = new ResourceLock(expected);
let last = 0;
function validateResource(actual) {
assert.equal(actual, expected);
}
return Promise.all([
Promise.using(lock.lock(), resource => {
validateResource(resource);
assert.equal(last, 0);
last = 1;
return delay(15);
}),
Promise.using(lock.lock(), resource => {
validateResource(resource);
assert.equal(last, 1);
last = 2;
}),
Promise.using(lock.lock(), resource => {
validateResource(resource);
assert.equal(last, 2);
last = 3;
return delay(5);
})
]);
});
it('should still return resource after failure', () => {
const expected = {};
const lock = new ResourceLock(expected);
function validateResource(actual) {
assert.equal(actual, expected);
}
return Promise.all([
Promise.using(lock.lock(), resource => {
validateResource(resource);
throw new Error('unexpected error');
}).catch(() => {}),
Promise.using(lock.lock(), validateResource)
]);
});
it('should be able to.lock resource without waiting on lock', () => {
const expected = {};
const lock = new ResourceLock(expected);
assert.equal(lock.unwrap(), expected);
});
});
}
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!