不要怂,就是干,撸起袖子干!

Commit 47489ab1 by Evan Committed by Sushant

feat(postgres): support updateOnDuplicate option with bulkCreate (#11163)

1 parent 1d6fa05c
...@@ -300,10 +300,15 @@ class QueryGenerator { ...@@ -300,10 +300,15 @@ class QueryGenerator {
} }
if (this._dialect.supports.inserts.updateOnDuplicate && options.updateOnDuplicate) { if (this._dialect.supports.inserts.updateOnDuplicate && options.updateOnDuplicate) {
onDuplicateKeyUpdate = ` ON DUPLICATE KEY UPDATE ${options.updateOnDuplicate.map(attr => { if (this._dialect.supports.inserts.updateOnDuplicate == ' ON CONFLICT DO UPDATE SET') { // postgres
const key = this.quoteIdentifier(attr); // If no conflict target columns were specified, use the primary key names from options.upsertKeys
return `${key}=VALUES(${key})`; const conflictKeys = options.upsertKeys.map(attr => this.quoteIdentifier(attr));
}).join(',')}`; const updateKeys = options.updateOnDuplicate.map(attr => `${this.quoteIdentifier(attr)}=EXCLUDED.${this.quoteIdentifier(attr)}`);
onDuplicateKeyUpdate = ` ON CONFLICT (${conflictKeys.join(',')}) DO UPDATE SET ${updateKeys.join(',')}`;
} else { // mysql / maria
const valueKeys = options.updateOnDuplicate.map(attr => `${this.quoteIdentifier(attr)}=VALUES(${this.quoteIdentifier(attr)})`);
onDuplicateKeyUpdate = `${this._dialect.supports.inserts.updateOnDuplicate} ${valueKeys.join(',')}`;
}
} }
const ignoreDuplicates = options.ignoreDuplicates ? this._dialect.supports.inserts.ignoreDuplicates : ''; const ignoreDuplicates = options.ignoreDuplicates ? this._dialect.supports.inserts.ignoreDuplicates : '';
......
...@@ -28,7 +28,7 @@ MariadbDialect.prototype.supports = _.merge( ...@@ -28,7 +28,7 @@ MariadbDialect.prototype.supports = _.merge(
schemas: true, schemas: true,
inserts: { inserts: {
ignoreDuplicates: ' IGNORE', ignoreDuplicates: ' IGNORE',
updateOnDuplicate: true updateOnDuplicate: ' ON DUPLICATE KEY UPDATE'
}, },
index: { index: {
collate: false, collate: false,
......
...@@ -26,7 +26,7 @@ MysqlDialect.prototype.supports = _.merge(_.cloneDeep(AbstractDialect.prototype. ...@@ -26,7 +26,7 @@ MysqlDialect.prototype.supports = _.merge(_.cloneDeep(AbstractDialect.prototype.
forShare: 'LOCK IN SHARE MODE', forShare: 'LOCK IN SHARE MODE',
inserts: { inserts: {
ignoreDuplicates: ' IGNORE', ignoreDuplicates: ' IGNORE',
updateOnDuplicate: true updateOnDuplicate: ' ON DUPLICATE KEY UPDATE'
}, },
index: { index: {
collate: false, collate: false,
......
...@@ -42,7 +42,8 @@ PostgresDialect.prototype.supports = _.merge(_.cloneDeep(AbstractDialect.prototy ...@@ -42,7 +42,8 @@ PostgresDialect.prototype.supports = _.merge(_.cloneDeep(AbstractDialect.prototy
functionBased: true functionBased: true
}, },
inserts: { inserts: {
onConflictDoNothing: ' ON CONFLICT DO NOTHING' onConflictDoNothing: ' ON CONFLICT DO NOTHING',
updateOnDuplicate: ' ON CONFLICT DO UPDATE SET'
}, },
NUMERIC: true, NUMERIC: true,
ARRAY: true, ARRAY: true,
......
...@@ -2517,7 +2517,7 @@ class Model { ...@@ -2517,7 +2517,7 @@ class Model {
* @param {boolean} [options.hooks=true] Run before / after bulk create hooks? * @param {boolean} [options.hooks=true] Run before / after bulk create hooks?
* @param {boolean} [options.individualHooks=false] Run before / after create hooks for each individual Instance? BulkCreate hooks will still be run if options.hooks is true. * @param {boolean} [options.individualHooks=false] Run before / after create hooks for each individual Instance? BulkCreate hooks will still be run if options.hooks is true.
* @param {boolean} [options.ignoreDuplicates=false] Ignore duplicate values for primary keys? (not supported by MSSQL or Postgres < 9.5) * @param {boolean} [options.ignoreDuplicates=false] Ignore duplicate values for primary keys? (not supported by MSSQL or Postgres < 9.5)
* @param {Array} [options.updateOnDuplicate] Fields to update if row key already exists (on duplicate key update)? (only supported by MySQL). By default, all fields are updated. * @param {Array} [options.updateOnDuplicate] Fields to update if row key already exists (on duplicate key update)? (only supported by MySQL, MariaDB & Postgres >= 9.5). By default, all fields are updated.
* @param {Transaction} [options.transaction] Transaction to run query under * @param {Transaction} [options.transaction] Transaction to run query under
* @param {Function} [options.logging=false] A function that gets executed while running the query to log the sql. * @param {Function} [options.logging=false] A function that gets executed while running the query to log the sql.
* @param {boolean} [options.benchmark=false] Pass query execution time in milliseconds as second argument to logging function (options.logging). * @param {boolean} [options.benchmark=false] Pass query execution time in milliseconds as second argument to logging function (options.logging).
...@@ -2545,7 +2545,7 @@ class Model { ...@@ -2545,7 +2545,7 @@ class Model {
if (options.ignoreDuplicates && ['mssql'].includes(dialect)) { if (options.ignoreDuplicates && ['mssql'].includes(dialect)) {
return Promise.reject(new Error(`${dialect} does not support the ignoreDuplicates option.`)); return Promise.reject(new Error(`${dialect} does not support the ignoreDuplicates option.`));
} }
if (options.updateOnDuplicate && (dialect !== 'mysql' && dialect !== 'mariadb')) { if (options.updateOnDuplicate && (dialect !== 'mysql' && dialect !== 'mariadb' && dialect !== 'postgres')) {
return Promise.reject(new Error(`${dialect} does not support the updateOnDuplicate option.`)); return Promise.reject(new Error(`${dialect} does not support the updateOnDuplicate option.`));
} }
...@@ -2644,6 +2644,11 @@ class Model { ...@@ -2644,6 +2644,11 @@ class Model {
// Map updateOnDuplicate attributes to fields // Map updateOnDuplicate attributes to fields
if (options.updateOnDuplicate) { if (options.updateOnDuplicate) {
options.updateOnDuplicate = options.updateOnDuplicate.map(attr => this.rawAttributes[attr].field || attr); options.updateOnDuplicate = options.updateOnDuplicate.map(attr => this.rawAttributes[attr].field || attr);
// Get primary keys for postgres to enable updateOnDuplicate
options.upsertKeys = _.chain(this.primaryKeys).values().map('fieldName').value();
if (Object.keys(this.uniqueKeys).length > 0) {
options.upsertKeys = _.chain(this.uniqueKeys).values().filter(c => c.fields.length === 1).map('column').value();
}
} }
return this.QueryInterface.bulkInsert(this.getTableName(options), records, options, fieldMappedAttributes).then(results => { return this.QueryInterface.bulkInsert(this.getTableName(options), records, options, fieldMappedAttributes).then(results => {
......
...@@ -480,6 +480,32 @@ describe(Support.getTestDialectTeaser('Model'), () => { ...@@ -480,6 +480,32 @@ describe(Support.getTestDialectTeaser('Model'), () => {
}); });
}); });
it('should support the updateOnDuplicate option with primary keys', function() {
const data = [
{ no: 1, name: 'Peter' },
{ no: 2, name: 'Paul' }
];
return this.Student.bulkCreate(data, { fields: ['no', 'name'], updateOnDuplicate: ['name'] }).then(() => {
const new_data = [
{ no: 1, name: 'Peterson' },
{ no: 2, name: 'Paulson' },
{ no: 3, name: 'Michael' }
];
return this.Student.bulkCreate(new_data, { fields: ['no', 'name'], updateOnDuplicate: ['name'] }).then(() => {
return this.Student.findAll({ order: ['no'] }).then(students => {
expect(students.length).to.equal(3);
expect(students[0].name).to.equal('Peterson');
expect(students[0].no).to.equal(1);
expect(students[1].name).to.equal('Paulson');
expect(students[1].no).to.equal(2);
expect(students[2].name).to.equal('Michael');
expect(students[2].no).to.equal(3);
});
});
});
});
it('should reject for non array updateOnDuplicate option', function() { it('should reject for non array updateOnDuplicate option', function() {
const data = [ const data = [
{ uniqueName: 'Peter', secretValue: '42' }, { uniqueName: 'Peter', secretValue: '42' },
......
...@@ -850,6 +850,9 @@ if (dialect.startsWith('postgres')) { ...@@ -850,6 +850,9 @@ if (dialect.startsWith('postgres')) {
}, { }, {
arguments: [{ schema: 'mySchema', tableName: 'myTable' }, [{ name: "foo';DROP TABLE mySchema.myTable;" }, { name: 'bar' }]], arguments: [{ schema: 'mySchema', tableName: 'myTable' }, [{ name: "foo';DROP TABLE mySchema.myTable;" }, { name: 'bar' }]],
expectation: "INSERT INTO \"mySchema\".\"myTable\" (\"name\") VALUES ('foo'';DROP TABLE mySchema.myTable;'),('bar');" expectation: "INSERT INTO \"mySchema\".\"myTable\" (\"name\") VALUES ('foo'';DROP TABLE mySchema.myTable;'),('bar');"
}, {
arguments: [{ schema: 'mySchema', tableName: 'myTable' }, [{ name: 'foo' }, { name: 'bar' }], { updateOnDuplicate: ['name'], upsertKeys: ['name'] }],
expectation: "INSERT INTO \"mySchema\".\"myTable\" (\"name\") VALUES ('foo'),('bar') ON CONFLICT (\"name\") DO UPDATE SET \"name\"=EXCLUDED.\"name\";"
}, },
// Variants when quoteIdentifiers is false // Variants when quoteIdentifiers is false
......
...@@ -129,7 +129,8 @@ describe(Support.getTestDialectTeaser('SQL'), () => { ...@@ -129,7 +129,8 @@ describe(Support.getTestDialectTeaser('SQL'), () => {
const User = Support.sequelize.define('user', { const User = Support.sequelize.define('user', {
username: { username: {
type: DataTypes.STRING, type: DataTypes.STRING,
field: 'user_name' field: 'user_name',
primaryKey: true
}, },
password: { password: {
type: DataTypes.STRING, type: DataTypes.STRING,
...@@ -147,10 +148,13 @@ describe(Support.getTestDialectTeaser('SQL'), () => { ...@@ -147,10 +148,13 @@ describe(Support.getTestDialectTeaser('SQL'), () => {
timestamps: true timestamps: true
}); });
expectsql(sql.bulkInsertQuery(User.tableName, [{ user_name: 'testuser', pass_word: '12345' }], { updateOnDuplicate: ['user_name', 'pass_word', 'updated_at'] }, User.fieldRawAttributesMap), // mapping primary keys to their "field" override values
const primaryKeys = User.primaryKeyAttributes.map(attr => User.rawAttributes[attr].field || attr);
expectsql(sql.bulkInsertQuery(User.tableName, [{ user_name: 'testuser', pass_word: '12345' }], { updateOnDuplicate: ['user_name', 'pass_word', 'updated_at'], upsertKeys: primaryKeys }, User.fieldRawAttributesMap),
{ {
default: 'INSERT INTO `users` (`user_name`,`pass_word`) VALUES (\'testuser\',\'12345\');', default: 'INSERT INTO `users` (`user_name`,`pass_word`) VALUES (\'testuser\',\'12345\');',
postgres: 'INSERT INTO "users" ("user_name","pass_word") VALUES (\'testuser\',\'12345\');', postgres: 'INSERT INTO "users" ("user_name","pass_word") VALUES (\'testuser\',\'12345\') ON CONFLICT ("user_name") DO UPDATE SET "user_name"=EXCLUDED."user_name","pass_word"=EXCLUDED."pass_word","updated_at"=EXCLUDED."updated_at";',
mssql: 'INSERT INTO [users] ([user_name],[pass_word]) VALUES (N\'testuser\',N\'12345\');', mssql: 'INSERT INTO [users] ([user_name],[pass_word]) VALUES (N\'testuser\',N\'12345\');',
mariadb: 'INSERT INTO `users` (`user_name`,`pass_word`) VALUES (\'testuser\',\'12345\') ON DUPLICATE KEY UPDATE `user_name`=VALUES(`user_name`),`pass_word`=VALUES(`pass_word`),`updated_at`=VALUES(`updated_at`);', mariadb: 'INSERT INTO `users` (`user_name`,`pass_word`) VALUES (\'testuser\',\'12345\') ON DUPLICATE KEY UPDATE `user_name`=VALUES(`user_name`),`pass_word`=VALUES(`pass_word`),`updated_at`=VALUES(`updated_at`);',
mysql: 'INSERT INTO `users` (`user_name`,`pass_word`) VALUES (\'testuser\',\'12345\') ON DUPLICATE KEY UPDATE `user_name`=VALUES(`user_name`),`pass_word`=VALUES(`pass_word`),`updated_at`=VALUES(`updated_at`);' mysql: 'INSERT INTO `users` (`user_name`,`pass_word`) VALUES (\'testuser\',\'12345\') ON DUPLICATE KEY UPDATE `user_name`=VALUES(`user_name`),`pass_word`=VALUES(`pass_word`),`updated_at`=VALUES(`updated_at`);'
......
...@@ -735,8 +735,8 @@ export interface BulkCreateOptions extends Logging, Transactionable { ...@@ -735,8 +735,8 @@ export interface BulkCreateOptions extends Logging, Transactionable {
ignoreDuplicates?: boolean; ignoreDuplicates?: boolean;
/** /**
* Fields to update if row key already exists (on duplicate key update)? (only supported by mysql & * Fields to update if row key already exists (on duplicate key update)? (only supported by MySQL,
* mariadb). By default, all fields are updated. * MariaDB & Postgres >= 9.5). By default, all fields are updated.
*/ */
updateOnDuplicate?: string[]; updateOnDuplicate?: string[];
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!