不要怂,就是干,撸起袖子干!

Commit bebb6364 by Jan Aagaard Meier

More work on upsert

1 parent c14f705c
...@@ -93,6 +93,13 @@ module.exports = (function() { ...@@ -93,6 +93,13 @@ module.exports = (function() {
result = data[0]; result = data[0];
} else if (this.isBulkUpdateQuery() || this.isBulkDeleteQuery()) { } else if (this.isBulkUpdateQuery() || this.isBulkDeleteQuery()) {
result = data.affectedRows; result = data.affectedRows;
} else if (this.isUpsertQuery()) {
if (data.affectedRows === 2) {
// In MySQL, affectedRows = 2 means that the ON DUPLICATE KEY clause was executed
result = 0;
} else {
result = data.affectedRows;
}
} }
return result; return result;
......
...@@ -168,14 +168,14 @@ module.exports = (function() { ...@@ -168,14 +168,14 @@ module.exports = (function() {
return Utils._.template(query)({ tableName: tableName, attributes: attrString.join(', ') }); return Utils._.template(query)({ tableName: tableName, attributes: attrString.join(', ') });
}, },
upsertQuery: function (tableName, valueHash, where, rawAttributes, options) { upsertQuery: function (tableName, insertValues, updateValues, where, rawAttributes, options) {
options.onDuplicate = 'UPDATE '; options.onDuplicate = 'UPDATE ';
options.onDuplicate += Object.keys(valueHash).map(function (key) { options.onDuplicate += Object.keys(updateValues).map(function (key) {
return key + '=VALUES(' + key +')'; return key + '=VALUES(' + key +')';
}).join(', '); }).join(', ');
return this.insertQuery(tableName, valueHash, rawAttributes, options); return this.insertQuery(tableName, insertValues, rawAttributes, options);
}, },
bulkInsertQuery: function(tableName, attrValueHashes, options) { bulkInsertQuery: function(tableName, attrValueHashes, options) {
......
...@@ -322,12 +322,12 @@ module.exports = (function() { ...@@ -322,12 +322,12 @@ module.exports = (function() {
}, },
// http://www.maori.geek.nz/post/postgres_upsert_update_or_insert_in_ger_using_knex_js // http://www.maori.geek.nz/post/postgres_upsert_update_or_insert_in_ger_using_knex_js
upsertQuery: function (tableName, valueHash, where, rawAttributes, options) { upsertQuery: function (tableName, insertValues, updateValues, where, rawAttributes, options) {
var query = 'WITH upsert AS (<%= update %> RETURNING *) <%= insert %> WHERE NOT EXISTS (SELECT * FROM upsert)'; var query = 'WITH upsert AS (<%= update %> RETURNING *) <%= insert %> WHERE NOT EXISTS (SELECT * FROM upsert)';
return Utils._.template(query)({ return Utils._.template(query)({
update: this.updateQuery(tableName, valueHash, where, options, rawAttributes), update: this.updateQuery(tableName, updateValues, where, options, rawAttributes),
insert: this.insertQuery(tableName, valueHash, rawAttributes, options).replace(/VALUES \((.*?)\);/, 'SELECT $1') insert: this.insertQuery(tableName, insertValues, rawAttributes, options).replace(/VALUES \((.*?)\);/, 'SELECT $1')
}); });
}, },
......
...@@ -184,9 +184,9 @@ module.exports = (function() { ...@@ -184,9 +184,9 @@ module.exports = (function() {
return "SELECT name FROM `sqlite_master` WHERE type='table' and name!='sqlite_sequence';"; return "SELECT name FROM `sqlite_master` WHERE type='table' and name!='sqlite_sequence';";
}, },
upsertQuery: function (tableName, valueHash, where, rawAttributes, options) { upsertQuery: function (tableName, insertValues, updateValues, where, rawAttributes, options) {
options.ignore = true; options.ignore = true;
var sql = this.insertQuery(tableName, valueHash, rawAttributes, options) + ' ' + this.updateQuery(tableName, valueHash, where, options, rawAttributes); var sql = this.insertQuery(tableName, insertValues, rawAttributes, options) + ' ' + this.updateQuery(tableName, updateValues, where, options, rawAttributes);
return sql; return sql;
}, },
......
...@@ -131,6 +131,8 @@ module.exports = (function() { ...@@ -131,6 +131,8 @@ module.exports = (function() {
result = results; result = results;
} else if ([QueryTypes.BULKUPDATE, QueryTypes.BULKDELETE].indexOf(self.options.type) !== -1) { } else if ([QueryTypes.BULKUPDATE, QueryTypes.BULKDELETE].indexOf(self.options.type) !== -1) {
result = metaData.changes; result = metaData.changes;
} else if (self.options.type === QueryTypes.UPSERT) {
result = undefined;
} }
resolve(result); resolve(result);
...@@ -241,7 +243,7 @@ module.exports = (function() { ...@@ -241,7 +243,7 @@ module.exports = (function() {
Query.prototype.getDatabaseMethod = function() { Query.prototype.getDatabaseMethod = function() {
if (this.isUpsertQuery()) { if (this.isUpsertQuery()) {
return 'run'; return 'exec'; // Needed to run multiple queries in one
} else if (this.isInsertQuery() || this.isUpdateQuery() || (this.sql.toLowerCase().indexOf('CREATE TEMPORARY TABLE'.toLowerCase()) !== -1) || this.options.type === QueryTypes.BULKDELETE) { } else if (this.isInsertQuery() || this.isUpdateQuery() || (this.sql.toLowerCase().indexOf('CREATE TEMPORARY TABLE'.toLowerCase()) !== -1) || this.options.type === QueryTypes.BULKDELETE) {
return 'run'; return 'run';
} else { } else {
......
...@@ -529,26 +529,11 @@ module.exports = (function() { ...@@ -529,26 +529,11 @@ module.exports = (function() {
} }
if (updatedAtAttr) { if (updatedAtAttr) {
values[updatedAtAttr] = ( values[updatedAtAttr] = self.Model.__getTimestamp(updatedAtAttr);
(
self.isNewRecord
&& !!self.Model.rawAttributes[updatedAtAttr]
&& !!self.Model.rawAttributes[updatedAtAttr].defaultValue
)
? self.Model.rawAttributes[updatedAtAttr].defaultValue
: Utils.now(self.sequelize.options.dialect)
);
} }
if (self.isNewRecord && createdAtAttr && !values[createdAtAttr]) { if (self.isNewRecord && createdAtAttr && !values[createdAtAttr]) {
values[createdAtAttr] = ( values[createdAtAttr] = self.Model.__getTimestamp(createdAtAttr);
(
!!self.Model.rawAttributes[createdAtAttr]
&& !!self.Model.rawAttributes[createdAtAttr].defaultValue
)
? self.Model.rawAttributes[createdAtAttr].defaultValue
: Utils.now(self.sequelize.options.dialect)
);
} }
var query = null var query = null
......
...@@ -1175,15 +1175,48 @@ module.exports = (function() { ...@@ -1175,15 +1175,48 @@ module.exports = (function() {
}; };
/** /**
* Insert or update a single row. An update will be executed if a row which matches the supplied values on either the primary key or a unique key is found. Note that the unique index must be defined in your sequelize model and not just in the table. Otherwise you may experience a unique constraint violation, because sequelize fails to identify the row that should be updated.
*
* **Implementation details:**
*
* * MySQL - Implemented as a single query `INSERT values ON DUPLICATE KEY UPDATE values`
* * PostgreSQL - Implemented as two queries `WITH upsert AS (update_query) insert_query WHERE NOT EXISTS (SELECT * FROM upsert)`, as outlined in // http://www.maori.geek.nz/post/postgres_upsert_update_or_insert_in_ger_using_knex_js
* * SQLite - Implemented as two queries `INSERT; UPDATE`. This means that the update is executed regardless of whether the row already existed or not
*
* **Note** that SQLite returns undefined for created, no matter if the row was created or updated. This is because SQLite always runs INSERT OR IGNORE + UPDATE, in a single query, so there is no way to know whether the row was inserted or not.
*
* @param {Object} values * @param {Object} values
* @param {Object} [options] * @param {Object} [options]
* @param {Array} [options.fields=Object.keys(this.attributes)] The fields to insert / update. Defaults to all fields
* *
* @return {Promise<created>} Returns a boolean indicating whether the row was created or updated. * @return {Promise<created>} Returns a boolean indicating whether the row was created or updated.
*/ */
Model.prototype.upsert = function (values, options) { Model.prototype.upsert = function (values, options) {
options = options || {}; options = options || {};
// TODO - should we do a .build here to get default values + setters? if (!options.fields) {
options.fields = Object.keys(this.attributes);
}
var createdAtAttr = this._timestampAttributes.createdAt
, updatedAtAttr = this._timestampAttributes.updatedAt
, hadPrimary = this.primaryKeyField in values;
values = Utils._.pick(values, options.fields);
values = this.build(values).get(); // Get default values - this also adds a null value for the PK if none is given
if (createdAtAttr && !values[createdAtAttr]) {
values[createdAtAttr] = this.__getTimestamp(createdAtAttr);
}
if (updatedAtAttr && !values[updatedAtAttr]) {
values[updatedAtAttr] = this.__getTimestamp(updatedAtAttr);
}
// Build adds a null value for the primary key, if none was given by the user.
// We need to remove that because of some Postgres technicalities.
if (!hadPrimary) {
delete values[this.primaryKeyField];
}
return this.QueryInterface.upsert(this.getTableName(), values, this, options); return this.QueryInterface.upsert(this.getTableName(), values, this, options);
}; };
...@@ -1679,6 +1712,14 @@ module.exports = (function() { ...@@ -1679,6 +1712,14 @@ module.exports = (function() {
this.__sql = sql.setDialect(dialect === 'mariadb' ? 'mysql' : dialect); this.__sql = sql.setDialect(dialect === 'mariadb' ? 'mysql' : dialect);
}; };
Model.prototype.__getTimestamp = function(attr) {
if (!!this.rawAttributes[attr] && !!this.rawAttributes[attr].defaultValue) {
return this.rawAttributes[attr].defaultValue;
} else {
return Utils.now(this.sequelize.options.dialect);
}
};
var mapFieldNames = function(options, Model) { var mapFieldNames = function(options, Model) {
if (options.attributes) { if (options.attributes) {
options.attributes = options.attributes.map(function(attr) { options.attributes = options.attributes.map(function(attr) {
......
...@@ -445,9 +445,10 @@ module.exports = (function() { ...@@ -445,9 +445,10 @@ module.exports = (function() {
, where , where
, indexFields , indexFields
, indexes = [] , indexes = []
, updateValues
, attributes = Object.keys(values); , attributes = Object.keys(values);
if (model.primaryKeyField in values) { if (values[model.primaryKeyField]) {
where = {}; where = {};
where[model.primaryKeyField] = values[model.primaryKeyField]; where[model.primaryKeyField] = values[model.primaryKeyField];
wheres.push(where); wheres.push(where);
...@@ -485,8 +486,23 @@ module.exports = (function() { ...@@ -485,8 +486,23 @@ module.exports = (function() {
options.type = QueryTypes.UPSERT; options.type = QueryTypes.UPSERT;
options.raw = true; options.raw = true;
var sql = this.QueryGenerator.upsertQuery(tableName, values, where, model.rawAttributes, options);
if (model._timestampAttributes.createdAt) {
// If we are updating an existing row, we shouldn't set createdAt
updateValues = Utils._.cloneDeep(values);
delete updateValues[model._timestampAttributes.createdAt];
} else {
updateValues = values;
}
var sql = this.QueryGenerator.upsertQuery(tableName, values, updateValues, where, model.rawAttributes, options);
return this.sequelize.query(sql, null, options).then(function (rowCount) { return this.sequelize.query(sql, null, options).then(function (rowCount) {
if (rowCount === undefined) {
return rowCount;
}
return rowCount > 0; return rowCount > 0;
}); });
}; };
......
...@@ -49,7 +49,7 @@ describe(Support.getTestDialectTeaser("Self"), function() { ...@@ -49,7 +49,7 @@ describe(Support.getTestDialectTeaser("Self"), function() {
}); });
}); });
it('can handle n:m associations', function() { it.only('can handle n:m associations', function() {
var self = this; var self = this;
var Person = this.sequelize.define('Person', { name: DataTypes.STRING }); var Person = this.sequelize.define('Person', { name: DataTypes.STRING });
...@@ -60,6 +60,7 @@ describe(Support.getTestDialectTeaser("Self"), function() { ...@@ -60,6 +60,7 @@ describe(Support.getTestDialectTeaser("Self"), function() {
var foreignIdentifiers = _.map(_.values(Person.associations), 'foreignIdentifier'); var foreignIdentifiers = _.map(_.values(Person.associations), 'foreignIdentifier');
var rawAttributes = _.keys(this.sequelize.models.Family.rawAttributes); var rawAttributes = _.keys(this.sequelize.models.Family.rawAttributes);
console.log(rawAttributes);
expect(foreignIdentifiers.length).to.equal(2); expect(foreignIdentifiers.length).to.equal(2);
expect(rawAttributes.length).to.equal(4); expect(rawAttributes.length).to.equal(4);
......
"use strict";
/* jshint camelcase: false */
/* jshint expr: true */
var chai = require('chai')
, sinon = require('sinon')
, Sequelize = require('../../index')
, Promise = Sequelize.Promise
, expect = chai.expect
, Support = require(__dirname + '/../support')
, DataTypes = require(__dirname + "/../../lib/data-types")
, dialect = Support.getTestDialect()
, datetime = require('chai-datetime')
, _ = require('lodash')
, assert = require('assert');
chai.use(datetime);
chai.config.includeStack = true;
describe(Support.getTestDialectTeaser("DAOFactory"), function () {
beforeEach(function () {
this.clock = sinon.useFakeTimers();
this.User = this.sequelize.define('user', {
username: DataTypes.STRING,
foo: {
unique: 'foobar',
type: DataTypes.STRING
},
bar: {
unique: 'foobar',
type: DataTypes.INTEGER
},
});
return this.sequelize.sync({ force: true });
});
afterEach(function () {
this.clock.restore();
});
describe('upsert', function () {
it('works with upsert on id', function () {
return this.User.upsert({ id: 42, username: 'john' }).bind(this).then(function (created) {
if (dialect === 'sqlite') {
expect(created).not.to.be.defined;
} else {
expect(created).to.be.ok;
}
this.clock.tick(2000); // Make sure to pass some time so updatedAt != createdAt
return this.User.upsert({ id: 42, username: 'doe' });
}).then(function (created) {
if (dialect === 'sqlite') {
expect(created).not.to.be.defined;
} else {
expect(created).not.to.be.ok;
}
return this.User.find(42);
}).then(function (user) {
expect(user.createdAt).to.be.defined;
expect(user.username).to.equal('doe');
expect(user.updatedAt).to.be.afterTime(user.createdAt);
});
});
it('works with upsert on a composite key', function () {
return this.User.upsert({ foo: 'baz', bar: 19, username: 'john' }).bind(this).then(function (created) {
if (dialect === 'sqlite') {
expect(created).not.to.be.defined;
} else {
expect(created).to.be.ok;
}
this.clock.tick(2000); // Make sure to pass some time so updatedAt != createdAt
return this.User.upsert({ foo: 'baz', bar: 19, username: 'doe' });
}).then(function (created) {
if (dialect === 'sqlite') {
expect(created).not.to.be.defined;
} else {
expect(created).not.to.be.ok;
}
return this.User.find({ where: { foo: 'baz', bar: 19 }});
}).then(function (user) {
expect(user.createdAt).to.be.defined;
expect(user.username).to.equal('doe');
expect(user.updatedAt).to.be.afterTime(user.createdAt);
});
});
});
});
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!