不要怂,就是干,撸起袖子干!

Commit d1ecd972 by Jan Aagaard Meier Committed by GitHub

Merge pull request #6556 from overlookmotel/cls-bluebird

CLS support with `cls-bluebird` module
2 parents b97bfaa9 edf393ff
......@@ -2,6 +2,7 @@
- [ADDED] include now supports string as an argument (on top of model/association), string will expand into an association matched literally from Model.associations
- [FIXED] Accept dates as string while using `typeValidation` [#6453](https://github.com/sequelize/sequelize/issues/6453)
- [FIXED] - ORDER clause was not included in subquery if `order` option value was provided as plain string (not as an array value)
- [FIXED] support for CLS with `cls-bluebird` module
# 4.0.0-1
- [CHANGED] Removed `modelManager` parameter from `Model.init()` [#6437](https://github.com/sequelize/sequelize/issues/6437)
......
......@@ -59,18 +59,18 @@ var cls = require('continuation-local-storage'),
namespace = cls.createNamespace('my-very-own-namespace');
```
To enable CLS you must tell sequelize which namespace to use by setting it as a property on the sequelize constructor:
To enable CLS you must tell sequelize which namespace to use by using a static method of the sequelize constructor:
```js
var Sequelize = require('sequelize');
Sequelize.cls = namespace;
Sequelize.useCLS(namespace);
new Sequelize(....);
```
Notice, that the `cls` property must be set on the *constructor*, not on an instance of sequelize. This means that all instances will share the same namespace, and that CLS is all-or-nothing - you cannot enable it only for some instances.
Notice, that the `useCLS()` method is on the *constructor*, not on an instance of sequelize. This means that all instances will share the same namespace, and that CLS is all-or-nothing - you cannot enable it only for some instances.
CLS works like a thread-local storage for callbacks. What this means in practice is, that different callback chains can access local variables by using the CLS namespace. When CLS is enabled sequelize will set the `transaction` property on the namespace when a new transaction is created. Since variables set within a callback chain are private to that chain several concurrent transactions can exist at the same time:
CLS works like a thread-local storage for callbacks. What this means in practice is that different callback chains can access local variables by using the CLS namespace. When CLS is enabled sequelize will set the `transaction` property on the namespace when a new transaction is created. Since variables set within a callback chain are private to that chain several concurrent transactions can exist at the same time:
```js
sequelize.transaction(function (t1) {
......@@ -91,6 +91,8 @@ sequelize.transaction(function (t1) {
});
```
After you've used `Sequelize.useCLS()` all promises returned from sequelize will be patched to maintain CLS context. CLS is a complicated subject - more details in the docs for [cls-bluebird](https://www.npmjs.com/package/cls-bluebird), the patch used to make bluebird promises work with CLS.
# Concurrent/Partial transactions
You can have concurrent transactions within a sequence of queries or have some of them excluded from any transactions. Use the `{transaction: }` option to control which transaction a query belong to:
......
......@@ -684,7 +684,7 @@ class Model {
if (globalOptions.define) {
options = Utils.merge(globalOptions.define, options);
}
if (!options.modelName) {
options.modelName = this.name;
}
......@@ -1952,8 +1952,8 @@ class Model {
options = _.assign({}, options);
if (options.transaction === undefined && this.sequelize.constructor.cls) {
const t = this.sequelize.constructor.cls.get('transaction');
if (options.transaction === undefined && this.sequelize.constructor._cls) {
const t = this.sequelize.constructor._cls.get('transaction');
if (t) {
options.transaction = t;
}
......
'use strict';
const Promise = require('bluebird').getNewLibraryCopy();
const shimmer = require('shimmer');
// functionName: The Promise function that should be shimmed
// fnArgs: The arguments index that should be CLS enabled (typically all callbacks). Offset from last if negative
function shimCLS(object, functionName, fnArgs){
shimmer.wrap(object, functionName, fn => {
return function() {
if (Promise.Sequelize && Promise.Sequelize.cls) {
const ns = Promise.Sequelize.cls;
for(let x = 0; x < fnArgs.length; x++) {
const argIndex = fnArgs[x] < 0 ? arguments.length + fnArgs[x] : fnArgs[x];
if ( argIndex < arguments.length && typeof arguments[argIndex] === 'function' ) {
arguments[argIndex] = ns.bind( arguments[argIndex] );
}
}
}
return fn.apply(this, arguments);
};
});
}
// Core
shimCLS(Promise, 'join', [-1]);
shimCLS(Promise.prototype, 'then', [0, 1, 2]);
shimCLS(Promise.prototype, 'spread', [0, 1]);
shimCLS(Promise.prototype, 'catch', [-1]);
shimCLS(Promise.prototype, 'error', [0]);
shimCLS(Promise.prototype, 'finally', [0]);
// Collections
shimCLS(Promise, 'map', [1]);
shimCLS(Promise, 'mapSeries', [1]);
shimCLS(Promise, 'reduce', [1]);
shimCLS(Promise, 'filter', [1]);
shimCLS(Promise, 'each', [1]);
shimCLS(Promise.prototype, 'map', [0]);
shimCLS(Promise.prototype, 'mapSeries', [0]);
shimCLS(Promise.prototype, 'reduce', [0]);
shimCLS(Promise.prototype, 'filter', [0]);
shimCLS(Promise.prototype, 'each', [0]);
// Promisification
shimCLS(Promise.prototype, 'nodeify', [0]);
// Utility
shimCLS(Promise.prototype, 'tap', [0]);
// Error management configuration
shimCLS(Promise.prototype, 'done', [0, 1]);
module.exports = Promise;
module.exports.Promise = Promise;
......
......@@ -3,6 +3,7 @@
const url = require('url');
const Path = require('path');
const retry = require('retry-as-promised');
const clsBluebird = require('cls-bluebird');
const Utils = require('./utils');
const Model = require('./model');
const DataTypes = require('./data-types');
......@@ -522,8 +523,8 @@ class Sequelize {
searchPath: this.options.hasOwnProperty('searchPath') ? this.options.searchPath : 'DEFAULT'
});
if (options.transaction === undefined && Sequelize.cls) {
options.transaction = Sequelize.cls.get('transaction');
if (options.transaction === undefined && Sequelize._cls) {
options.transaction = Sequelize._cls.get('transaction');
}
if (!options.type) {
......@@ -952,7 +953,7 @@ class Sequelize {
* const cls = require('continuation-local-storage');
* const ns = cls.createNamespace('....');
* const Sequelize = require('sequelize');
* Sequelize.cls = ns;
* Sequelize.useCLS(ns);
* ```
* Note, that CLS is enabled for all sequelize instances, and all instances will share the same namespace
*
......@@ -974,41 +975,76 @@ class Sequelize {
// testhint argsConform.end
const transaction = new Transaction(this, options);
const ns = Sequelize.cls;
if (autoCallback) {
let transactionResolver = (resolve, reject) => {
transaction.prepareEnvironment().then(() => {
if (ns) {
autoCallback = ns.bind(autoCallback);
}
const returnValue = autoCallback(transaction);
if (!returnValue || !returnValue.then) throw new Error('You need to return a promise chain/thenable to the sequelize.transaction() callback');
return returnValue.then(result => transaction.commit()).then(() => {
resolve(returnValue);
});
}).catch(err => {
// If the transaction has already finished (commit, rollback, etc), reject with the original error
if (transaction.finished) {
reject(err);
} else {
return transaction.rollback().finally(() => {
reject(err);
});
}
if (!autoCallback) return transaction.prepareEnvironment().return(transaction);
// autoCallback provided
return Sequelize._clsRun(() => {
return transaction.prepareEnvironment()
.then(() => autoCallback(transaction))
.tap(() => transaction.commit())
.catch(err => {
// Rollback transaction if not already finished (commit, rollback, etc)
// and reject with original error (ignore any error in rollback)
return Promise.try(() => {
if (!transaction.finished) return transaction.rollback().catch(function() {});
}).throw(err);
});
};
});
}
if (ns) {
transactionResolver = ns.bind(transactionResolver, ns.createContext());
}
/**
* Use CLS with Sequelize.
* CLS namespace provided is stored as `Sequelize._cls`
* and bluebird Promise is patched to use the namespace, using `cls-bluebird` module.
*
* @param {Object} ns CLS namespace
* @returns {Object} Sequelize constructor
*/
static useCLS(ns) {
// check `ns` is valid CLS namespace
if (!ns || typeof ns !== 'object' || typeof ns.bind !== 'function' || typeof ns.run !== 'function') throw new Error('Must provide CLS namespace');
return new Promise(transactionResolver);
} else {
return transaction.prepareEnvironment().return(transaction);
}
// save namespace as `Sequelize._cls`
this._cls = ns;
// patch bluebird to bind all promise callbacks to CLS namespace
clsBluebird(ns, Promise);
// return Sequelize for chaining
return this;
}
/**
* Run function in CLS context.
* If no CLS context in use, just runs the function normally
*
* @private
* @param {Function} fn Function to run
* @returns {*} Return value of function
*/
static _clsRun(fn) {
var ns = Sequelize._cls;
if (!ns) return fn();
var res;
ns.run((context) => res = fn(context));
return res;
}
/*
* Getter/setter for `Sequelize.cls`
* To maintain backward compatibility with Sequelize v3.x
* Calling the
*/
static get cls() {
Utils.deprecate('Sequelize.cls is deprecated and will be removed in a future version. Keep track of the CLS namespace you use in your own code.');
return this._cls;
}
static set cls(ns) {
Utils.deprecate('Sequelize.cls should not be set directly. Use Sequelize.useCLS().');
this.useCLS(ns);
}
log() {
......@@ -1314,7 +1350,6 @@ Sequelize.prototype.InstanceError = Sequelize.InstanceError =
Sequelize.prototype.EmptyResultError = Sequelize.EmptyResultError =
sequelizeErrors.EmptyResultError;
// Allows the promise to access cls namespaces
module.exports = Promise.Sequelize = Sequelize;
module.exports = Sequelize;
module.exports.Sequelize = Sequelize;
module.exports.default = Sequelize;
......@@ -111,8 +111,8 @@ class Transaction {
throw setupErr;
}))
.tap(() => {
if (this.sequelize.constructor.cls) {
this.sequelize.constructor.cls.set('transaction', this);
if (this.sequelize.constructor._cls) {
this.sequelize.constructor._cls.set('transaction', this);
}
return null;
});
......@@ -155,7 +155,7 @@ class Transaction {
}
_clearCls() {
const cls = this.sequelize.constructor.cls;
const cls = this.sequelize.constructor._cls;
if (cls) {
if (cls.get('transaction') === this) {
......
......@@ -37,6 +37,7 @@
},
"dependencies": {
"bluebird": "^3.4.6",
"cls-bluebird": "^2.0.1",
"debug": "^2.2.0",
"depd": "^1.1.0",
"dottie": "^1.0.0",
......@@ -48,7 +49,6 @@
"node-uuid": "~1.4.4",
"retry-as-promised": "^2.0.0",
"semver": "^5.0.1",
"shimmer": "1.1.0",
"terraformer-wkt-parser": "^1.1.2",
"toposort-class": "^1.0.1",
"validator": "^5.6.0",
......
......@@ -13,11 +13,12 @@ var chai = require('chai')
if (current.dialect.supports.transactions) {
describe(Support.getTestDialectTeaser('Continuation local storage'), function () {
before(function () {
Sequelize.cls = cls.createNamespace('sequelize');
this.thenOriginal = Promise.prototype.then;
Sequelize.useCLS(cls.createNamespace('sequelize'));
});
after(function () {
delete Sequelize.cls;
delete Sequelize._cls;
});
beforeEach(function () {
......@@ -144,270 +145,14 @@ if (current.dialect.supports.transactions) {
});
});
describe('bluebird shims', function () {
beforeEach(function () {
// Make sure we have some data so the each, map, filter, ... actually run and validate asserts
return this.sequelize.Promise.all([this.User.create({ name: 'bob' }), this.User.create({ name: 'joe' })]);
});
it('join', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.join(self.User.findAll(), function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('then fulfilled', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().then(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('then rejected', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.reject(new Error('test rejection handler')).then(null,function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('spread', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().spread(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
},function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('catch', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.try(function () {
throw new Error('To test catch');
}).catch(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('error', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.try(function () {
throw new self.sequelize.Promise.OperationalError('To test catch');
}).error(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('finally', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().finally( function(){
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('map', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().map(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('static map', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.map(self.User.findAll(), function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('mapSeries', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().mapSeries(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('static mapSeries', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
// In order to execute promises serially with mapSeries we must wrap them as functions
return self.sequelize.Promise.mapSeries([
()=> self.User.findAll().then(()=> expect(self.ns.get('transaction').id).to.be.ok),
()=> self.User.findAll().then(()=> expect(self.ns.get('transaction').id).to.equal(tid))
], runPromise => runPromise()
);
});
});
it('reduce', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().reduce(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('static reduce', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.reduce(self.User.findAll(), function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('filter', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().filter(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('static filter', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.filter(self.User.findAll(), function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('each', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().each(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('static each', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.each(self.User.findAll(), function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('nodeify', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().nodeify(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('tap', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().tap(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('done fulfilled', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return new Promise(function (resolve, reject) {
self.User.findAll().done(function () {
try {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
resolve();
} catch (err) {
reject(err);
}
}, function (err) {
reject(err);
});
});
});
});
it('done rejected', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return new Promise(function (resolve, reject) {
Promise.reject(new Error('test rejection handler')).done(function () {
reject(new Error('Should not have called first done handler'));
}, function (err) {
try {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
resolve();
} catch (err) {
reject(err);
}
});
});
});
});
it('bluebird patch is applied', function() {
expect(Promise.prototype.then).to.be.a('function');
expect(this.thenOriginal).to.be.a('function');
expect(Promise.prototype.then).not.to.equal(this.thenOriginal);
});
it('CLS namespace is stored in Sequelize._cls', function() {
expect(Sequelize._cls).to.equal(this.ns);
});
});
}
......@@ -72,15 +72,6 @@ describe(Support.getTestDialectTeaser('Transaction'), function() {
});
});
it('errors when no promise chain is returned', function() {
var t;
return (expect(this.sequelize.transaction(function(transaction) {
t = transaction;
})).to.eventually.be.rejected).then(function() {
expect(t.finished).to.be.equal('rollback');
});
});
if (dialect === 'postgres' || dialect === 'mssql') {
it('do not rollback if already committed', function() {
var SumSumSum = this.sequelize.define('transaction', {
......
......@@ -15,11 +15,11 @@ describe(Support.getTestDialectTeaser('Model'), function() {
describe('method findOrCreate', function () {
before(function () {
current.constructor.cls = cls.createNamespace('sequelize');
current.constructor.useCLS(cls.createNamespace('sequelize'));
});
after(function () {
delete current.constructor.cls;
delete current.constructor._cls;
});
beforeEach(function () {
......@@ -30,7 +30,7 @@ describe(Support.getTestDialectTeaser('Model'), function() {
this.transactionStub = stub(this.User.sequelize, 'transaction');
this.transactionStub.returns(new Promise(function () {}));
this.clsStub = stub(current.constructor.cls, 'get');
this.clsStub = stub(current.constructor._cls, 'get');
this.clsStub.returns({ id: 123 });
});
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!