不要怂,就是干,撸起袖子干!

Commit 4294b4c8 by Mick Hansen

Merge pull request #2810 from janmeier/cls

CLS support in transactions
2 parents 64d13eaf f4b3517d
# Next # Next
- [FEATURE] CLS Support. CLS is also used to automatically pass the transaction to any calls within the callback chain when using `sequelize.transaction(function() ...`.
- [BUG] Fixed issue with paranoid deletes and `deletedAt` with a custom field. - [BUG] Fixed issue with paranoid deletes and `deletedAt` with a custom field.
- [BUG] No longer crahes on `where: []` - [BUG] No longer crahes on `where: []`
- [FEATURE] Validations are now enabled by default for upsert. - [FEATURE] Validations are now enabled by default for upsert.
......
## Transactions Sequelize supports two ways of using transactions:
Sequelize supports two ways of using transactions, one will automatically commit or rollback the transaction based on a promise chain and the other leaves it up to the user. * One which will automatically commit or rollback the transaction based on the result of a promise chain and, (if enabled) pass the transaction to all calls within the callback
* And one which leaves committing, rolling back and passing the transaction to the user.
The key difference is that the managed transaction uses a callback that expects a promise to be returned to it while the unmanaged transaction returns a promise. The key difference is that the managed transaction uses a callback that expects a promise to be returned to it while the unmanaged transaction returns a promise.
### Auto commit/rollback # Managed transaction (auto-callback)
```js ```js
return sequelize.transaction(function (t) { return sequelize.transaction(function (t) {
return User.create({ return User.create({
...@@ -14,18 +15,69 @@ return sequelize.transaction(function (t) { ...@@ -14,18 +15,69 @@ return sequelize.transaction(function (t) {
return user.setShooter({ return user.setShooter({
firstName: 'John', firstName: 'John',
lastName: 'Boothe' lastName: 'Boothe'
}, {transction: t}); }, {transaction: t});
}); });
}).then(function (result) { }).then(function (result) {
// Transaction has been committed // Transaction has been committed
// result is whatever the result of the promise chain returned to the transaction callback is // result is whatever the result of the promise chain returned to the transaction callback
}).catch(function (err) { }).catch(function (err) {
// Transaction has been rolled back // Transaction has been rolled back
// err is whatever rejected the promise chain returned to the transaction callback is // err is whatever rejected the promise chain returned to the transaction callback
});
```
In the example above, the transaction is still manually passed, by passing `{ transaction: t }` as the second argument. To automatically pass the transaction to all queries you must install the [continuation local storage](https://github.com/othiym23/node-continuation-local-storage) (CLS) module and instantiate a namespace in your own code:
```js
var cls = require('continuation-local-storage'),
namespace = cls.createNamespace('my-very-own-namespace');
```
To enable CLS you must tell sequelize which namespace to use by setting it as a property on the sequelize constructor:
```js
var Sequelize = require('sequelize');
Sequelize.cls = namespace;
new Sequelize(....);
```
Notice, that the `cls` property must be set on the *constructor*, not on an instance of sequelize. This means that all instances will share the same namespace, and that CLS is all-or-nothing - you cannot enable it only for some instances.
CLS works like a thread-local storage for callbacks. What this means in practice is, that different callback chains can access local variables by using the CLS namespace. When CLS is enabled sequelize will set the `transaction` property on the namespace when a new transaction is created. Since variables set within a callback chain are private to that chain several concurrent transactions can exist at the same time:
```js
sequelize.transaction(function (t1) {
namespace.get('transaction') === t1;
});
sequelize.transaction(function (t2) {
namespace.get('transaction') === t2;
});
```
In most case you won't need to access `namespace.get('transaction')` directly, since all queries will automatically look for a transaction on the namespace:
```js
sequelize.transaction(function (t1) {
// With CLS enabled, the user will be created inside the transaction
User.create({ name: 'Alice' });
});
```
If you want to execute queries inside the callback without using the transaction you can pass `{ transaction: null }`, or another transaction if you have several concurrent ones:
```js
sequelize.transaction(function (t1) {
sequelize.transaction(function (t2) {
// By default queries here will use t2
User.create({ name: 'Bob' }, { transaction: null });
User.create({ name: 'Mallory' }, { transaction: t1 });
});
}); });
``` ```
### Handled manually # Unmanaged transaction (then-callback)
```js ```js
return sequelize.transaction().then(function (t) { return sequelize.transaction().then(function (t) {
return User.create({ return User.create({
...@@ -44,8 +96,8 @@ return sequelize.transaction().then(function (t) { ...@@ -44,8 +96,8 @@ return sequelize.transaction().then(function (t) {
}); });
``` ```
### Using transactions with other sequelize methods # Using transactions with other sequelize methods
The `transaction` option goes with most other options, which are usually the first argument of a method. The `transaction` option goes with most other options, which are usually the first argument of a method.
For methods that take values, like `.create`, `.update()`, `.updateAttributes()` and more `transaction` should be passed to the option in the second argument. For methods that take values, like `.create`, `.update()`, `.updateAttributes()` etc. `transaction` should be passed to the option in the second argument.
If unsure, refer to the api documentation for the method you are using to be sure of the signature. If unsure, refer to the API documentation for the method you are using to be sure of the signature.
...@@ -185,7 +185,6 @@ ConnectionManager.prototype.initPools = function () { ...@@ -185,7 +185,6 @@ ConnectionManager.prototype.initPools = function () {
idleTimeoutMillis: config.pool.idle idleTimeoutMillis: config.pool.idle
}); });
} }
}; };
ConnectionManager.prototype.getConnection = function(options) { ConnectionManager.prototype.getConnection = function(options) {
......
...@@ -40,7 +40,6 @@ module.exports = (function() { ...@@ -40,7 +40,6 @@ module.exports = (function() {
resolve(self.formatResults(results)); resolve(self.formatResults(results));
} }
}).setMaxListeners(100); }).setMaxListeners(100);
}); });
return promise; return promise;
......
...@@ -536,7 +536,7 @@ module.exports = (function() { ...@@ -536,7 +536,7 @@ module.exports = (function() {
}); });
options.fields = _.unique(options.fields.concat(hookChanged)); options.fields = _.unique(options.fields.concat(hookChanged));
} }
if (hookChanged) { if (hookChanged) {
return Promise.bind(this).then(function() { return Promise.bind(this).then(function() {
...@@ -815,7 +815,7 @@ module.exports = (function() { ...@@ -815,7 +815,7 @@ module.exports = (function() {
* @param {String|Array|Object} fields If a string is provided, that column is incremented by the value of `by` given in options. If an array is provided, the same is true for each column. If and object is provided, each column is incremented by the value given * @param {String|Array|Object} fields If a string is provided, that column is incremented by the value of `by` given in options. If an array is provided, the same is true for each column. If and object is provided, each column is incremented by the value given
* @param {Object} [options] * @param {Object} [options]
* @param {Integer} [options.by=1] The number to increment by * @param {Integer} [options.by=1] The number to increment by
* @param {Transaction} [options.transaction=null] * @param {Transaction} [options.transaction]
* *
* @return {Promise} * @return {Promise}
*/ */
...@@ -832,9 +832,9 @@ module.exports = (function() { ...@@ -832,9 +832,9 @@ module.exports = (function() {
, where; , where;
if (countOrOptions === undefined) { if (countOrOptions === undefined) {
countOrOptions = { by: 1, transaction: null }; countOrOptions = { by: 1 };
} else if (typeof countOrOptions === 'number') { } else if (typeof countOrOptions === 'number') {
countOrOptions = { by: countOrOptions, transaction: null }; countOrOptions = { by: countOrOptions };
} }
countOrOptions = Utils._.extend({ countOrOptions = Utils._.extend({
...@@ -880,7 +880,7 @@ module.exports = (function() { ...@@ -880,7 +880,7 @@ module.exports = (function() {
* @param {String|Array|Object} fields If a string is provided, that column is decremented by the value of `by` given in options. If an array is provided, the same is true for each column. If and object is provided, each column is decremented by the value given * @param {String|Array|Object} fields If a string is provided, that column is decremented by the value of `by` given in options. If an array is provided, the same is true for each column. If and object is provided, each column is decremented by the value given
* @param {Object} [options] * @param {Object} [options]
* @param {Integer} [options.by=1] The number to decrement by * @param {Integer} [options.by=1] The number to decrement by
* @param {Transaction} [options.transaction=null] * @param {Transaction} [options.transaction]
* *
* @return {Promise} * @return {Promise}
*/ */
...@@ -892,9 +892,9 @@ module.exports = (function() { ...@@ -892,9 +892,9 @@ module.exports = (function() {
}); });
if (countOrOptions === undefined) { if (countOrOptions === undefined) {
countOrOptions = { by: 1, transaction: null }; countOrOptions = { by: 1 };
} else if (typeof countOrOptions === 'number') { } else if (typeof countOrOptions === 'number') {
countOrOptions = { by: countOrOptions, transaction: null }; countOrOptions = { by: countOrOptions };
} }
if (countOrOptions.by === undefined) { if (countOrOptions.by === undefined) {
......
...@@ -677,7 +677,6 @@ module.exports = (function() { ...@@ -677,7 +677,6 @@ module.exports = (function() {
, tableNames = { }; , tableNames = { };
tableNames[this.getTableName(options)] = true; tableNames[this.getTableName(options)] = true;
options = optClone(options || {}); options = optClone(options || {});
options = Utils._.defaults(options, { options = Utils._.defaults(options, {
hooks: true hooks: true
...@@ -1046,9 +1045,7 @@ module.exports = (function() { ...@@ -1046,9 +1045,7 @@ module.exports = (function() {
options = { fields: options }; options = { fields: options };
} }
options = Utils._.extend({ options = options || {};
transaction: null
}, options || {});
return this.build(values, { return this.build(values, {
isNewRecord: true, isNewRecord: true,
......
...@@ -39,6 +39,9 @@ var SequelizePromise = function (resolver) { ...@@ -39,6 +39,9 @@ var SequelizePromise = function (resolver) {
return promise; return promise;
}; };
var util = require('util');
util.inherits(SequelizePromise, Promise);
for (var method in Promise) { for (var method in Promise) {
if (Promise.hasOwnProperty(method)) { if (Promise.hasOwnProperty(method)) {
SequelizePromise[method] = Promise[method]; SequelizePromise[method] = Promise[method];
...@@ -47,6 +50,13 @@ for (var method in Promise) { ...@@ -47,6 +50,13 @@ for (var method in Promise) {
var bluebirdThen = Promise.prototype._then; var bluebirdThen = Promise.prototype._then;
Promise.prototype._then = function (didFulfill, didReject, didProgress, receiver, internalData) { Promise.prototype._then = function (didFulfill, didReject, didProgress, receiver, internalData) {
if (SequelizePromise.Sequelize.cls) {
var ns = SequelizePromise.Sequelize.cls;
if (typeof didFulfill === 'function') didFulfill = ns.bind(didFulfill);
if (typeof didReject === 'function') didReject = ns.bind(didReject);
if (typeof didProgress === 'function') didProgress = ns.bind(didProgress);
}
var ret = bluebirdThen.call(this, didFulfill, didReject, didProgress, receiver, internalData); var ret = bluebirdThen.call(this, didFulfill, didReject, didProgress, receiver, internalData);
// Needed to transfer sql events accross .then() calls // Needed to transfer sql events accross .then() calls
...@@ -60,7 +70,6 @@ Promise.prototype._then = function (didFulfill, didReject, didProgress, receiver ...@@ -60,7 +70,6 @@ Promise.prototype._then = function (didFulfill, didReject, didProgress, receiver
return ret; return ret;
}; };
var bluebirdSettle = Promise.prototype._settlePromiseAt; var bluebirdSettle = Promise.prototype._settlePromiseAt;
Promise.prototype._settlePromiseAt = function (index) { Promise.prototype._settlePromiseAt = function (index) {
bluebirdSettle.call(this, index); bluebirdSettle.call(this, index);
......
...@@ -46,7 +46,6 @@ module.exports = (function() { ...@@ -46,7 +46,6 @@ module.exports = (function() {
var self = this; var self = this;
options = Utils._.extend({ options = Utils._.extend({
transaction: null,
raw: true raw: true
}, options || {}); }, options || {});
...@@ -290,7 +289,6 @@ module.exports = (function() { ...@@ -290,7 +289,6 @@ module.exports = (function() {
QueryInterface.prototype.showAllTables = function(options) { QueryInterface.prototype.showAllTables = function(options) {
var self = this; var self = this;
options = Utils._.extend({ options = Utils._.extend({
transaction: null,
raw: true, raw: true,
type: QueryTypes.SHOWTABLES type: QueryTypes.SHOWTABLES
}, options || {}); }, options || {});
......
...@@ -631,7 +631,7 @@ module.exports = (function() { ...@@ -631,7 +631,7 @@ module.exports = (function() {
* @param {Instance} [callee] If callee is provided, the returned data will be put into the callee * @param {Instance} [callee] If callee is provided, the returned data will be put into the callee
* @param {Object} [options={}] Query options. * @param {Object} [options={}] Query options.
* @param {Boolean} [options.raw] If true, sequelize will not try to format the results of the query, or build an instance of a model from the result * @param {Boolean} [options.raw] If true, sequelize will not try to format the results of the query, or build an instance of a model from the result
* @param {Transaction} [options.transaction=null] The transaction that the query should be executed under * @param {Transaction} [options.transaction] The transaction that the query should be executed under
* @param {String} [options.type='SELECT'] The type of query you are executing. The query type affects how results are formatted before they are passed back. If no type is provided sequelize will try to guess the right type based on the sql, and fall back to SELECT. The type is a string, but `Sequelize.QueryTypes` is provided is convenience shortcuts. Current options are SELECT, BULKUPDATE and BULKDELETE * @param {String} [options.type='SELECT'] The type of query you are executing. The query type affects how results are formatted before they are passed back. If no type is provided sequelize will try to guess the right type based on the sql, and fall back to SELECT. The type is a string, but `Sequelize.QueryTypes` is provided is convenience shortcuts. Current options are SELECT, BULKUPDATE and BULKDELETE
* @param {Boolean} [options.nest=false] If true, transforms objects with `.` separated property names into nested objects using [dottie.js](https://github.com/mickhansen/dottie.js). For example { 'user.username': 'john' } becomes { user: { username: 'john' }} * @param {Boolean} [options.nest=false] If true, transforms objects with `.` separated property names into nested objects using [dottie.js](https://github.com/mickhansen/dottie.js). For example { 'user.username': 'john' } becomes { user: { username: 'john' }}
* @param {Object|Array} [replacements] Either an object of named parameter replacements in the format `:param` or an array of unnamed replacements to replace `?` in your SQL. * @param {Object|Array} [replacements] Either an object of named parameter replacements in the format `:param` or an array of unnamed replacements to replace `?` in your SQL.
...@@ -663,6 +663,10 @@ module.exports = (function() { ...@@ -663,6 +663,10 @@ module.exports = (function() {
type: (sql.toLowerCase().indexOf('select') === 0) ? QueryTypes.SELECT : false type: (sql.toLowerCase().indexOf('select') === 0) ? QueryTypes.SELECT : false
}); });
if (options.transaction === undefined && Sequelize.cls) {
options.transaction = Sequelize.cls.get('transaction');
}
if (options.transaction && options.transaction.finished) { if (options.transaction && options.transaction.finished) {
return Promise.reject(options.transaction.finished+' has been called on this transaction, you can no longer use it'); return Promise.reject(options.transaction.finished+' has been called on this transaction, you can no longer use it');
} }
...@@ -796,7 +800,6 @@ module.exports = (function() { ...@@ -796,7 +800,6 @@ module.exports = (function() {
return Promise.reject('Database does not match sync match parameter'); return Promise.reject('Database does not match sync match parameter');
} }
} }
var when; var when;
if (options.force) { if (options.force) {
when = this.drop(options); when = this.drop(options);
...@@ -1020,6 +1023,17 @@ module.exports = (function() { ...@@ -1020,6 +1023,17 @@ module.exports = (function() {
* }); * });
* ``` * ```
* *
* If you have [CLS](https://github.com/othiym23/node-continuation-local-storage) enabled, the transaction will automatically be passed to any query that runs witin the callback.
* To enable CLS, add it do your project, create a namespace and set it on the sequelize constructor:
*
* ```js
* var cls = require('continuation-local-storage'),
* ns = cls.createNamespace('....');
* var Sequelize = require('sequelize');
* Sequelize.cls = ns;
* ```
* Note, that CLS is enabled for all sequelize instances, and all instances will share the same namespace
*
* @see {Transaction} * @see {Transaction}
* @param {Object} [options={}] * @param {Object} [options={}]
...@@ -1035,12 +1049,18 @@ module.exports = (function() { ...@@ -1035,12 +1049,18 @@ module.exports = (function() {
options = undefined; options = undefined;
} }
var transaction = new Transaction(this, options); var transaction = new Transaction(this, options)
, ns = Sequelize.cls;
if (autoCallback) { if (autoCallback) {
deprecated('Note: When passing a callback to a transaction a promise chain is expected in return, the transaction will be committed or rejected based on the promise chain returned to the callback.'); deprecated('Note: When passing a callback to a transaction a promise chain is expected in return, the transaction will be committed or rejected based on the promise chain returned to the callback.');
return new Promise(function (resolve, reject) {
var transactionResolver = function (resolve, reject) {
transaction.prepareEnvironment().then(function () { transaction.prepareEnvironment().then(function () {
if (ns) {
autoCallback = ns.bind(autoCallback);
}
var result = autoCallback(transaction); var result = autoCallback(transaction);
if (!result) return reject(new Error('You need to return a promise chain to the sequelize.transaction() callback')); if (!result) return reject(new Error('You need to return a promise chain to the sequelize.transaction() callback'));
...@@ -1056,7 +1076,13 @@ module.exports = (function() { ...@@ -1056,7 +1076,13 @@ module.exports = (function() {
}); });
}); });
}).catch(reject); }).catch(reject);
}); };
if (ns) {
transactionResolver = ns.bind(transactionResolver, ns.createContext());
}
return new Promise(transactionResolver);
} else { } else {
return transaction.prepareEnvironment().return(transaction); return transaction.prepareEnvironment().return(transaction);
} }
...@@ -1096,5 +1122,7 @@ module.exports = (function() { ...@@ -1096,5 +1122,7 @@ module.exports = (function() {
this.connectionManager.close(); this.connectionManager.close();
}; };
// Allows the promise to access cls namespaces
Promise.Sequelize = Sequelize;
return Sequelize; return Sequelize;
})(); })();
...@@ -118,6 +118,10 @@ Transaction.prototype.prepareEnvironment = function() { ...@@ -118,6 +118,10 @@ Transaction.prototype.prepareEnvironment = function() {
).then(function (connection) { ).then(function (connection) {
self.connection = connection; self.connection = connection;
self.connection.uuid = self.id; self.connection.uuid = self.id;
if (self.sequelize.constructor.cls) {
self.sequelize.constructor.cls.set('transaction', self);
}
}).then(function () { }).then(function () {
return self.begin(); return self.begin();
}).then(function () { }).then(function () {
...@@ -150,5 +154,6 @@ Transaction.prototype.setIsolationLevel = function() { ...@@ -150,5 +154,6 @@ Transaction.prototype.setIsolationLevel = function() {
Transaction.prototype.cleanup = function() { Transaction.prototype.cleanup = function() {
this.connection.uuid = undefined; this.connection.uuid = undefined;
return this.sequelize.connectionManager.releaseConnection(this.connection); return this.sequelize.connectionManager.releaseConnection(this.connection);
}; };
...@@ -43,6 +43,7 @@ ...@@ -43,6 +43,7 @@
"validator": "~3.22.1" "validator": "~3.22.1"
}, },
"devDependencies": { "devDependencies": {
"continuation-local-storage": "3.1.2",
"chai-as-promised": "^4.1.1", "chai-as-promised": "^4.1.1",
"sqlite3": "~3.0.0", "sqlite3": "~3.0.0",
"mysql": "~2.5.0", "mysql": "~2.5.0",
......
"use strict";
/* jshint camelcase: false */
var chai = require('chai')
, sinon = require('sinon')
, expect = chai.expect
, Support = require(__dirname + '/support')
, Sequelize = Support.Sequelize
, Promise = Sequelize.Promise
, cls = require('continuation-local-storage')
, current = Support.sequelize;
chai.config.includeStack = true;
if (current.dialect.supports.transactions) {
describe(Support.getTestDialectTeaser("Continuation local storage"), function () {
before(function () {
Sequelize.cls = cls.createNamespace('sequelize');
});
after(function () {
delete Sequelize.cls;
});
beforeEach(function () {
return Support.prepareTransactionTest(this.sequelize).bind(this).then(function (sequelize) {
this.sequelize = sequelize;
this.ns = cls.getNamespace('sequelize');
this.User = this.sequelize.define('user', {
name: Sequelize.STRING
});
return this.sequelize.sync({ force: true });
});
});
describe('context', function () {
it('supports several concurrent transactions', function () {
var t1id, t2id, self = this;
return Promise.join(
this.sequelize.transaction(function () {
t1id = self.ns.get('transaction').id;
return Promise.resolve();
}),
this.sequelize.transaction(function () {
t2id = self.ns.get('transaction').id;
return Promise.resolve();
}),
function () {
expect(t1id).to.be.ok;
expect(t2id).to.be.ok;
expect(t1id).not.to.equal(t2id);
}
);
});
it('supports nested promise chains', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().then(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('does not leak variables to the outer scope', function () {
// This is a little tricky. We want to check the values in the outer scope, when the transaction has been successfully set up, but before it has been comitted.
// We can't just call another function from inside that transaction, since that would transfer the context to that function - exactly what we are trying to prevent;
var self = this
, transactionSetup = false
, transactionEnded = false;
this.sequelize.transaction(function () {
transactionSetup = true;
return Promise.delay(500).then(function () {
expect(self.ns.get('transaction')).to.be.ok;
transactionEnded = true;
});
});
return new Promise(function (resolve) {
// Wait for the transaction to be setup
var interval = setInterval(function () {
if (transactionSetup) {
clearInterval(interval);
resolve();
}
}, 200);
}).bind(this).then(function () {
expect(transactionEnded).not.to.be.ok;
expect(this.ns.get('transaction')).not.to.be.ok;
// Just to make sure it didn't change between our last check and the assertion
expect(transactionEnded).not.to.be.ok;
});
});
it('does not leak variables to the following promise chain', function () {
return this.sequelize.transaction(function () {
return Promise.resolve();
}).bind(this).then(function () {
expect(this.ns.get('transaction')).not.to.be.ok;
});
});
});
describe('sequelize.query integration', function () {
it('automagically uses the transaction in all calls', function () {
var self = this;
return this.sequelize.transaction(function () {
return self.User.create({ name: 'bob' }).then(function () {
return Promise.all([
expect(self.User.findAll({}, { transaction: null })).to.eventually.have.length(0),
expect(self.User.findAll({})).to.eventually.have.length(1)
]);
});
});
});
});
});
}
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!