不要怂,就是干,撸起袖子干!

Commit 000b28c7 by overlookmotel

CLS support with `cls-bluebird` module

1 parent acce21d4
......@@ -2,6 +2,7 @@
- [ADDED] include now supports string as an argument (on top of model/association), string will expand into an association matched literally from Model.associations
- [FIXED] Accept dates as string while using `typeValidation` [#6453](https://github.com/sequelize/sequelize/issues/6453)
- [FIXED] - ORDER clause was not included in subquery if `order` option value was provided as plain string (not as an array value)
- [FIXED] support for CLS with `cls-bluebird` module
# 4.0.0-1
- [CHANGED] Removed `modelManager` parameter from `Model.init()` [#6437](https://github.com/sequelize/sequelize/issues/6437)
......
......@@ -1952,8 +1952,8 @@ class Model {
options = _.assign({}, options);
if (options.transaction === undefined && this.sequelize.constructor.cls) {
const t = this.sequelize.constructor.cls.get('transaction');
if (options.transaction === undefined && this.sequelize.constructor._cls) {
const t = this.sequelize.constructor._cls.get('transaction');
if (t) {
options.transaction = t;
}
......
'use strict';
const Promise = require('bluebird').getNewLibraryCopy();
const shimmer = require('shimmer');
// functionName: The Promise function that should be shimmed
// fnArgs: The arguments index that should be CLS enabled (typically all callbacks). Offset from last if negative
function shimCLS(object, functionName, fnArgs){
shimmer.wrap(object, functionName, fn => {
return function() {
if (Promise.Sequelize && Promise.Sequelize.cls) {
const ns = Promise.Sequelize.cls;
for(let x = 0; x < fnArgs.length; x++) {
const argIndex = fnArgs[x] < 0 ? arguments.length + fnArgs[x] : fnArgs[x];
if ( argIndex < arguments.length && typeof arguments[argIndex] === 'function' ) {
arguments[argIndex] = ns.bind( arguments[argIndex] );
}
}
}
return fn.apply(this, arguments);
};
});
}
// Core
shimCLS(Promise, 'join', [-1]);
shimCLS(Promise.prototype, 'then', [0, 1, 2]);
shimCLS(Promise.prototype, 'spread', [0, 1]);
shimCLS(Promise.prototype, 'catch', [-1]);
shimCLS(Promise.prototype, 'error', [0]);
shimCLS(Promise.prototype, 'finally', [0]);
// Collections
shimCLS(Promise, 'map', [1]);
shimCLS(Promise, 'mapSeries', [1]);
shimCLS(Promise, 'reduce', [1]);
shimCLS(Promise, 'filter', [1]);
shimCLS(Promise, 'each', [1]);
shimCLS(Promise.prototype, 'map', [0]);
shimCLS(Promise.prototype, 'mapSeries', [0]);
shimCLS(Promise.prototype, 'reduce', [0]);
shimCLS(Promise.prototype, 'filter', [0]);
shimCLS(Promise.prototype, 'each', [0]);
// Promisification
shimCLS(Promise.prototype, 'nodeify', [0]);
// Utility
shimCLS(Promise.prototype, 'tap', [0]);
// Error management configuration
shimCLS(Promise.prototype, 'done', [0, 1]);
module.exports = Promise;
module.exports.Promise = Promise;
......
......@@ -3,6 +3,7 @@
const url = require('url');
const Path = require('path');
const retry = require('retry-as-promised');
const clsBluebird = require('cls-bluebird');
const Utils = require('./utils');
const Model = require('./model');
const DataTypes = require('./data-types');
......@@ -522,8 +523,8 @@ class Sequelize {
searchPath: this.options.hasOwnProperty('searchPath') ? this.options.searchPath : 'DEFAULT'
});
if (options.transaction === undefined && Sequelize.cls) {
options.transaction = Sequelize.cls.get('transaction');
if (options.transaction === undefined && Sequelize._cls) {
options.transaction = Sequelize._cls.get('transaction');
}
if (!options.type) {
......@@ -952,7 +953,7 @@ class Sequelize {
* const cls = require('continuation-local-storage');
* const ns = cls.createNamespace('....');
* const Sequelize = require('sequelize');
* Sequelize.cls = ns;
* Sequelize.useCls(ns);
* ```
* Note, that CLS is enabled for all sequelize instances, and all instances will share the same namespace
*
......@@ -974,41 +975,61 @@ class Sequelize {
// testhint argsConform.end
const transaction = new Transaction(this, options);
const ns = Sequelize.cls;
if (autoCallback) {
let transactionResolver = (resolve, reject) => {
transaction.prepareEnvironment().then(() => {
if (ns) {
autoCallback = ns.bind(autoCallback);
const ns = Sequelize._cls;
if (!autoCallback) return transaction.prepareEnvironment().return(transaction);
// autoCallback provided
const wrapper = ns ? function(fn) {
var promise;
ns.run(() => promise = fn());
return promise;
} : function(fn) { return fn(); };
return wrapper(() =>
transaction.prepareEnvironment().then(() =>
autoCallback(transaction)
).tap(() =>
transaction.commit()
).catch(err =>
// Rollback transaction if not already finished (commit, rollback, etc) and reject with original error
Promise.try(() => {
// Rollback (ignore any error in rollback)
if (!transaction.finished) return transaction.rollback().catch(function() {});
}).throw(err)
)
);
}
const returnValue = autoCallback(transaction);
if (!returnValue || !returnValue.then) throw new Error('You need to return a promise chain/thenable to the sequelize.transaction() callback');
/**
* Use CLS with Sequelize.
* CLS namespace provided is stored as `Sequelize._cls`
* and bluebird Promise is patched to use the namespace, using `cls-bluebird` module.
*
* @param {Object} ns CLS namespace
* @returns {Object} Sequelize constructor
*/
static useCls(ns) {
// check `ns` is valid CLS namespace
if (!ns || typeof ns !== 'object' || typeof ns.bind !== 'function' || typeof ns.run !== 'function') throw new Error('Must provide CLS namespace');
return returnValue.then(result => transaction.commit()).then(() => {
resolve(returnValue);
});
}).catch(err => {
// If the transaction has already finished (commit, rollback, etc), reject with the original error
if (transaction.finished) {
reject(err);
} else {
return transaction.rollback().finally(() => {
reject(err);
});
}
});
};
// save namespace as `Sequelize._cls`
this._cls = ns;
if (ns) {
transactionResolver = ns.bind(transactionResolver, ns.createContext());
// patch bluebird to bind all promise callbacks to CLS namespace
clsBluebird(ns, Promise);
// return Sequelize for chaining
return this;
}
return new Promise(transactionResolver);
} else {
return transaction.prepareEnvironment().return(transaction);
static get cls() {
return this._cls;
}
static set cls(ns) {
Utils.deprecate('Sequelize.cls should not be set directly. Use Sequelize.useCls().');
this.useCls(ns);
}
log() {
......@@ -1314,7 +1335,6 @@ Sequelize.prototype.InstanceError = Sequelize.InstanceError =
Sequelize.prototype.EmptyResultError = Sequelize.EmptyResultError =
sequelizeErrors.EmptyResultError;
// Allows the promise to access cls namespaces
module.exports = Promise.Sequelize = Sequelize;
module.exports = Sequelize;
module.exports.Sequelize = Sequelize;
module.exports.default = Sequelize;
......@@ -111,8 +111,8 @@ class Transaction {
throw setupErr;
}))
.tap(() => {
if (this.sequelize.constructor.cls) {
this.sequelize.constructor.cls.set('transaction', this);
if (this.sequelize.constructor._cls) {
this.sequelize.constructor._cls.set('transaction', this);
}
return null;
});
......@@ -155,7 +155,7 @@ class Transaction {
}
_clearCls() {
const cls = this.sequelize.constructor.cls;
const cls = this.sequelize.constructor._cls;
if (cls) {
if (cls.get('transaction') === this) {
......
......@@ -37,6 +37,7 @@
},
"dependencies": {
"bluebird": "^3.4.6",
"cls-bluebird": "^2.0.1",
"debug": "^2.2.0",
"depd": "^1.1.0",
"dottie": "^1.0.0",
......@@ -48,7 +49,6 @@
"node-uuid": "~1.4.4",
"retry-as-promised": "^2.0.0",
"semver": "^5.0.1",
"shimmer": "1.1.0",
"terraformer-wkt-parser": "^1.1.2",
"toposort-class": "^1.0.1",
"validator": "^5.6.0",
......
......@@ -13,11 +13,12 @@ var chai = require('chai')
if (current.dialect.supports.transactions) {
describe(Support.getTestDialectTeaser('Continuation local storage'), function () {
before(function () {
Sequelize.cls = cls.createNamespace('sequelize');
this.thenOriginal = Promise.prototype.then;
Sequelize.useCls(cls.createNamespace('sequelize'));
});
after(function () {
delete Sequelize.cls;
delete Sequelize._cls;
});
beforeEach(function () {
......@@ -144,270 +145,14 @@ if (current.dialect.supports.transactions) {
});
});
describe('bluebird shims', function () {
beforeEach(function () {
// Make sure we have some data so the each, map, filter, ... actually run and validate asserts
return this.sequelize.Promise.all([this.User.create({ name: 'bob' }), this.User.create({ name: 'joe' })]);
});
it('join', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.join(self.User.findAll(), function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('then fulfilled', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().then(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('then rejected', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.reject(new Error('test rejection handler')).then(null,function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
it('bluebird patch is applied', function() {
expect(Promise.prototype.then).to.be.a('function');
expect(this.thenOriginal).to.be.a('function');
expect(Promise.prototype.then).not.to.equal(this.thenOriginal);
});
it('spread', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().spread(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
},function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('catch', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.try(function () {
throw new Error('To test catch');
}).catch(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('error', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.try(function () {
throw new self.sequelize.Promise.OperationalError('To test catch');
}).error(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('finally', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().finally( function(){
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('map', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().map(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('static map', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.map(self.User.findAll(), function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('mapSeries', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().mapSeries(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('static mapSeries', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
// In order to execute promises serially with mapSeries we must wrap them as functions
return self.sequelize.Promise.mapSeries([
()=> self.User.findAll().then(()=> expect(self.ns.get('transaction').id).to.be.ok),
()=> self.User.findAll().then(()=> expect(self.ns.get('transaction').id).to.equal(tid))
], runPromise => runPromise()
);
});
it('CLS namespace is stored in Sequelize._cls', function() {
expect(Sequelize._cls).to.equal(this.ns);
});
it('reduce', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().reduce(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('static reduce', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.reduce(self.User.findAll(), function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('filter', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().filter(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('static filter', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.filter(self.User.findAll(), function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('each', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().each(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('static each', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.sequelize.Promise.each(self.User.findAll(), function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('nodeify', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().nodeify(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('tap', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return self.User.findAll().tap(function () {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
});
});
});
it('done fulfilled', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return new Promise(function (resolve, reject) {
self.User.findAll().done(function () {
try {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
resolve();
} catch (err) {
reject(err);
}
}, function (err) {
reject(err);
});
});
});
});
it('done rejected', function () {
var self = this;
return this.sequelize.transaction(function () {
var tid = self.ns.get('transaction').id;
return new Promise(function (resolve, reject) {
Promise.reject(new Error('test rejection handler')).done(function () {
reject(new Error('Should not have called first done handler'));
}, function (err) {
try {
expect(self.ns.get('transaction').id).to.be.ok;
expect(self.ns.get('transaction').id).to.equal(tid);
resolve();
} catch (err) {
reject(err);
}
});
});
});
});
});
});
}
......@@ -72,15 +72,6 @@ describe(Support.getTestDialectTeaser('Transaction'), function() {
});
});
it('errors when no promise chain is returned', function() {
var t;
return (expect(this.sequelize.transaction(function(transaction) {
t = transaction;
})).to.eventually.be.rejected).then(function() {
expect(t.finished).to.be.equal('rollback');
});
});
if (dialect === 'postgres' || dialect === 'mssql') {
it('do not rollback if already committed', function() {
var SumSumSum = this.sequelize.define('transaction', {
......
......@@ -15,11 +15,11 @@ describe(Support.getTestDialectTeaser('Model'), function() {
describe('method findOrCreate', function () {
before(function () {
current.constructor.cls = cls.createNamespace('sequelize');
current.constructor.useCls(cls.createNamespace('sequelize'));
});
after(function () {
delete current.constructor.cls;
delete current.constructor._cls;
});
beforeEach(function () {
......@@ -30,7 +30,7 @@ describe(Support.getTestDialectTeaser('Model'), function() {
this.transactionStub = stub(this.User.sequelize, 'transaction');
this.transactionStub.returns(new Promise(function () {}));
this.clsStub = stub(current.constructor.cls, 'get');
this.clsStub = stub(current.constructor._cls, 'get');
this.clsStub.returns({ id: 123 });
});
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!