不要怂,就是干,撸起袖子干!

Commit 0655153e by Mick Hansen

Merge pull request #2518 from janmeier/upsert

Upsert
2 parents a1eb3a3d eeb764cb
# Next # Next
- [FEATURE] Added the possibility of removing multiple associations in 1 call [#2338](https://github.com/sequelize/sequelize/issues/2338) - [FEATURE] Added the possibility of removing multiple associations in 1 call [#2338](https://github.com/sequelize/sequelize/issues/2338)
- [FEAUTRE] Undestroy method for paranoid models [#2540](https://github.com/sequelize/sequelize/pull/2540) - [FEATURE] Undestroy method for paranoid models [#2540](https://github.com/sequelize/sequelize/pull/2540)
- [FEATURE] Support for UPSERT
- [BUG] Add support for `field` named the same as the attribute in `reload`, `bulkCreate` and `save` [#2348](https://github.com/sequelize/sequelize/issues/2348) - [BUG] Add support for `field` named the same as the attribute in `reload`, `bulkCreate` and `save` [#2348](https://github.com/sequelize/sequelize/issues/2348)
- [BUG] Copy the options object in association getters. [#2311](https://github.com/sequelize/sequelize/issues/2311) - [BUG] Copy the options object in association getters. [#2311](https://github.com/sequelize/sequelize/issues/2311)
- [BUG] `Model#destroy()` now supports `field`, this also fixes an issue with `N:M#removeAssociation` and `field` - [BUG] `Model#destroy()` now supports `field`, this also fixes an issue with `N:M#removeAssociation` and `field`
......
...@@ -27,6 +27,10 @@ AbstractDialect.prototype.supports = { ...@@ -27,6 +27,10 @@ AbstractDialect.prototype.supports = {
/* does the dialect support updating autoincrement fields */ /* does the dialect support updating autoincrement fields */
update: true update: true
}, },
'ON DUPLICATE KEY': true,
/* What is the dialect's keyword for INSERT IGNORE */
'IGNORE': '',
schemas: false, schemas: false,
constraints: { constraints: {
restrict: true restrict: true
......
...@@ -154,8 +154,8 @@ module.exports = (function() { ...@@ -154,8 +154,8 @@ module.exports = (function() {
options = options || {}; options = options || {};
var query var query
, valueQuery = 'INSERT INTO <%= table %> (<%= attributes %>)<%= output %> VALUES (<%= values %>)' , valueQuery = 'INSERT<%= ignore %> INTO <%= table %> (<%= attributes %>)<%= output %> VALUES (<%= values %>)'
, emptyQuery = 'INSERT INTO <%= table %><%= output %>' , emptyQuery = 'INSERT<%= ignore %> INTO <%= table %><%= output %>'
, outputFragment , outputFragment
, fields = [] , fields = []
, values = [] , values = []
...@@ -194,6 +194,11 @@ module.exports = (function() { ...@@ -194,6 +194,11 @@ module.exports = (function() {
valueQuery = 'CREATE OR REPLACE FUNCTION pg_temp.testfunc() RETURNS SETOF <%= table %> AS $body$ BEGIN RETURN QUERY ' + valueQuery + '; EXCEPTION ' + options.exception + ' END; $body$ LANGUAGE plpgsql; SELECT * FROM pg_temp.testfunc(); DROP FUNCTION IF EXISTS pg_temp.testfunc();'; valueQuery = 'CREATE OR REPLACE FUNCTION pg_temp.testfunc() RETURNS SETOF <%= table %> AS $body$ BEGIN RETURN QUERY ' + valueQuery + '; EXCEPTION ' + options.exception + ' END; $body$ LANGUAGE plpgsql; SELECT * FROM pg_temp.testfunc(); DROP FUNCTION IF EXISTS pg_temp.testfunc();';
} }
if (this._dialect.supports['ON DUPLICATE KEY'] && options.onDuplicate) {
valueQuery += ' ON DUPLICATE KEY ' + options.onDuplicate;
emptyQuery += ' ON DUPLICATE KEY ' + options.onDuplicate;
}
valueHash = Utils.removeNullValuesFromHash(valueHash, this.options.omitNull); valueHash = Utils.removeNullValuesFromHash(valueHash, this.options.omitNull);
for (key in valueHash) { for (key in valueHash) {
...@@ -221,6 +226,7 @@ module.exports = (function() { ...@@ -221,6 +226,7 @@ module.exports = (function() {
} }
var replacements = { var replacements = {
ignore: options.ignore ? this._dialect.supports['IGNORE'] : '',
table: this.quoteTable(table), table: this.quoteTable(table),
attributes: fields.join(','), attributes: fields.join(','),
output: outputFragment, output: outputFragment,
......
...@@ -47,58 +47,6 @@ module.exports = (function() { ...@@ -47,58 +47,6 @@ module.exports = (function() {
}; };
/** /**
* High level function that handles the results of a query execution.
*
*
* Example:
* query.formatResults([
* {
* id: 1, // this is from the main table
* attr2: 'snafu', // this is from the main table
* Tasks.id: 1, // this is from the associated table
* Tasks.title: 'task' // this is from the associated table
* }
* ])
*
* @param {Array} data - The result of the query execution.
*/
AbstractQuery.prototype.formatResults = function(data) {
var result = this.callee;
if (this.isInsertQuery(data)) {
this.handleInsertQuery(data);
}
if (this.isSelectQuery()) {
result = this.handleSelectQuery(data);
} else if (this.isShowTableQuery()) {
result = this.handleShowTableQuery(data);
} else if (this.isShowOrDescribeQuery()) {
result = data;
if (this.sql.toLowerCase().indexOf('describe') === 0) {
result = {};
data.forEach(function(_result) {
result[_result.Field] = {
type: _result.Type.toUpperCase(),
allowNull: (_result.Null === 'YES'),
defaultValue: _result.Default
};
});
} else if (this.isShowIndexesQuery()) {
result = this.handleShowIndexesQuery(data);
}
} else if (this.isCallQuery()) {
result = data[0];
} else if (this.isBulkUpdateQuery() || this.isBulkDeleteQuery()) {
result = data.affectedRows;
}
return result;
};
/**
* Get the attributes of an insert query, which contains the just inserted id. * Get the attributes of an insert query, which contains the just inserted id.
* *
* @return {String} The field name. * @return {String} The field name.
...@@ -135,6 +83,10 @@ module.exports = (function() { ...@@ -135,6 +83,10 @@ module.exports = (function() {
} }
}; };
AbstractQuery.prototype.isUpsertQuery = function () {
return this.options.type === QueryTypes.UPSERT;
};
AbstractQuery.prototype.isInsertQuery = function(results, metaData) { AbstractQuery.prototype.isInsertQuery = function(results, metaData) {
var result = true; var result = true;
......
...@@ -14,6 +14,7 @@ var MysqlDialect = function(sequelize) { ...@@ -14,6 +14,7 @@ var MysqlDialect = function(sequelize) {
MysqlDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supports), { MysqlDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supports), {
'VALUES ()': true, 'VALUES ()': true,
'LIMIT ON UPDATE': true, 'LIMIT ON UPDATE': true,
'IGNORE': ' IGNORE',
lock: true, lock: true,
forShare: 'LOCK IN SHARE MODE', forShare: 'LOCK IN SHARE MODE',
index: { index: {
......
...@@ -73,10 +73,12 @@ module.exports = (function() { ...@@ -73,10 +73,12 @@ module.exports = (function() {
if (!!options.uniqueKeys) { if (!!options.uniqueKeys) {
Utils._.each(options.uniqueKeys, function(columns, indexName) { Utils._.each(options.uniqueKeys, function(columns, indexName) {
if (!columns.singleField) { // If it's a single field its handled in column def, not as an index
if (!Utils._.isString(indexName)) { if (!Utils._.isString(indexName)) {
indexName = 'uniq_' + tableName + '_' + columns.fields.join('_'); indexName = 'uniq_' + tableName + '_' + columns.fields.join('_');
} }
values.attributes += ', UNIQUE ' + indexName + ' (' + Utils._.map(columns.fields, self.quoteIdentifier).join(', ') + ')'; values.attributes += ', UNIQUE ' + indexName + ' (' + Utils._.map(columns.fields, self.quoteIdentifier).join(', ') + ')';
}
}); });
} }
...@@ -166,6 +168,16 @@ module.exports = (function() { ...@@ -166,6 +168,16 @@ module.exports = (function() {
return Utils._.template(query)({ tableName: tableName, attributes: attrString.join(', ') }); return Utils._.template(query)({ tableName: tableName, attributes: attrString.join(', ') });
}, },
upsertQuery: function (tableName, insertValues, updateValues, where, rawAttributes, options) {
options.onDuplicate = 'UPDATE ';
options.onDuplicate += Object.keys(updateValues).map(function (key) {
return key + '=VALUES(' + key +')';
}).join(', ');
return this.insertQuery(tableName, insertValues, rawAttributes, options);
},
bulkInsertQuery: function(tableName, attrValueHashes, options) { bulkInsertQuery: function(tableName, attrValueHashes, options) {
var query = 'INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %>;' var query = 'INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %>;'
, tuples = [] , tuples = []
......
...@@ -46,6 +46,59 @@ module.exports = (function() { ...@@ -46,6 +46,59 @@ module.exports = (function() {
return promise; return promise;
}; };
/**
* High level function that handles the results of a query execution.
*
*
* Example:
* query.formatResults([
* {
* id: 1, // this is from the main table
* attr2: 'snafu', // this is from the main table
* Tasks.id: 1, // this is from the associated table
* Tasks.title: 'task' // this is from the associated table
* }
* ])
*
* @param {Array} data - The result of the query execution.
*/
Query.prototype.formatResults = function(data) {
var result = this.callee;
if (this.isInsertQuery(data)) {
this.handleInsertQuery(data);
}
if (this.isSelectQuery()) {
result = this.handleSelectQuery(data);
} else if (this.isShowTableQuery()) {
result = this.handleShowTableQuery(data);
} else if (this.isShowOrDescribeQuery()) {
result = data;
if (this.sql.toLowerCase().indexOf('describe') === 0) {
result = {};
data.forEach(function(_result) {
result[_result.Field] = {
type: _result.Type.toUpperCase(),
allowNull: (_result.Null === 'YES'),
defaultValue: _result.Default
};
});
} else if (this.isShowIndexesQuery()) {
result = this.handleShowIndexesQuery(data);
}
} else if (this.isCallQuery()) {
result = data[0];
} else if (this.isBulkUpdateQuery() || this.isBulkDeleteQuery() || this.isUpsertQuery()) {
result = data.affectedRows;
}
return result;
};
Query.prototype.formatError = function (err) { Query.prototype.formatError = function (err) {
var match; var match;
......
...@@ -15,6 +15,7 @@ PostgresDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supp ...@@ -15,6 +15,7 @@ PostgresDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supp
'RETURNING': true, 'RETURNING': true,
'DEFAULT VALUES': true, 'DEFAULT VALUES': true,
'EXCEPTION': true, 'EXCEPTION': true,
'ON DUPLICATE KEY': false,
schemas: true, schemas: true,
lock: true, lock: true,
forShare: 'FOR SHARE', forShare: 'FOR SHARE',
......
...@@ -65,7 +65,9 @@ module.exports = (function() { ...@@ -65,7 +65,9 @@ module.exports = (function() {
if (!!options.uniqueKeys) { if (!!options.uniqueKeys) {
Utils._.each(options.uniqueKeys, function(columns) { Utils._.each(options.uniqueKeys, function(columns) {
if (!columns.singleField) { // If it's a single field its handled in column def, not as an index
values.attributes += ', UNIQUE (' + columns.fields.map(function(f) { return self.quoteIdentifiers(f); }).join(', ') + ')'; values.attributes += ', UNIQUE (' + columns.fields.map(function(f) { return self.quoteIdentifiers(f); }).join(', ') + ')';
}
}); });
} }
...@@ -319,6 +321,50 @@ module.exports = (function() { ...@@ -319,6 +321,50 @@ module.exports = (function() {
}); });
}, },
fn: function(fnName, tableName, body, returns, language) {
fnName = fnName || 'testfunc';
language = language || 'plpgsql';
returns = returns || 'SETOF ' + this.quoteTable(tableName);
var query = 'CREATE OR REPLACE FUNCTION pg_temp.<%= fnName %>() RETURNS <%= returns %> AS $$ BEGIN <%= body %> END; $$ LANGUAGE <%= language %>; SELECT * FROM pg_temp.<%= fnName %>();';
return Utils._.template(query)({
fnName: fnName,
returns: returns,
language: language,
body: body
});
},
exceptionFn: function(fnName, tableName, main, then, when, returns, language) {
when = when || 'unique_violation';
var body = '<%= main %> EXCEPTION WHEN <%= when %> THEN <%= then %>;';
body = Utils._.template(body, {
main: main,
when: when,
then: then
});
return this.fn(fnName, tableName, body, returns, language);
},
// http://www.maori.geek.nz/post/postgres_upsert_update_or_insert_in_ger_using_knex_js
upsertQuery: function (tableName, insertValues, updateValues, where, rawAttributes, options) {
var insert = this.insertQuery(tableName, insertValues, rawAttributes, options).replace(/VALUES \((.*?)\)/, 'SELECT $1');
var update = this.updateQuery(tableName, updateValues, where, options, rawAttributes);
// The numbers here are selected to match the number of affected rows returned by MySQL
return this.exceptionFn(
'sequelize_upsert',
tableName,
insert + " RETURN 1;",
update + "; RETURN 2",
'unique_violation',
'integer'
);
},
bulkInsertQuery: function(tableName, attrValueHashes, options, modelAttributes) { bulkInsertQuery: function(tableName, attrValueHashes, options, modelAttributes) {
options = options || {}; options = options || {};
......
...@@ -212,6 +212,8 @@ module.exports = (function() { ...@@ -212,6 +212,8 @@ module.exports = (function() {
return self.handleSelectQuery(rows); return self.handleSelectQuery(rows);
} else if (QueryTypes.BULKDELETE === self.options.type) { } else if (QueryTypes.BULKDELETE === self.options.type) {
return result.rowCount; return result.rowCount;
} else if (self.isUpsertQuery()) {
return rows[0].sequelize_upsert;
} else if (self.isInsertQuery() || self.isUpdateQuery()) { } else if (self.isInsertQuery() || self.isUpdateQuery()) {
if (!!self.callee && self.callee.dataValues) { if (!!self.callee && self.callee.dataValues) {
if (!!self.callee.Model && !!self.callee.Model._hasHstoreAttributes) { if (!!self.callee.Model && !!self.callee.Model._hasHstoreAttributes) {
......
...@@ -13,6 +13,7 @@ var SqliteDialect = function(sequelize) { ...@@ -13,6 +13,7 @@ var SqliteDialect = function(sequelize) {
SqliteDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supports), { SqliteDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supports), {
'DEFAULT': false, 'DEFAULT': false,
'DEFAULT VALUES': true, 'DEFAULT VALUES': true,
'IGNORE': ' OR IGNORE',
index: { index: {
using: false using: false
}, },
......
...@@ -103,7 +103,9 @@ module.exports = (function() { ...@@ -103,7 +103,9 @@ module.exports = (function() {
if (!!options.uniqueKeys) { if (!!options.uniqueKeys) {
Utils._.each(options.uniqueKeys, function(columns) { Utils._.each(options.uniqueKeys, function(columns) {
if (!columns.singleField) { // If it's a single field its handled in column def, not as an index
values.attributes += ", UNIQUE (" + columns.fields.join(', ') + ")"; values.attributes += ", UNIQUE (" + columns.fields.join(', ') + ")";
}
}); });
} }
...@@ -182,6 +184,13 @@ module.exports = (function() { ...@@ -182,6 +184,13 @@ module.exports = (function() {
return "SELECT name FROM `sqlite_master` WHERE type='table' and name!='sqlite_sequence';"; return "SELECT name FROM `sqlite_master` WHERE type='table' and name!='sqlite_sequence';";
}, },
upsertQuery: function (tableName, insertValues, updateValues, where, rawAttributes, options) {
options.ignore = true;
var sql = this.insertQuery(tableName, insertValues, rawAttributes, options) + ' ' + this.updateQuery(tableName, updateValues, where, options, rawAttributes);
return sql;
},
bulkInsertQuery: function(tableName, attrValueHashes, options) { bulkInsertQuery: function(tableName, attrValueHashes, options) {
var query = "INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %>;" var query = "INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %>;"
, tuples = [] , tuples = []
......
...@@ -131,6 +131,8 @@ module.exports = (function() { ...@@ -131,6 +131,8 @@ module.exports = (function() {
result = results; result = results;
} else if ([QueryTypes.BULKUPDATE, QueryTypes.BULKDELETE].indexOf(self.options.type) !== -1) { } else if ([QueryTypes.BULKUPDATE, QueryTypes.BULKDELETE].indexOf(self.options.type) !== -1) {
result = metaData.changes; result = metaData.changes;
} else if (self.options.type === QueryTypes.UPSERT) {
result = undefined;
} }
resolve(result); resolve(result);
...@@ -240,7 +242,9 @@ module.exports = (function() { ...@@ -240,7 +242,9 @@ module.exports = (function() {
}; };
Query.prototype.getDatabaseMethod = function() { Query.prototype.getDatabaseMethod = function() {
if (this.isInsertQuery() || this.isUpdateQuery() || (this.sql.toLowerCase().indexOf('CREATE TEMPORARY TABLE'.toLowerCase()) !== -1) || this.options.type === QueryTypes.BULKDELETE) { if (this.isUpsertQuery()) {
return 'exec'; // Needed to run multiple queries in one
} else if (this.isInsertQuery() || this.isUpdateQuery() || (this.sql.toLowerCase().indexOf('CREATE TEMPORARY TABLE'.toLowerCase()) !== -1) || this.options.type === QueryTypes.BULKDELETE) {
return 'run'; return 'run';
} else { } else {
return 'all'; return 'all';
......
...@@ -529,26 +529,11 @@ module.exports = (function() { ...@@ -529,26 +529,11 @@ module.exports = (function() {
} }
if (updatedAtAttr) { if (updatedAtAttr) {
values[updatedAtAttr] = ( values[updatedAtAttr] = self.Model.__getTimestamp(updatedAtAttr);
(
self.isNewRecord
&& !!self.Model.rawAttributes[updatedAtAttr]
&& !!self.Model.rawAttributes[updatedAtAttr].defaultValue
)
? self.Model.rawAttributes[updatedAtAttr].defaultValue
: Utils.now(self.sequelize.options.dialect)
);
} }
if (self.isNewRecord && createdAtAttr && !values[createdAtAttr]) { if (self.isNewRecord && createdAtAttr && !values[createdAtAttr]) {
values[createdAtAttr] = ( values[createdAtAttr] = self.Model.__getTimestamp(createdAtAttr);
(
!!self.Model.rawAttributes[createdAtAttr]
&& !!self.Model.rawAttributes[createdAtAttr].defaultValue
)
? self.Model.rawAttributes[createdAtAttr].defaultValue
: Utils.now(self.sequelize.options.dialect)
);
} }
var query = null var query = null
...@@ -854,7 +839,7 @@ module.exports = (function() { ...@@ -854,7 +839,7 @@ module.exports = (function() {
} }
if (updatedAtAttr && !values[updatedAtAttr]) { if (updatedAtAttr && !values[updatedAtAttr]) {
countOrOptions.attributes[updatedAtAttr] = Utils.now(this.Model.modelManager.sequelize.options.dialect); countOrOptions.attributes[updatedAtAttr] = this.Model.__getTimestamp(updatedAtAttr);
} }
return this.QueryInterface.increment(this, this.QueryInterface.QueryGenerator.addSchema(this.Model.tableName, this.Model.options.schema), values, where, countOrOptions); return this.QueryInterface.increment(this, this.QueryInterface.QueryGenerator.addSchema(this.Model.tableName, this.Model.options.schema), values, where, countOrOptions);
......
...@@ -186,8 +186,17 @@ module.exports = (function() { ...@@ -186,8 +186,17 @@ module.exports = (function() {
// Identify primary and unique attributes // Identify primary and unique attributes
Utils._.each(this.rawAttributes, function(options, attribute) { Utils._.each(this.rawAttributes, function(options, attribute) {
if (options.hasOwnProperty('unique') && options.unique !== true && options.unique !== false) { if (options.hasOwnProperty('unique')) {
var idxName = options.unique; var idxName;
if (options.unique === true) {
idxName = self.tableName + '_' + attribute + '_unique';
self.options.uniqueKeys[idxName] = {
name: idxName,
fields: [attribute],
singleField: true,
};
} else if (options.unique !== false) {
idxName = options.unique;
if (typeof options.unique === 'object') { if (typeof options.unique === 'object') {
idxName = options.unique.name; idxName = options.unique.name;
} }
...@@ -198,6 +207,8 @@ module.exports = (function() { ...@@ -198,6 +207,8 @@ module.exports = (function() {
self.options.uniqueKeys[idxName].name = idxName || false; self.options.uniqueKeys[idxName].name = idxName || false;
} }
}
if (options.primaryKey === true) { if (options.primaryKey === true) {
self.primaryKeys[attribute] = self.attributes[attribute]; self.primaryKeys[attribute] = self.attributes[attribute];
} }
...@@ -206,7 +217,6 @@ module.exports = (function() { ...@@ -206,7 +217,6 @@ module.exports = (function() {
// Add head and tail default attributes (id, timestamps) // Add head and tail default attributes (id, timestamps)
addDefaultAttributes.call(this); addDefaultAttributes.call(this);
addOptionalClassMethods.call(this); addOptionalClassMethods.call(this);
// Primary key convenience variables // Primary key convenience variables
this.primaryKeyAttributes = Object.keys(this.primaryKeys); this.primaryKeyAttributes = Object.keys(this.primaryKeys);
this.primaryKeyAttribute = this.primaryKeyAttributes[0]; this.primaryKeyAttribute = this.primaryKeyAttributes[0];
...@@ -242,7 +252,6 @@ module.exports = (function() { ...@@ -242,7 +252,6 @@ module.exports = (function() {
self.Instance.prototype[name] = fct; self.Instance.prototype[name] = fct;
}); });
} }
this.refreshAttributes(); this.refreshAttributes();
findAutoIncrementField.call(this); findAutoIncrementField.call(this);
...@@ -1026,6 +1035,7 @@ module.exports = (function() { ...@@ -1026,6 +1035,7 @@ module.exports = (function() {
* @param {Boolean} [options.isDirty=true] * @param {Boolean} [options.isDirty=true]
* @param {Array} [options.fields] If set, only columns matching those in fields will be saved * @param {Array} [options.fields] If set, only columns matching those in fields will be saved
* @param {Array} [options.include] an array of include options - Used to build prefetched/included model instances * @param {Array} [options.include] an array of include options - Used to build prefetched/included model instances
* @param {String} [options.onDuplicate]
* @param {Transaction} [options.transaction] * @param {Transaction} [options.transaction]
* *
* @return {Promise<Instance>} * @return {Promise<Instance>}
...@@ -1164,6 +1174,56 @@ module.exports = (function() { ...@@ -1164,6 +1174,56 @@ module.exports = (function() {
}; };
/** /**
* Insert or update a single row. An update will be executed if a row which matches the supplied values on either the primary key or a unique key is found. Note that the unique index must be defined in your sequelize model and not just in the table. Otherwise you may experience a unique constraint violation, because sequelize fails to identify the row that should be updated.
*
* **Implementation details:**
*
* * MySQL - Implemented as a single query `INSERT values ON DUPLICATE KEY UPDATE values`
* * PostgreSQL - Implemented as a temporary function with exception handling: INSERT EXCEPTION WHEN unique_constraint UPDATE
* * SQLite - Implemented as two queries `INSERT; UPDATE`. This means that the update is executed regardless of whether the row already existed or not
*
* **Note** that SQLite returns undefined for created, no matter if the row was created or updated. This is because SQLite always runs INSERT OR IGNORE + UPDATE, in a single query, so there is no way to know whether the row was inserted or not.
*
* @param {Object} values
* @param {Object} [options]
* @param {Array} [options.fields=Object.keys(this.attributes)] The fields to insert / update. Defaults to all fields
*
* @alias insertOrUpdate
* @return {Promise<created>} Returns a boolean indicating whether the row was created or updated.
*/
Model.prototype.upsert = function (values, options) {
options = options || {};
if (!options.fields) {
options.fields = Object.keys(this.attributes);
}
var createdAtAttr = this._timestampAttributes.createdAt
, updatedAtAttr = this._timestampAttributes.updatedAt
, hadPrimary = this.primaryKeyField in values;
values = Utils._.pick(values, options.fields);
values = this.build(values).get(); // Get default values - this also adds a null value for the PK if none is given
if (createdAtAttr && !values[createdAtAttr]) {
values[createdAtAttr] = this.__getTimestamp(createdAtAttr);
}
if (updatedAtAttr && !values[updatedAtAttr]) {
values[updatedAtAttr] = this.__getTimestamp(updatedAtAttr);
}
// Build adds a null value for the primary key, if none was given by the user.
// We need to remove that because of some Postgres technicalities.
if (!hadPrimary) {
delete values[this.primaryKeyField];
}
return this.QueryInterface.upsert(this.getTableName(), values, this, options);
};
Model.prototype.insertOrUpdate = Model.prototype.upsert;
/**
* Create and insert multiple instances in bulk. * Create and insert multiple instances in bulk.
* *
* The success handler is passed an array of instances, but please notice that these may not completely represent the state of the rows in the DB. This is because MySQL * The success handler is passed an array of instances, but please notice that these may not completely represent the state of the rows in the DB. This is because MySQL
...@@ -1484,8 +1544,8 @@ module.exports = (function() { ...@@ -1484,8 +1544,8 @@ module.exports = (function() {
options.type = QueryTypes.BULKUPDATE; options.type = QueryTypes.BULKUPDATE;
if (self._timestampAttributes.updatedAt) { if (this._timestampAttributes.updatedAt) {
values[self._timestampAttributes.updatedAt] = Utils.now(self.modelManager.sequelize.options.dialect); values[this._timestampAttributes.updatedAt] = this.__getTimestamp(this._timestampAttributes.updatedAt);
} }
var daos var daos
...@@ -1654,6 +1714,14 @@ module.exports = (function() { ...@@ -1654,6 +1714,14 @@ module.exports = (function() {
this.__sql = sql.setDialect(dialect === 'mariadb' ? 'mysql' : dialect); this.__sql = sql.setDialect(dialect === 'mariadb' ? 'mysql' : dialect);
}; };
Model.prototype.__getTimestamp = function(attr) {
if (!!this.rawAttributes[attr] && !!this.rawAttributes[attr].defaultValue) {
return this.rawAttributes[attr].defaultValue;
} else {
return Utils.now(this.sequelize.options.dialect);
}
};
var mapFieldNames = function(options, Model) { var mapFieldNames = function(options, Model) {
if (options.attributes) { if (options.attributes) {
options.attributes = options.attributes.map(function(attr) { options.attributes = options.attributes.map(function(attr) {
......
...@@ -440,6 +440,75 @@ module.exports = (function() { ...@@ -440,6 +440,75 @@ module.exports = (function() {
}); });
}; };
QueryInterface.prototype.upsert = function(tableName, values, model, options) {
var wheres = []
, where
, indexFields
, indexes = []
, updateValues
, attributes = Object.keys(values);
if (values[model.primaryKeyField]) {
where = {};
where[model.primaryKeyField] = values[model.primaryKeyField];
wheres.push(where);
}
// Lets combine uniquekeys and indexes into one
indexes = Utils._.map(model.options.uniqueKeys, function (value, key) {
return value.fields;
});
Utils._.each(model.options.indexes, function (value, key) {
if (value.unique === true) {
// fields in the index may both the strings or objects with an attribute property - lets sanitize that
indexFields = Utils._.map(value.fields, function (field) {
if (Utils._.isPlainObject(field)) {
return field.attribute;
}
return field;
});
indexes.push(indexFields);
}
});
indexes.forEach(function (index) {
if (Utils._.intersection(attributes, index).length === index.length) {
where = {};
index.forEach(function (field) {
where[field] = values[field];
});
wheres.push(where);
}
});
where = this.sequelize.or.apply(this.sequelize, wheres);
options.type = QueryTypes.UPSERT;
options.raw = true;
if (model._timestampAttributes.createdAt) {
// If we are updating an existing row, we shouldn't set createdAt
updateValues = Utils._.cloneDeep(values);
delete updateValues[model._timestampAttributes.createdAt];
} else {
updateValues = values;
}
var sql = this.QueryGenerator.upsertQuery(tableName, values, updateValues, where, model.rawAttributes, options);
return this.sequelize.query(sql, null, options).then(function (rowCount) {
if (rowCount === undefined) {
return rowCount;
}
// MySQL returns 1 for inserted, 2 for updated http://dev.mysql.com/doc/refman/5.0/en/insert-on-duplicate.html. Postgres has been modded to do the same
return rowCount === 1;
});
};
QueryInterface.prototype.bulkInsert = function(tableName, records, options, attributes) { QueryInterface.prototype.bulkInsert = function(tableName, records, options, attributes) {
var sql = this.QueryGenerator.bulkInsertQuery(tableName, records, options, attributes); var sql = this.QueryGenerator.bulkInsertQuery(tableName, records, options, attributes);
return this.sequelize.query(sql, null, options); return this.sequelize.query(sql, null, options);
......
...@@ -4,5 +4,6 @@ module.exports = { ...@@ -4,5 +4,6 @@ module.exports = {
SELECT: 'SELECT', SELECT: 'SELECT',
INSERT: 'INSERT', INSERT: 'INSERT',
BULKUPDATE: 'BULKUPDATE', BULKUPDATE: 'BULKUPDATE',
BULKDELETE: 'BULKDELETE' BULKDELETE: 'BULKDELETE',
UPSERT: 'UPSERT',
}; };
"use strict";
/* jshint camelcase: false */
/* jshint expr: true */
var chai = require('chai')
, sinon = require('sinon')
, Sequelize = require('../../index')
, Promise = Sequelize.Promise
, expect = chai.expect
, Support = require(__dirname + '/../support')
, DataTypes = require(__dirname + "/../../lib/data-types")
, dialect = Support.getTestDialect()
, datetime = require('chai-datetime')
, _ = require('lodash')
, assert = require('assert');
chai.use(datetime);
chai.config.includeStack = true;
describe(Support.getTestDialectTeaser("DAOFactory"), function () {
beforeEach(function () {
this.clock = sinon.useFakeTimers();
this.User = this.sequelize.define('user', {
username: DataTypes.STRING,
foo: {
unique: 'foobar',
type: DataTypes.STRING
},
bar: {
unique: 'foobar',
type: DataTypes.INTEGER
},
});
return this.sequelize.sync({ force: true });
});
afterEach(function () {
this.clock.restore();
});
describe('upsert', function () {
it('works with upsert on id', function () {
return this.User.upsert({ id: 42, username: 'john' }).bind(this).then(function (created) {
if (dialect === 'sqlite') {
expect(created).not.to.be.defined;
} else {
expect(created).to.be.ok;
}
this.clock.tick(2000); // Make sure to pass some time so updatedAt != createdAt
return this.User.upsert({ id: 42, username: 'doe' });
}).then(function (created) {
if (dialect === 'sqlite') {
expect(created).not.to.be.defined;
} else {
expect(created).not.to.be.ok;
}
return this.User.find(42);
}).then(function (user) {
expect(user.createdAt).to.be.defined;
expect(user.username).to.equal('doe');
expect(user.updatedAt).to.be.afterTime(user.createdAt);
});
});
it('works with upsert on a composite key', function () {
return this.User.upsert({ foo: 'baz', bar: 19, username: 'john' }).bind(this).then(function (created) {
if (dialect === 'sqlite') {
expect(created).not.to.be.defined;
} else {
expect(created).to.be.ok;
}
this.clock.tick(2000); // Make sure to pass some time so updatedAt != createdAt
return this.User.upsert({ foo: 'baz', bar: 19, username: 'doe' });
}).then(function (created) {
if (dialect === 'sqlite') {
expect(created).not.to.be.defined;
} else {
expect(created).not.to.be.ok;
}
return this.User.find({ where: { foo: 'baz', bar: 19 }});
}).then(function (user) {
expect(user.createdAt).to.be.defined;
expect(user.username).to.equal('doe');
expect(user.updatedAt).to.be.afterTime(user.createdAt);
});
});
});
});
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!