不要怂,就是干,撸起袖子干!

Commit 83ed07c7 by K.J. Valencik Committed by Jan Aagaard Meier

Add query queueing to MSSQL connections (#5752)

1 parent 1746048f
# Future # Future
- [FIXED] Fix defaultValues getting overwritten on build - [FIXED] Fix defaultValues getting overwritten on build
- [FIXED] Queue queries against tedious connections
# 3.21.0 # 3.21.0
- [FIXED] Confirmed that values modified in validation hooks are preserved [#3534](https://github.com/sequelize/sequelize/issues/3534) - [FIXED] Confirmed that values modified in validation hooks are preserved [#3534](https://github.com/sequelize/sequelize/issues/3534)
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
var AbstractConnectionManager = require('../abstract/connection-manager') var AbstractConnectionManager = require('../abstract/connection-manager')
, ConnectionManager , ConnectionManager
, ResourceLock = require('./resource-lock')
, Utils = require('../../utils') , Utils = require('../../utils')
, Promise = require('../../promise') , Promise = require('../../promise')
, sequelizeErrors = require('../../errors') , sequelizeErrors = require('../../errors')
...@@ -66,7 +67,7 @@ ConnectionManager.prototype.connect = function(config) { ...@@ -66,7 +67,7 @@ ConnectionManager.prototype.connect = function(config) {
connection.on('connect', function(err) { connection.on('connect', function(err) {
if (!err) { if (!err) {
resolve(connection); resolve(new ResourceLock(connection));
return; return;
} }
...@@ -119,7 +120,9 @@ ConnectionManager.prototype.connect = function(config) { ...@@ -119,7 +120,9 @@ ConnectionManager.prototype.connect = function(config) {
}); });
}; };
ConnectionManager.prototype.disconnect = function(connection) { ConnectionManager.prototype.disconnect = function(connectionLock) {
var connection = connectionLock.unwrap();
// Dont disconnect a connection that is already disconnected // Dont disconnect a connection that is already disconnected
if (!!connection.closed) { if (!!connection.closed) {
return Promise.resolve(); return Promise.resolve();
...@@ -131,7 +134,9 @@ ConnectionManager.prototype.disconnect = function(connection) { ...@@ -131,7 +134,9 @@ ConnectionManager.prototype.disconnect = function(connection) {
}); });
}; };
ConnectionManager.prototype.validate = function(connection) { ConnectionManager.prototype.validate = function(connectionLock) {
var connection = connectionLock.unwrap();
return connection && connection.loggedIn; return connection && connection.loggedIn;
}; };
......
'use strict'; 'use strict';
var Utils = require('../../utils') var Utils = require('../../utils')
, Promise = require('../../promise')
, AbstractQuery = require('../abstract/query') , AbstractQuery = require('../abstract/query')
, sequelizeErrors = require('../../errors.js') , sequelizeErrors = require('../../errors.js')
, parserStore = require('../parserStore')('mssql'), , parserStore = require('../parserStore')('mssql'),
...@@ -27,7 +28,7 @@ Query.prototype.getInsertIdField = function() { ...@@ -27,7 +28,7 @@ Query.prototype.getInsertIdField = function() {
}; };
Query.formatBindParameters = AbstractQuery.formatBindParameters; Query.formatBindParameters = AbstractQuery.formatBindParameters;
Query.prototype.run = function(sql, parameters) { Query.prototype._run = function(connection, sql, parameters) {
var self = this; var self = this;
this.sql = sql; this.sql = sql;
...@@ -37,13 +38,13 @@ Query.prototype.run = function(sql, parameters) { ...@@ -37,13 +38,13 @@ Query.prototype.run = function(sql, parameters) {
if (benchmark) { if (benchmark) {
var queryBegin = Date.now(); var queryBegin = Date.now();
} else { } else {
this.sequelize.log('Executing (' + (this.connection.uuid || 'default') + '): ' + this.sql, this.options); this.sequelize.log('Executing (' + (connection.uuid || 'default') + '): ' + this.sql, this.options);
} }
var promise = new Utils.Promise(function(resolve, reject) { var promise = new Utils.Promise(function(resolve, reject) {
// TRANSACTION SUPPORT // TRANSACTION SUPPORT
if (_.includes(self.sql, 'BEGIN TRANSACTION')) { if (_.includes(self.sql, 'BEGIN TRANSACTION')) {
self.connection.beginTransaction(function(err) { connection.beginTransaction(function(err) {
if (!!err) { if (!!err) {
reject(self.formatError(err)); reject(self.formatError(err));
} else { } else {
...@@ -51,7 +52,7 @@ Query.prototype.run = function(sql, parameters) { ...@@ -51,7 +52,7 @@ Query.prototype.run = function(sql, parameters) {
} }
} /* name, isolation_level */); } /* name, isolation_level */);
} else if (_.includes(self.sql, 'COMMIT TRANSACTION')) { } else if (_.includes(self.sql, 'COMMIT TRANSACTION')) {
self.connection.commitTransaction(function(err) { connection.commitTransaction(function(err) {
if (!!err) { if (!!err) {
reject(self.formatError(err)); reject(self.formatError(err));
} else { } else {
...@@ -59,7 +60,7 @@ Query.prototype.run = function(sql, parameters) { ...@@ -59,7 +60,7 @@ Query.prototype.run = function(sql, parameters) {
} }
}); });
} else if (_.includes(self.sql, 'ROLLBACK TRANSACTION')) { } else if (_.includes(self.sql, 'ROLLBACK TRANSACTION')) {
self.connection.rollbackTransaction(function(err) { connection.rollbackTransaction(function(err) {
if (!!err) { if (!!err) {
reject(self.formatError(err)); reject(self.formatError(err));
} else { } else {
...@@ -70,10 +71,10 @@ Query.prototype.run = function(sql, parameters) { ...@@ -70,10 +71,10 @@ Query.prototype.run = function(sql, parameters) {
// QUERY SUPPORT // QUERY SUPPORT
var results = []; var results = [];
var request = new self.connection.lib.Request(self.sql, function(err) { var request = new connection.lib.Request(self.sql, function(err) {
if (benchmark) { if (benchmark) {
self.sequelize.log('Executed (' + (self.connection.uuid || 'default') + '): ' + self.sql, (Date.now() - queryBegin), self.options); self.sequelize.log('Executed (' + (connection.uuid || 'default') + '): ' + self.sql, (Date.now() - queryBegin), self.options);
} }
if (err) { if (err) {
...@@ -100,13 +101,21 @@ Query.prototype.run = function(sql, parameters) { ...@@ -100,13 +101,21 @@ Query.prototype.run = function(sql, parameters) {
results.push(row); results.push(row);
}); });
self.connection.execSql(request); connection.execSql(request);
} }
}); });
return promise; return promise;
}; };
Query.prototype.run = function(sql, parameters) {
var self = this;
return Promise.using(this.connection.lock(), function (connection) {
return self._run(connection, sql, parameters);
});
};
/** /**
* High level function that handles the results of a query execution. * High level function that handles the results of a query execution.
* *
......
'use strict';
var Promise = require('../../promise');
function ResourceLock(resource) {
this.resource = resource;
this.previous = Promise.resolve(resource);
}
ResourceLock.prototype.unwrap = function () {
return this.resource;
};
ResourceLock.prototype.lock = function () {
var lock = this.previous;
var resolve;
this.previous = new Promise(function (r) {
resolve = r;
});
return lock.disposer(resolve);
};
module.exports = ResourceLock;
'use strict';
var chai = require('chai')
, expect = chai.expect
, Promise = require('../../../../lib/promise')
, DataTypes = require('../../../../lib/data-types')
, Support = require('../../support')
, dialect = Support.getTestDialect();
if (dialect.match(/^mssql/)) {
describe('[MSSQL Specific] Query Queue', function () {
beforeEach(function () {
var User = this.User = this.sequelize.define('User', {
username: DataTypes.STRING
});
return this.sequelize.sync({ force: true }).then(function () {
return User.create({ username: 'John'});
});
});
it('should queue concurrent requests to a connection', function() {
var User = this.User;
return expect(this.sequelize.transaction(function (t) {
return Promise.all([
User.findOne({
transaction: t
}),
User.findOne({
transaction: t
})
]);
})).not.to.be.rejected;
});
});
}
'use strict';
var ResourceLock = require('../../../../lib/dialects/mssql/resource-lock')
, Promise = require('../../../../lib/promise')
, assert = require('assert');
describe('[MSSQL Specific] ResourceLock', function () {
it('should process requests serially', function () {
var expected = {};
var lock = new ResourceLock(expected);
var last = 0;
function validateResource(actual) {
assert.equal(actual, expected);
}
return Promise.all([
Promise.using(lock.lock(), function (resource) {
validateResource(resource);
assert.equal(last, 0);
last = 1;
return Promise.delay(15);
}),
Promise.using(lock.lock(), function (resource) {
validateResource(resource);
assert.equal(last, 1);
last = 2;
}),
Promise.using(lock.lock(), function (resource) {
validateResource(resource);
assert.equal(last, 2);
last = 3;
return Promise.delay(5);
})
]);
});
it('should still return resource after failure', function () {
var expected = {};
var lock = new ResourceLock(expected);
function validateResource(actual) {
assert.equal(actual, expected);
}
return Promise.all([
Promise.using(lock.lock(), function (resource) {
validateResource(resource);
throw new Error('unexpected error');
}).catch(function () {}),
Promise.using(lock.lock(), validateResource)
]);
});
it('should be able to.lock resource without waiting on lock', function () {
var expected = {};
var lock = new ResourceLock(expected);
assert.equal(lock.unwrap(), expected);
});
});
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!