不要怂,就是干,撸起袖子干!

Commit fd0d5059 by Matt Broadstone

refactor bulk insert and clean up logs

Bulk insertions were refactored to prefer row construction for bulk
inserts that are not empty. The actual sql generation was moved back
into the query-generator, and removed from sql-generator. Also a number
of console.log and console.warn's were moved over to using sequelize's
internal logging mechanism
1 parent 0968f43b
......@@ -7,8 +7,6 @@ var Utils = require('../../utils')
, _ = require('lodash')
, util = require('util');
module.exports = (function() {
var QueryGenerator = {
get options(){
......@@ -156,60 +154,64 @@ module.exports = (function() {
Returns an insert into command for multiple values.
Parameters: table name + list of hashes of attribute-value-pairs.
*/
/* istanbul ignore next */
bulkInsertQuery: function(tableName, attrValueHashes,options, attributes) {
var query = '',
allAttributes = [],
insertKey = false,
isEmpty = true,
ignoreKeys = [];
bulkInsertQuery: function(tableName, attrValueHashes, options, attributes) {
var query = 'INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %>;'
, emptyQuery = 'INSERT INTO <%= table %> DEFAULT VALUES'
, tuples = []
, allAttributes = []
, needIdentityInsertWrapper = false
, allQueries = [];
for(var key in attributes){
var aliasKey = attributes[key].field || key;
if(ignoreKeys.indexOf(aliasKey) < 0){
ignoreKeys.push(aliasKey);
}
if(attributes[key].autoIncrement){
for(var i = 0; i < attrValueHashes.length; i++){
if(aliasKey in attrValueHashes[i]){
delete attrValueHashes[i][aliasKey];
}
}
}
}
Utils._.forEach(attrValueHashes, function(attrValueHash, i) {
Utils._.forOwn(attrValueHash, function(value, key, hash) {
if (allAttributes.indexOf(key) === -1) allAttributes.push(key);
var aliasKey = attributes[key].field || key;
if (value !== null && ignoreKeys.indexOf(key) > -1){
// special case for empty objects with primary keys
var fields = Object.keys(attrValueHash);
if (fields.length === 1 && attributes[fields[0]].autoIncrement && attrValueHash[fields[0]] === null) {
allQueries.push(emptyQuery);
return;
}
ignoreKeys.splice(ignoreKeys.indexOf(key),1);
}else if(value !== null && attributes[key].autoIncrement){
insertKey = true;
// normal case
Utils._.forOwn(attrValueHash, function(value, key, hash) {
if (value !== null && attributes[key].autoIncrement) {
needIdentityInsertWrapper = true;
}
if(value !== null){
isEmpty = false;
if (allAttributes.indexOf(key) === -1) {
if (value === null && attributes[key].autoIncrement)
return;
allAttributes.push(key);
}
});
});
if(!isEmpty){
for(var j = 0; j < ignoreKeys.length; j++){
if(allAttributes.indexOf(ignoreKeys[j]) > -1){
allAttributes.splice(allAttributes.indexOf(ignoreKeys[j]), 1);
}
}
query = SqlGenerator.bulkInsertSql(tableName, allAttributes, attrValueHashes,options);
if(insertKey){
query = SqlGenerator.identityInsertWrapper(query, tableName);
}
}else{
for(var k = 0; k < attrValueHashes.length; k++){
query += SqlGenerator.insertSql(tableName);
}
if (allAttributes.length > 0) {
Utils._.forEach(attrValueHashes, function(attrValueHash, i) {
tuples.push('(' +
allAttributes.map(function(key) {
return this.escape(attrValueHash[key]);
}.bind(this)).join(',') +
')');
}.bind(this));
allQueries.push(query);
}
return query;
var replacements = {
ignoreDuplicates: options && options.ignoreDuplicates ? ' IGNORE' : '',
table: this.quoteTable(tableName),
attributes: allAttributes.map(function(attr) {
return this.quoteIdentifier(attr);
}.bind(this)).join(','),
tuples: tuples
};
var generatedQuery = Utils._.template(allQueries.join(';'))(replacements);
if (needIdentityInsertWrapper)
return SqlGenerator.identityInsertWrapper(generatedQuery, tableName);
return generatedQuery;
},
/*
Returns an update query.
Parameters:
......@@ -226,7 +228,7 @@ module.exports = (function() {
//very unique case for cascades, i generally don't approve
if(Object.keys(attrValueHash).length === 1 && attributes[Object.keys(attrValueHash)[0]].primaryKey){
console.warn('Updating a Primary Key is not supported in MSSQL, please restructure your query');
this.sequelize.log('Updating a Primary Key is not supported in MSSQL, please restructure your query');
}else{
for(var key in attributes){
var aliasKey = attributes[key].field || key;
......@@ -250,6 +252,7 @@ module.exports = (function() {
return query;
},
/*
Returns a deletion query.
Parameters:
......@@ -351,7 +354,7 @@ module.exports = (function() {
if(attribute.onDelete || attribute.onUpdate){
//handles self referencial keys, first it doesnt make sense, second, what?
if(attribute.Model && attribute.Model.tableName === attribute.references){
console.warn('MSSQL does not support self referencial constraints, '
this.sequelize.log('MSSQL does not support self referencial constraints, '
+ 'we will remove it but we recommend restructuring your query');
attribute.onDelete = '';
attribute.onUpdate = '';
......@@ -365,7 +368,7 @@ module.exports = (function() {
for(var j = i+1; j < cascadeCheck.length; j++){
var casKey2 = cascadeCheck[j];
if(attributes[casKey].referencesKey === attributes[casKey2].referencesKey){
console.warn('MSSQL does not support multiple cascade keys on the same reference, '
this.sequelize.log('MSSQL does not support multiple cascade keys on the same reference, '
+ 'we will remove them to make this work but we recommend restructuring your query.');
attributes[casKey].onDelete = '';
attributes[casKey].onUpdate = '';
......@@ -402,6 +405,7 @@ module.exports = (function() {
}
return fields;
},
quoteTable: function(param, as) {
return SqlGenerator.quoteTable(param, as);
},
......
......@@ -44,7 +44,7 @@ module.exports = (function() {
var trans = new self.connection.lib.Transaction(self.connection.context);
trans.begin(function(err){
if (err) {
console.log(err.message);
this.sequelize.log(err.message);
reject(self.formatError(err));
}else{
self.connection.lib._transaction = trans;
......@@ -53,7 +53,7 @@ module.exports = (function() {
});
} else{
var request, transCommand;
if (self.connection.lib._transaction
if (self.connection.lib._transaction
&& self.connection.uuid){
request = new self.connection.lib.Request(self.connection.lib._transaction);
......@@ -68,7 +68,7 @@ module.exports = (function() {
self.sql === 'ROLLBACK TRANSACTION;') {
self.connection.lib._transaction[transCommand](function (err, result) {
if (err) {
console.log(err.message);
self.sequelize.log(err.message);
reject(self.formatError(err));
} else {
resolve(self.formatResults(result));
......@@ -85,7 +85,7 @@ module.exports = (function() {
promise.emit('sql', self.sql, self.connection.uuid);
}
if(err){
console.log(err.message);
self.sequelize.log(err.message);
reject(self.formatError(err));
} else {
resolve(self.formatResults(recordset));
......@@ -141,7 +141,7 @@ module.exports = (function() {
} else if (this.isCallQuery()) {
result = data[0];
} else if (this.isBulkUpdateQuery()) {
result = data.length;
result = data.length;
} else if (this.isBulkDeleteQuery()){
result = data[0].AFFECTEDROWS;
} else if (result && result.dataValues){}
......
......@@ -25,6 +25,7 @@ var attributeMap = {
onUpdate:"ON UPDATE",
default:"DEFAULT"
};
function escape(value, field) {
if (value && value._isSequelizeMethod) {
return value.toString();
......@@ -32,10 +33,12 @@ function escape(value, field) {
return SqlString.escape(value, false, _options.timezone, _dialect, field);
}
}
function quoteIdentifier(identifier, force) {
if (identifier === '*') return identifier;
return Utils.addTicks(identifier, '"');
}
/*
Split an identifier into .-separated tokens and quote each part
*/
......@@ -48,9 +51,11 @@ function quoteIdentifiers(identifiers, force) {
return quoteIdentifier(identifiers);
}
}
function wrapSingleQuote(identifier){
return Utils.addTicks(identifier, "'");
}
function nameIndexes(indexes, rawTablename) {
return Utils._.map(indexes, function (index) {
if (!index.hasOwnProperty('name')) {
......@@ -120,6 +125,7 @@ function processValue(val, modelAttribute){
return escape(val, modelAttribute);
}
}
function valuesToSql(fields, modelAttributeMap, isUpdate){
var values = [];
for (var key in fields) {
......@@ -138,6 +144,7 @@ function valuesToSql(fields, modelAttributeMap, isUpdate){
}
return '';
}
function loadColumn(attributes){
var attrStr = [];
for (var attr in attributes) {
......@@ -167,6 +174,7 @@ function loadFields(attributes){
}
return attrStr;
}
function processField(attribute, tableName){
var sql = '';
if(tableName){
......@@ -178,13 +186,14 @@ function processField(attribute, tableName){
return sql + quoteIdentifier(attribute[0]);
}
}
function loadFieldsWithName(attributes, tableName){
var attrStr = [];
if(Array.isArray(attributes[0])){
for (var i = 0; i < attributes.length; i++) {
attrStr.push(processField(attributes[i], tableName));
}
}else{
}else{
for (var attr in attributes) {
if(tableName){
attrStr.push(quoteIdentifier(tableName) + "." + quoteIdentifier(attr));
......@@ -195,6 +204,7 @@ function loadFieldsWithName(attributes, tableName){
}
return attrStr.join(',');
}
function joinFields(attributes, tableName){
var attrStr = [];
if(tableName){
......@@ -208,41 +218,51 @@ function joinFields(attributes, tableName){
return attrStr.join(',');
}
module.exports = {
get options(){
return _options;
},
set options(opt) {
_options = opt;
},
get dialect(){
return _dialect;
},
set dialect(dial) {
_dialect = dial;
},
get sequelize(){
return _sequelize;
},
set sequelize(seq) {
_sequelize = seq;
},
quoteIdentifier: function(val){
return quoteIdentifier(val);
},
quoteIdentifiers: function(val, force){
return quoteIdentifiers(val, force);
},
escape: function(value, field) {
return escape(value,field);
},
showTableSql: function(){
return 'SELECT name FROM sys.Tables;';
},
processValue: function(val, modelAttribute){
return processValue(val, modelAttribute);
},
quoteTable: function(param, as) {
var table = '';
if (as === true) {
......@@ -264,6 +284,7 @@ module.exports = {
}
return table;
},
identityInsertWrapper: function(query, table){
return[
'SET IDENTITY_INSERT', quoteIdentifier(table), 'ON;',
......@@ -271,6 +292,7 @@ module.exports = {
'SET IDENTITY_INSERT', quoteIdentifier(table), 'OFF;',
].join(' ');
},
getCreateSchemaSql: function(schema){
return [
'IF NOT EXISTS (SELECT schema_name',
......@@ -280,6 +302,7 @@ module.exports = {
'EXEC sp_executesql N\'CREATE SCHEMA', quoteIdentifier(schema),';\'',
"END;"].join(' ');
},
getCreateTableSql: function(tableName, attributes, options) {
var query = "CREATE TABLE <%= tableName %> (<%= attributes%>)";
var attrStr = []
......@@ -300,6 +323,7 @@ module.exports = {
query = addTableExistsWrapper(query);
return Utils._.template(query)(values).trim() + ";";
},
alterTableSql: function(tableName){
var query = 'ALTER TABLE <%= tableName %>';
var value = {
......@@ -307,44 +331,19 @@ module.exports = {
};
return Utils._.template(query)(value);
},
dropTableSql: function(tableName, options){
var query = "DROP TABLE <%= tableName %>";
var values ={};
values = {
unquotedTable: tableName,
tableName: quoteIdentifier(tableName.toString())
};
};
query = addTableExistsWrapper(query, true);
return Utils._.template(query)(values).trim() + ";";
},
bulkInsertSql: function(tableName, attributeKeys, attributes,options) {
var query = 'INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %>;'
, emptyQuery = 'INSERT INTO <%= table %> DEFAULT VALUES'
, tuples = [];
Utils._.forEach(attributes, function(attrValueHash, i) {
tuples.push('(' +
attributeKeys.map(function(key) {
if(typeof attrValueHash[key] === 'boolean'){
return "'" + attrValueHash[key] + "'";
}
return escape(attrValueHash[key]);
}.bind(this)).join(',') +
')');
}.bind(this));
var replacements = {
ignoreDuplicates: options && options.ignoreDuplicates ? ' IGNORE' : '',
table: quoteIdentifier(tableName.toString()),
attributes: attributeKeys.map(function(attr) {
return quoteIdentifier(attr);
}.bind(this)).join(','),
tuples: tuples
};
return Utils._.template(query)(replacements);
},
insertSql: function(tableName, valueHash, modelAttributeMap) {
var query
, valueQuery = 'INSERT INTO <%= tableName %> (<%= attributes %>)'
......@@ -366,7 +365,7 @@ module.exports = {
&& (valueHash[key] === 'DEFAULT'
|| valueHash[key] === null)){
delete valueHash[key];
}else if(valueHash[key]
}else if(valueHash[key]
&& modelAttributeMap[key].autoIncrement){
insertKey = true;
}
......@@ -386,6 +385,7 @@ module.exports = {
}
return Utils._.template(query)(replacements);
},
updateSql: function(tableName, valueHash, where, options, attributes){
options = options || {};
......@@ -411,6 +411,7 @@ module.exports = {
return Utils._.template(query)(replacements);
},
incrementSql: function(tableName, attrValueHash, options) {
attrValueHash = Utils.removeNullValuesFromHash(attrValueHash, this.options.omitNull);
......@@ -424,7 +425,7 @@ module.exports = {
for (key in attrValueHash) {
value = attrValueHash[key];
values.push(quoteIdentifier(key) + '=' + quoteIdentifier(key) + ' + ' + escape(value));
values.push(quoteIdentifier(key) + '=' + quoteIdentifier(key) + ' + ' + escape(value));
selFields.push('INSERTED.' + quoteIdentifier(key));
}
......@@ -442,6 +443,7 @@ module.exports = {
return Utils._.template(query)(replacements);
},
deleteSql: function(tableName) {
var query = "DELETE FROM <%= table %>";
var replacements = {
......@@ -459,10 +461,12 @@ module.exports = {
})
});
return 'ADD ' + attribute;
},
},
alterColumnSql: function(){
return 'ALTER COLUMN';
},
renameColumnSql: function(tableName, attrBefore, newColumnName){
var query = 'EXEC SP_RENAME \'<%= tableName %>.<%= before %>\', \'<%= after %>\';';
var attrString = [];
......@@ -497,6 +501,7 @@ module.exports = {
options: (options || {}).database ? ' FROM \'' + options.database + '\'' : ''
});
},
addIndexSql: function(tableName, attributes, options, rawTablename){
if (!options.name) {
// Mostly for cases where addIndex is called directly by the user without an options object (for example in migrations)
......@@ -521,6 +526,7 @@ module.exports = {
'(' + attrStr.join(', ') + ')'
]).join(' ');
},
removeIndexSql: function(tableName, indexNameOrAttributes){
var sql = 'DROP INDEX <%= indexName %> ON <%= tableName %>'
, indexName = indexNameOrAttributes;
......@@ -600,8 +606,6 @@ module.exports = {
template.push(attributeMap.unique);
}
if (attribute.references) {
template.push(attributeMap.references);
template.push(this.quoteTable(attribute.references));
......@@ -656,12 +660,14 @@ module.exports = {
"AND C.TABLE_NAME = ", wrapSingleQuote(tableName)
].join(" ");
},
dropSql: function(val){
return [
'DROP',
quoteIdentifier(val)
].join(' ');
},
alterAttributesSql: function(attributes){
var attrString = [];
for (var attrName in attributes) {
......@@ -674,9 +680,11 @@ module.exports = {
}
return attrString.join(', ');
},
getTopClause: function(limit){
return "TOP(" + limit + ")";
},
getCountClause: function(alias, columnName){
return [
"SELECT COUNT(",
......@@ -684,6 +692,7 @@ module.exports = {
") AS", quoteIdentifier(alias)
].join(' ');
},
getSelectorClause: function(model, options){
var query = ['SELECT'];
//we have joins
......@@ -705,6 +714,7 @@ module.exports = {
}
return query.join(' ');
},
getFromClause: function(tableName, asValue){
var query = ["FROM",
quoteIdentifier(tableName)];
......@@ -714,6 +724,7 @@ module.exports = {
}
return query.join(' ');
},
getJoinClause: function(model, include){
var query = [];
var primaryKey = quoteIdentifier(model.primaryKeyAttribute);
......@@ -774,6 +785,7 @@ module.exports = {
}
return query.join(' ');
},
formatWhereCondition: function(where, tableName){
var query = [];
for(var key in where){
......@@ -799,22 +811,10 @@ module.exports = {
}
return query.join(' ');
},
getWhereClause: function(where, tableName){
return [
'WHERE',
this.formatWhereCondition(where, tableName)
].join(' ');
}};
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!