不要怂,就是干,撸起袖子干!

Commit 7d597efb by David Tsai Committed by Jan Aagaard Meier

Implement optimistic locking (#6637)

* Implement optimistic locking

* Update unit tests

* Add sqlite support

* Add MSSQL support

* Add optimistic locking tests

* Update changelog

* Fix some tests

* Fix lint errors

* Updated SQL Server query implementation

* Update documentation

* Trigger rebuild

* Add eventually to promise test

* Updates from PR feedback

* Add another optimistic locking test for update()

* Update changelog.

* Update error name

* Add optimistic locking documentation section

* Use ES6 style in tests

* Updated documentation

* Code formatting
1 parent be30a5e7
......@@ -10,6 +10,7 @@
- [FIXED] MSSQL bulkInsertQuery when options and attributes are not passed
- [FIXED] `DATEONLY` now returns `YYYY-MM-DD` date string [#4858] (https://github.com/sequelize/sequelize/issues/4858)
- [FIXED] Issues with `createFunction` and `dropFunction` (PostgresSQL)
- [ADDED] Optimistic locking support [#6637] (https://github.com/sequelize/sequelize/pull/6637)
- [FIXED] Issue with belongsTo association and foreign keys [#6400](https://github.com/sequelize/sequelize/issues/6400)
- [FIXED] Issue with query generation in MSSQL, an identifier was not escaped [#6686] (https://github.com/sequelize/sequelize/pull/6686)
- [FIXED] GroupedLimit when foreignKey has a field alias
......
......@@ -501,7 +501,12 @@ var Bar = sequelize.define('bar', { /* bla */ }, {
freezeTableName: true,
// define the table's name
tableName: 'my_very_custom_table_name'
tableName: 'my_very_custom_table_name',
// Enable optimistic locking. When enabled, sequelize will add a version count attriubte
// to the model and throw an OptimisticLockingError error when stale instances are saved.
// Set to true or a string with the attribute name you want to use to enable.
version: true
})
```
......@@ -574,6 +579,13 @@ sequelize.import('project', function(sequelize, DataTypes) {
})
```
## Optimistic Locking
Sequelize has built-in support for optimistic locking through a model instance version count.
Optimistic locking is disabled by default and can be enabled by setting the `version` property to true in a specific model definition or global model configuration. See [model configuration][0] for more details.
Optimistic locking allows concurrent access to model records for edits and prevents conflicts from overwriting data. It does this by checking whether another process has made changes to a record since it was read and throws an OptimisticLockError when a conflict is detected.
## Database synchronization
When starting a new project you won't have a database structure and using Sequelize you won't need to. Just specify your model structures and let the library do the rest. Currently supported is the creation and deletion of tables:
......
......@@ -71,8 +71,7 @@ class Query extends AbstractQuery {
} else {
// QUERY SUPPORT
const results = [];
const request = new connection.lib.Request(this.sql, err => {
const request = new connection.lib.Request(this.sql, (err, rowCount) => {
debug(`executed(${connection.uuid || 'default'}) : ${this.sql}`);
......@@ -84,7 +83,7 @@ class Query extends AbstractQuery {
err.sql = sql;
reject(this.formatError(err));
} else {
resolve(this.formatResults(results));
resolve(this.formatResults(results, rowCount));
}
});
......@@ -130,7 +129,7 @@ class Query extends AbstractQuery {
* @param {Array} data - The result of the query execution.
* @private
*/
formatResults(data) {
formatResults(data, rowCount) {
let result = this.instance;
if (this.isInsertQuery(data)) {
this.handleInsertQuery(data);
......@@ -186,6 +185,8 @@ class Query extends AbstractQuery {
result = data[0].version;
} else if (this.isForeignKeysQuery()) {
result = data;
} else if (this.isInsertQuery() || this.isUpdateQuery()) {
result = [result, rowCount];
} else if (this.isRawQuery()) {
// MSSQL returns row data and metadata (affected rows etc) in a single object - let's standarize it, sorta
result = [data, data];
......
......@@ -139,6 +139,8 @@ class Query extends AbstractQuery {
result = data[0].version;
} else if (this.isForeignKeysQuery()) {
result = data;
} else if (this.isInsertQuery() || this.isUpdateQuery()) {
result = [result, data.affectedRows];
} else if (this.isRawQuery()) {
// MySQL returns row data and metadata (affected rows etc) in a single object - let's standarize it, sorta
result = [data, data];
......
......@@ -267,7 +267,10 @@ class Query extends AbstractQuery {
}
}
return this.instance || (rows && ((this.options.plain && rows[0]) || rows)) || undefined;
return [
this.instance || (rows && ((this.options.plain && rows[0]) || rows)) || undefined,
result.rowCount
];
} else if (this.isRawQuery()) {
return [rows, result];
} else {
......
......@@ -238,6 +238,8 @@ class Query extends AbstractQuery {
result = results[0].version;
} else if (query.options.type === QueryTypes.RAW) {
result = [results, metaData];
} else if (query.isUpdateQuery() || query.isInsertQuery()) {
result = [result, metaData.changes];
}
resolve(result);
......
......@@ -88,6 +88,25 @@ class ValidationError extends BaseError {
exports.ValidationError = ValidationError;
/**
* Thrown when attempting to update a stale model instance
* @extends BaseError
* @constructor
*/
class OptimisticLockError extends BaseError {
constructor(options) {
options = options || {};
options.message = options.message || 'Attempting to update a stale model instance: ' + options.modelName;
super(options);
this.name = 'SequelizeOptimisticLockError';
this.message = options.message;
this.modelName = options.modelName;
this.values = options.values;
this.where = options.where;
}
}
exports.OptimisticLockError = OptimisticLockError;
/**
* A base class for all database related errors.
* @extends BaseError
* @class DatabaseError
......
......@@ -125,6 +125,14 @@ class Model {
_autoGenerated: true
};
}
if (this._versionAttribute) {
tail[this._versionAttribute] = {
type: DataTypes.INTEGER,
allowNull: false,
defaultValue: 0,
_autoGenerated: true
};
}
const existingAttributes = Utils._.clone(this.rawAttributes);
this.rawAttributes = {};
......@@ -781,12 +789,21 @@ class Model {
this._timestampAttributes.deletedAt = this.options.deletedAt || Utils.underscoredIf('deletedAt', this.options.underscored);
}
}
if (this.options.version) {
if (typeof this.options.version === 'string') {
this._versionAttribute = this.options.version;
} else {
this._versionAttribute = 'version';
}
}
// Add head and tail default attributes (id, timestamps)
this._readOnlyAttributes = Utils._.values(this._timestampAttributes);
if (this._versionAttribute) {
this._readOnlyAttributes.push(this._versionAttribute);
}
this._hasReadOnlyAttributes = this._readOnlyAttributes && this._readOnlyAttributes.length;
this._isReadOnlyAttribute = Utils._.memoize(key => this._hasReadOnlyAttributes && this._readOnlyAttributes.indexOf(key) !== -1);
this._addDefaultAttributes();
this.refreshAttributes();
......@@ -2877,7 +2894,7 @@ class Model {
* @property where
* @return {Object}
*/
where() {
where(checkVersion) {
const where = this.constructor.primaryKeyAttributes.reduce((result, attribute) => {
result[attribute] = this.get(attribute, {raw: true});
return result;
......@@ -2886,6 +2903,10 @@ class Model {
if (_.size(where) === 0) {
return this._modelOptions.whereCollection;
}
const versionAttr = this.constructor._versionAttribute;
if (checkVersion && versionAttr) {
where[versionAttr] = this.get(versionAttr, {raw: true});
}
return Utils.mapWhereFieldNames(where, this.constructor);
}
......@@ -3276,6 +3297,7 @@ class Model {
const primaryKeyName = this.constructor.primaryKeyAttribute;
const primaryKeyAttribute = primaryKeyName && this.constructor.rawAttributes[primaryKeyName];
const createdAtAttr = this.constructor._timestampAttributes.createdAt;
const versionAttr = this.constructor._versionAttribute;
const hook = this.isNewRecord ? 'Create' : 'Update';
const wasNewRecord = this.isNewRecord;
const now = Utils.now(this.sequelize.options.dialect);
......@@ -3284,6 +3306,9 @@ class Model {
if (updatedAtAttr && options.fields.length >= 1 && options.fields.indexOf(updatedAtAttr) === -1) {
options.fields.push(updatedAtAttr);
}
if (versionAttr && options.fields.length >= 1 && options.fields.indexOf(versionAttr) === -1) {
options.fields.push(versionAttr);
}
if (options.silent === true && !(this.isNewRecord && this.get(updatedAtAttr, {raw: true}))) {
// UpdateAtAttr might have been added as a result of Object.keys(Model.attributes). In that case we have to remove it again
......@@ -3383,24 +3408,43 @@ class Model {
if (!options.fields.length) return this;
if (!this.changed() && !this.isNewRecord) return this;
const versionFieldName = _.get(this.constructor.rawAttributes[versionAttr], 'field') || versionAttr;
let values = Utils.mapValueFieldNames(this.dataValues, options.fields, this.constructor);
let query = null;
let args = [];
let where;
if (this.isNewRecord) {
query = 'insert';
args = [this, this.constructor.getTableName(options), values, options];
} else {
let where = this.where();
where = this.where(true);
where = Utils.mapValueFieldNames(where, Object.keys(where), this.constructor);
if (versionAttr) {
values[versionFieldName] += 1;
}
query = 'update';
args = [this, this.constructor.getTableName(options), values, where, options];
}
return this.sequelize.getQueryInterface()[query].apply(this.sequelize.getQueryInterface(), args)
.then(result => {
.then(results => {
const result = _.head(results);
const rowsUpdated = results[1];
if (versionAttr) {
// Check to see that a row was updated, otherwise it's an optimistic locking error.
if (rowsUpdated < 1) {
throw new sequelizeErrors.OptimisticLockError({
modelName: this.constructor.name,
values: values,
where: where,
});
} else {
result.dataValues[versionAttr] = values[versionFieldName];
}
}
// Transfer database generated values (defaults, autoincrement, etc)
for (const attr of Object.keys(this.constructor.rawAttributes)) {
if (this.constructor.rawAttributes[attr].field &&
......@@ -3580,7 +3624,7 @@ class Model {
return this.constructor.runHooks('beforeDestroy', this, options);
}
}).then(() => {
const where = this.where();
const where = this.where(true);
if (this.constructor._timestampAttributes.deletedAt && options.force === false) {
const attribute = this.constructor.rawAttributes[this.constructor._timestampAttributes.deletedAt];
......@@ -3592,7 +3636,19 @@ class Model {
this.setDataValue(field, values[field]);
return this.sequelize.getQueryInterface().update(this, this.constructor.getTableName(options), values, where, _.defaults({ hooks: false }, options));
return this.sequelize.getQueryInterface().update(
this, this.constructor.getTableName(options), values, where, _.defaults({ hooks: false }, options)
).then(results => {
const rowsUpdated = results[1];
if (this.constructor._versionAttribute && rowsUpdated < 1) {
throw new sequelizeErrors.OptimisticLockError({
modelName: this.constructor.name,
values: values,
where: where,
});
}
return _.head(results);
});
} else {
return this.sequelize.getQueryInterface().delete(this, this.constructor.getTableName(options), where, _.assign({ type: QueryTypes.DELETE, limit: null }, options));
}
......@@ -3669,6 +3725,7 @@ class Model {
increment(fields, options) {
const identifier = this.where();
const updatedAtAttr = this.constructor._timestampAttributes.updatedAt;
const versionAttr = this.constructor._versionAttribute;
const updatedAtAttribute = this.constructor.rawAttributes[updatedAtAttr];
options = _.defaults({}, options, {
......@@ -3693,6 +3750,9 @@ class Model {
if (!options.silent && updatedAtAttr && !values[updatedAtAttr]) {
options.attributes[updatedAtAttribute.field || updatedAtAttr] = this.constructor._getDefaultTimestamp(updatedAtAttr) || Utils.now(this.sequelize.options.dialect);
}
if (versionAttr) {
values[versionAttr] = 1;
}
for (const attr of Object.keys(values)) {
// Field name mapping
......
......@@ -470,9 +470,9 @@ class QueryInterface {
options.type = QueryTypes.INSERT;
options.instance = instance;
return this.sequelize.query(sql, options).then(result => {
if (instance) result.isNewRecord = false;
return result;
return this.sequelize.query(sql, options).then(results => {
if (instance) results[0].isNewRecord = false;
return results;
});
}
......@@ -537,7 +537,7 @@ class QueryInterface {
options = _.clone(options) || {};
options.type = QueryTypes.INSERT;
const sql = this.QueryGenerator.bulkInsertQuery(tableName, records, options, attributes);
return this.sequelize.query(sql, options);
return this.sequelize.query(sql, options).then(results => results[0]);
}
update(instance, tableName, values, identifier, options) {
......
'use strict';
const Support = require(__dirname + '/../support');
const DataTypes = require(__dirname + '/../../../lib/data-types');
const chai = require('chai');
const expect = chai.expect;
describe(Support.getTestDialectTeaser('Model'), function() {
describe('optimistic locking', function () {
var Account;
beforeEach(function() {
Account = this.sequelize.define('Account', {
number: {
type: DataTypes.INTEGER,
}
}, {
version: true
});
return Account.sync({force: true});
});
it('should increment the version on save', function() {
return Account.create({number: 1}).then(account => {
account.number += 1;
expect(account.version).to.eq(0);
return account.save();
}).then(account => {
expect(account.version).to.eq(1);
});
});
it('should increment the version on update', function() {
return Account.create({number: 1}).then(account => {
expect(account.version).to.eq(0);
return account.update({ number: 2 });
}).then(account => {
expect(account.version).to.eq(1);
account.number += 1;
return account.save();
}).then(account => {
expect(account.number).to.eq(3);
expect(account.version).to.eq(2);
});
});
it('prevents stale instances from being saved', function() {
return expect(Account.create({number: 1}).then(accountA => {
return Account.findById(accountA.id).then(accountB => {
accountA.number += 1;
return accountA.save().then(function() { return accountB; });
});
}).then(accountB => {
accountB.number += 1;
return accountB.save();
})).to.eventually.be.rejectedWith(Support.Sequelize.OptimisticLockError);
});
it('increment() also increments the version', function() {
return Account.create({number: 1}).then(account => {
expect(account.version).to.eq(0);
return account.increment('number', { by: 1} );
}).then(account => {
return account.reload();
}).then(account => {
expect(account.version).to.eq(1);
});
});
});
});
......@@ -218,7 +218,8 @@ describe(Support.getTestDialectTeaser('QueryInterface'), function() {
autoIncrement: true
}
}).bind(this).then(function() {
return this.queryInterface.insert(null, 'TableWithPK', {}, {raw: true, returning: true, plain: true}).then(function(response) {
return this.queryInterface.insert(null, 'TableWithPK', {}, {raw: true, returning: true, plain: true}).then(function(results) {
var response = _.head(results);
expect(response.table_id || (typeof response !== 'object' && response)).to.be.ok;
});
});
......
......@@ -18,7 +18,7 @@ beforeEach(function() {
return Support.clearDatabase(this.sequelize);
});
afterEach(function () {
afterEach(function() {
try {
this.sequelize.test.verifyNoRunningQueries();
} catch (err) {
......
......@@ -27,10 +27,10 @@ describe(Support.getTestDialectTeaser('Instance'), function() {
before(function() {
stub = sinon.stub(current, 'query').returns(
Sequelize.Promise.resolve({
Sequelize.Promise.resolve([{
_previousDataValues: {id: 1},
dataValues: {id: 2}
})
}, 1])
);
});
......
......@@ -35,10 +35,10 @@ describe(Support.getTestDialectTeaser('Instance'), function() {
before(function() {
stub = sinon.stub(current, 'query').returns(
Sequelize.Promise.resolve({
Sequelize.Promise.resolve([{
_previousDataValues: {},
dataValues: {id: 1}
})
}, 1])
);
});
......
......@@ -275,7 +275,7 @@ describe(Support.getTestDialectTeaser('InstanceValidator'), function() {
before(function () {
this.stub = sinon.stub(current, 'query', function () {
return new Promise(function (resolve) {
resolve(User.build({}));
resolve([User.build({}), 1]);
});
});
});
......@@ -437,7 +437,7 @@ describe(Support.getTestDialectTeaser('InstanceValidator'), function() {
});
before(function () {
this.stub = sinon.stub(current, 'query').returns(Promise.resolve(User.build()));
this.stub = sinon.stub(current, 'query').returns(Promise.resolve([User.build(), 1]));
});
after(function () {
......@@ -512,7 +512,7 @@ describe(Support.getTestDialectTeaser('InstanceValidator'), function() {
});
before(function () {
this.stub = sinon.stub(current, 'query').returns(Promise.resolve(User.build()));
this.stub = sinon.stub(current, 'query').returns(Promise.resolve([User.build(), 1]));
});
after(function () {
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!