不要怂,就是干,撸起袖子干!

Commit c14f705c by Jan Aagaard Meier

Hopefully this upsert code works

1 parent 6147c8e8
...@@ -27,6 +27,10 @@ AbstractDialect.prototype.supports = { ...@@ -27,6 +27,10 @@ AbstractDialect.prototype.supports = {
/* does the dialect support updating autoincrement fields */ /* does the dialect support updating autoincrement fields */
update: true update: true
}, },
'ON DUPLICATE KEY': true,
/* What is the dialect's keyword for INSERT IGNORE */
'IGNORE': '',
schemas: false, schemas: false,
constraints: { constraints: {
restrict: true restrict: true
......
...@@ -154,8 +154,8 @@ module.exports = (function() { ...@@ -154,8 +154,8 @@ module.exports = (function() {
options = options || {}; options = options || {};
var query var query
, valueQuery = 'INSERT INTO <%= table %> (<%= attributes %>)<%= output %> VALUES (<%= values %>)' , valueQuery = 'INSERT<%= ignore %> INTO <%= table %> (<%= attributes %>)<%= output %> VALUES (<%= values %>)'
, emptyQuery = 'INSERT INTO <%= table %><%= output %>' , emptyQuery = 'INSERT<%= ignore %> INTO <%= table %><%= output %>'
, outputFragment , outputFragment
, fields = [] , fields = []
, values = [] , values = []
...@@ -194,6 +194,11 @@ module.exports = (function() { ...@@ -194,6 +194,11 @@ module.exports = (function() {
valueQuery = 'CREATE OR REPLACE FUNCTION pg_temp.testfunc() RETURNS SETOF <%= table %> AS $body$ BEGIN RETURN QUERY ' + valueQuery + '; EXCEPTION ' + options.exception + ' END; $body$ LANGUAGE plpgsql; SELECT * FROM pg_temp.testfunc(); DROP FUNCTION IF EXISTS pg_temp.testfunc();'; valueQuery = 'CREATE OR REPLACE FUNCTION pg_temp.testfunc() RETURNS SETOF <%= table %> AS $body$ BEGIN RETURN QUERY ' + valueQuery + '; EXCEPTION ' + options.exception + ' END; $body$ LANGUAGE plpgsql; SELECT * FROM pg_temp.testfunc(); DROP FUNCTION IF EXISTS pg_temp.testfunc();';
} }
if (this._dialect.supports['ON DUPLICATE KEY'] && options.onDuplicate) {
valueQuery += ' ON DUPLICATE KEY ' + options.onDuplicate;
emptyQuery += ' ON DUPLICATE KEY ' + options.onDuplicate;
}
valueHash = Utils.removeNullValuesFromHash(valueHash, this.options.omitNull); valueHash = Utils.removeNullValuesFromHash(valueHash, this.options.omitNull);
for (key in valueHash) { for (key in valueHash) {
...@@ -221,6 +226,7 @@ module.exports = (function() { ...@@ -221,6 +226,7 @@ module.exports = (function() {
} }
var replacements = { var replacements = {
ignore: options.ignore ? this._dialect.supports['IGNORE'] : '',
table: this.quoteTable(table), table: this.quoteTable(table),
attributes: fields.join(','), attributes: fields.join(','),
output: outputFragment, output: outputFragment,
......
...@@ -135,6 +135,10 @@ module.exports = (function() { ...@@ -135,6 +135,10 @@ module.exports = (function() {
} }
}; };
AbstractQuery.prototype.isUpsertQuery = function () {
return this.options.type === QueryTypes.UPSERT;
};
AbstractQuery.prototype.isInsertQuery = function(results, metaData) { AbstractQuery.prototype.isInsertQuery = function(results, metaData) {
var result = true; var result = true;
......
...@@ -14,6 +14,7 @@ var MysqlDialect = function(sequelize) { ...@@ -14,6 +14,7 @@ var MysqlDialect = function(sequelize) {
MysqlDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supports), { MysqlDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supports), {
'VALUES ()': true, 'VALUES ()': true,
'LIMIT ON UPDATE': true, 'LIMIT ON UPDATE': true,
'IGNORE': ' IGNORE',
lock: true, lock: true,
forShare: 'LOCK IN SHARE MODE', forShare: 'LOCK IN SHARE MODE',
index: { index: {
......
...@@ -73,10 +73,12 @@ module.exports = (function() { ...@@ -73,10 +73,12 @@ module.exports = (function() {
if (!!options.uniqueKeys) { if (!!options.uniqueKeys) {
Utils._.each(options.uniqueKeys, function(columns, indexName) { Utils._.each(options.uniqueKeys, function(columns, indexName) {
if (!Utils._.isString(indexName)) { if (!columns.singleField) { // If it's a single field its handled in column def, not as an index
indexName = 'uniq_' + tableName + '_' + columns.fields.join('_'); if (!Utils._.isString(indexName)) {
indexName = 'uniq_' + tableName + '_' + columns.fields.join('_');
}
values.attributes += ', UNIQUE ' + indexName + ' (' + Utils._.map(columns.fields, self.quoteIdentifier).join(', ') + ')';
} }
values.attributes += ', UNIQUE ' + indexName + ' (' + Utils._.map(columns.fields, self.quoteIdentifier).join(', ') + ')';
}); });
} }
...@@ -166,6 +168,16 @@ module.exports = (function() { ...@@ -166,6 +168,16 @@ module.exports = (function() {
return Utils._.template(query)({ tableName: tableName, attributes: attrString.join(', ') }); return Utils._.template(query)({ tableName: tableName, attributes: attrString.join(', ') });
}, },
upsertQuery: function (tableName, valueHash, where, rawAttributes, options) {
options.onDuplicate = 'UPDATE ';
options.onDuplicate += Object.keys(valueHash).map(function (key) {
return key + '=VALUES(' + key +')';
}).join(', ');
return this.insertQuery(tableName, valueHash, rawAttributes, options);
},
bulkInsertQuery: function(tableName, attrValueHashes, options) { bulkInsertQuery: function(tableName, attrValueHashes, options) {
var query = 'INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %>;' var query = 'INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %>;'
, tuples = [] , tuples = []
......
...@@ -15,6 +15,7 @@ PostgresDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supp ...@@ -15,6 +15,7 @@ PostgresDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supp
'RETURNING': true, 'RETURNING': true,
'DEFAULT VALUES': true, 'DEFAULT VALUES': true,
'EXCEPTION': true, 'EXCEPTION': true,
'ON DUPLICATE KEY': false,
schemas: true, schemas: true,
lock: true, lock: true,
forShare: 'FOR SHARE', forShare: 'FOR SHARE',
......
...@@ -65,7 +65,9 @@ module.exports = (function() { ...@@ -65,7 +65,9 @@ module.exports = (function() {
if (!!options.uniqueKeys) { if (!!options.uniqueKeys) {
Utils._.each(options.uniqueKeys, function(columns) { Utils._.each(options.uniqueKeys, function(columns) {
values.attributes += ', UNIQUE (' + columns.fields.map(function(f) { return self.quoteIdentifiers(f); }).join(', ') + ')'; if (!columns.singleField) { // If it's a single field its handled in column def, not as an index
values.attributes += ', UNIQUE (' + columns.fields.map(function(f) { return self.quoteIdentifiers(f); }).join(', ') + ')';
}
}); });
} }
...@@ -319,6 +321,16 @@ module.exports = (function() { ...@@ -319,6 +321,16 @@ module.exports = (function() {
}); });
}, },
// http://www.maori.geek.nz/post/postgres_upsert_update_or_insert_in_ger_using_knex_js
upsertQuery: function (tableName, valueHash, where, rawAttributes, options) {
var query = 'WITH upsert AS (<%= update %> RETURNING *) <%= insert %> WHERE NOT EXISTS (SELECT * FROM upsert)';
return Utils._.template(query)({
update: this.updateQuery(tableName, valueHash, where, options, rawAttributes),
insert: this.insertQuery(tableName, valueHash, rawAttributes, options).replace(/VALUES \((.*?)\);/, 'SELECT $1')
});
},
bulkInsertQuery: function(tableName, attrValueHashes, options, modelAttributes) { bulkInsertQuery: function(tableName, attrValueHashes, options, modelAttributes) {
options = options || {}; options = options || {};
......
...@@ -210,7 +210,7 @@ module.exports = (function() { ...@@ -210,7 +210,7 @@ module.exports = (function() {
} }
return self.handleSelectQuery(rows); return self.handleSelectQuery(rows);
} else if (QueryTypes.BULKDELETE === self.options.type) { } else if ([QueryTypes.BULKDELETE, QueryTypes.UPSERT].indexOf(self.options.type)) {
return result.rowCount; return result.rowCount;
} else if (self.isInsertQuery() || self.isUpdateQuery()) { } else if (self.isInsertQuery() || self.isUpdateQuery()) {
if (!!self.callee && self.callee.dataValues) { if (!!self.callee && self.callee.dataValues) {
......
...@@ -13,6 +13,7 @@ var SqliteDialect = function(sequelize) { ...@@ -13,6 +13,7 @@ var SqliteDialect = function(sequelize) {
SqliteDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supports), { SqliteDialect.prototype.supports = _.merge(_.cloneDeep(Abstract.prototype.supports), {
'DEFAULT': false, 'DEFAULT': false,
'DEFAULT VALUES': true, 'DEFAULT VALUES': true,
'IGNORE': ' OR IGNORE',
index: { index: {
using: false using: false
}, },
......
...@@ -103,7 +103,9 @@ module.exports = (function() { ...@@ -103,7 +103,9 @@ module.exports = (function() {
if (!!options.uniqueKeys) { if (!!options.uniqueKeys) {
Utils._.each(options.uniqueKeys, function(columns) { Utils._.each(options.uniqueKeys, function(columns) {
values.attributes += ", UNIQUE (" + columns.fields.join(', ') + ")"; if (!columns.singleField) { // If it's a single field its handled in column def, not as an index
values.attributes += ", UNIQUE (" + columns.fields.join(', ') + ")";
}
}); });
} }
...@@ -182,6 +184,13 @@ module.exports = (function() { ...@@ -182,6 +184,13 @@ module.exports = (function() {
return "SELECT name FROM `sqlite_master` WHERE type='table' and name!='sqlite_sequence';"; return "SELECT name FROM `sqlite_master` WHERE type='table' and name!='sqlite_sequence';";
}, },
upsertQuery: function (tableName, valueHash, where, rawAttributes, options) {
options.ignore = true;
var sql = this.insertQuery(tableName, valueHash, rawAttributes, options) + ' ' + this.updateQuery(tableName, valueHash, where, options, rawAttributes);
return sql;
},
bulkInsertQuery: function(tableName, attrValueHashes, options) { bulkInsertQuery: function(tableName, attrValueHashes, options) {
var query = "INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %>;" var query = "INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %>;"
, tuples = [] , tuples = []
......
...@@ -240,7 +240,9 @@ module.exports = (function() { ...@@ -240,7 +240,9 @@ module.exports = (function() {
}; };
Query.prototype.getDatabaseMethod = function() { Query.prototype.getDatabaseMethod = function() {
if (this.isInsertQuery() || this.isUpdateQuery() || (this.sql.toLowerCase().indexOf('CREATE TEMPORARY TABLE'.toLowerCase()) !== -1) || this.options.type === QueryTypes.BULKDELETE) { if (this.isUpsertQuery()) {
return 'run';
} else if (this.isInsertQuery() || this.isUpdateQuery() || (this.sql.toLowerCase().indexOf('CREATE TEMPORARY TABLE'.toLowerCase()) !== -1) || this.options.type === QueryTypes.BULKDELETE) {
return 'run'; return 'run';
} else { } else {
return 'all'; return 'all';
......
...@@ -186,20 +186,30 @@ module.exports = (function() { ...@@ -186,20 +186,30 @@ module.exports = (function() {
// Identify primary and unique attributes // Identify primary and unique attributes
Utils._.each(this.rawAttributes, function(options, attribute) { Utils._.each(this.rawAttributes, function(options, attribute) {
if (options.hasOwnProperty('unique') && options.unique !== true && options.unique !== false) { if (options.hasOwnProperty('unique')) {
var idxName = options.unique; var idxName;
if (typeof options.unique === 'object') { if (options.unique === true) {
idxName = options.unique.name; idxName = self.tableName + '_' + attribute + '_unique';
} self.options.uniqueKeys[idxName] = {
name: idxName,
fields: [attribute],
singleField: true,
};
} else if (options.unique !== false) {
idxName = options.unique;
if (typeof options.unique === 'object') {
idxName = options.unique.name;
}
self.options.uniqueKeys[idxName] = self.options.uniqueKeys[idxName] || {fields: [], msg: null}; self.options.uniqueKeys[idxName] = self.options.uniqueKeys[idxName] || {fields: [], msg: null};
self.options.uniqueKeys[idxName].fields.push(options.field || attribute); self.options.uniqueKeys[idxName].fields.push(options.field || attribute);
self.options.uniqueKeys[idxName].msg = self.options.uniqueKeys[idxName].msg || options.unique.msg || null; self.options.uniqueKeys[idxName].msg = self.options.uniqueKeys[idxName].msg || options.unique.msg || null;
self.options.uniqueKeys[idxName].name = idxName || false; self.options.uniqueKeys[idxName].name = idxName || false;
} }
if (options.primaryKey === true) { if (options.primaryKey === true) {
self.primaryKeys[attribute] = self.attributes[attribute]; self.primaryKeys[attribute] = self.attributes[attribute];
}
} }
}); });
...@@ -1026,6 +1036,7 @@ module.exports = (function() { ...@@ -1026,6 +1036,7 @@ module.exports = (function() {
* @param {Boolean} [options.isDirty=true] * @param {Boolean} [options.isDirty=true]
* @param {Array} [options.fields] If set, only columns matching those in fields will be saved * @param {Array} [options.fields] If set, only columns matching those in fields will be saved
* @param {Array} [options.include] an array of include options - Used to build prefetched/included model instances * @param {Array} [options.include] an array of include options - Used to build prefetched/included model instances
* @param {String} [options.onDuplicate]
* @param {Transaction} [options.transaction] * @param {Transaction} [options.transaction]
* *
* @return {Promise<Instance>} * @return {Promise<Instance>}
...@@ -1164,6 +1175,20 @@ module.exports = (function() { ...@@ -1164,6 +1175,20 @@ module.exports = (function() {
}; };
/** /**
* @param {Object} values
* @param {Object} [options]
*
* @return {Promise<created>} Returns a boolean indicating whether the row was created or updated.
*/
Model.prototype.upsert = function (values, options) {
options = options || {};
// TODO - should we do a .build here to get default values + setters?
return this.QueryInterface.upsert(this.getTableName(), values, this, options);
};
/**
* Create and insert multiple instances in bulk. * Create and insert multiple instances in bulk.
* *
* The success handler is passed an array of instances, but please notice that these may not completely represent the state of the rows in the DB. This is because MySQL * The success handler is passed an array of instances, but please notice that these may not completely represent the state of the rows in the DB. This is because MySQL
......
...@@ -440,6 +440,57 @@ module.exports = (function() { ...@@ -440,6 +440,57 @@ module.exports = (function() {
}); });
}; };
QueryInterface.prototype.upsert = function(tableName, values, model, options) {
var wheres = []
, where
, indexFields
, indexes = []
, attributes = Object.keys(values);
if (model.primaryKeyField in values) {
where = {};
where[model.primaryKeyField] = values[model.primaryKeyField];
wheres.push(where);
}
// Lets combine uniquekeys and indexes into one
indexes = Utils._.map(model.options.uniqueKeys, function (value, key) {
return value.fields;
});
Utils._.each(model.options.indexes, function (value, key) {
if (value.unique === true) {
// fields in the index may both the strings or objects with an attribute property - lets sanitize that
indexFields = Utils._.map(value.fields, function (field) {
if (Utils._.isPlainObject(field)) {
return field.attribute;
}
return field;
});
indexes.push(indexFields);
}
});
indexes.forEach(function (index) {
if (Utils._.intersection(attributes, index).length === index.length) {
where = {};
index.forEach(function (field) {
where[field] = values[field];
});
wheres.push(where);
}
});
where = this.sequelize.or.apply(this.sequelize, wheres);
options.type = QueryTypes.UPSERT;
options.raw = true;
var sql = this.QueryGenerator.upsertQuery(tableName, values, where, model.rawAttributes, options);
return this.sequelize.query(sql, null, options).then(function (rowCount) {
return rowCount > 0;
});
};
QueryInterface.prototype.bulkInsert = function(tableName, records, options, attributes) { QueryInterface.prototype.bulkInsert = function(tableName, records, options, attributes) {
var sql = this.QueryGenerator.bulkInsertQuery(tableName, records, options, attributes); var sql = this.QueryGenerator.bulkInsertQuery(tableName, records, options, attributes);
return this.sequelize.query(sql, null, options); return this.sequelize.query(sql, null, options);
......
...@@ -4,5 +4,6 @@ module.exports = { ...@@ -4,5 +4,6 @@ module.exports = {
SELECT: 'SELECT', SELECT: 'SELECT',
INSERT: 'INSERT', INSERT: 'INSERT',
BULKUPDATE: 'BULKUPDATE', BULKUPDATE: 'BULKUPDATE',
BULKDELETE: 'BULKDELETE' BULKDELETE: 'BULKDELETE',
UPSERT: 'UPSERT',
}; };
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!