不要怂,就是干,撸起袖子干!

Commit 23370b8c by Sascha Depold

Merge branch 'master' of git://github.com/reedog117/sequelize into reedog117-master

2 parents 1f2ef885 c7a48308
var mariasql
, Pooling = require('generic-pool')
, Query = require("./query")
, Utils = require("../../utils")
, without = function(arr, elem) { return arr.filter(function(e) { return e != elem }) }
try { mariasql = require("mariasql") } catch (err) {
console.log("You need to install mariasql package manually"); }
module.exports = (function() {
var ConnectorManager = function(sequelize, config) {
this.sequelize = sequelize
this.client = null
this.config = config || {}
this.disconnectTimeoutId = null
this.queue = []
this.activeQueue = []
this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
this.poolCfg = Utils._.defaults(this.config.pool, {
maxConnections: 10,
minConnections: 0,
maxIdleTime: 1000
});
this.pendingQueries = 0;
this.useReplicaton = !!config.replication;
this.useQueue = config.queue !== undefined ? config.queue : true;
var self = this
if (this.useReplicaton) {
var reads = 0,
writes = 0;
// Init configs with options from config if not present
for (var i in config.replication.read) {
config.replication.read[i] = Utils._.defaults(config.replication.read[i], {
host: this.config.host,
port: this.config.port,
username: this.config.username,
password: this.config.password,
db: this.config.database,
ssl: this.config.ssl
});
}
config.replication.write = Utils._.defaults(config.replication.write, {
host: this.config.host,
port: this.config.port,
username: this.config.username,
password: this.config.password,
db: this.config.database,
ssl: this.config.ssl
});
// I'll make my own pool, with blackjack and hookers!
this.pool = {
release: function (client) {
if (client.queryType == 'read') {
return this.read.release(client);
} else {
return this.write.release(client);
}
},
acquire: function (callback, priority, queryType) {
if (queryType == 'SELECT') {
this.read.acquire(callback, priority);
} else {
this.write.acquire(callback, priority);
}
},
drain: function () {
this.read.drain();
this.write.drain();
},
read: Pooling.Pool({
name: 'sequelize-read',
create: function (done) {
if (reads >= self.config.replication.read.length) reads = 0;
var config = self.config.replication.read[reads++];
connect.call(self, function (err, connection) {
connection.queryType = 'read'
done(null, connection)
}, config);
},
destroy: function(client) {
disconnect.call(self, client)
},
max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections,
idleTimeoutMillis: self.poolCfg.maxIdleTime
}),
write: Pooling.Pool({
name: 'sequelize-write',
create: function (done) {
connect.call(self, function (err, connection) {
connection.queryType = 'write'
done(null, connection)
}, self.config.replication.write);
},
destroy: function(client) {
disconnect.call(self, client)
},
max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections,
idleTimeoutMillis: self.poolCfg.maxIdleTime
})
};
} else if (this.poolCfg) {
//the user has requested pooling, so create our connection pool
this.pool = Pooling.Pool({
name: 'sequelize-mariasql',
create: function (done) {
connect.call(self, done)
},
destroy: function(client) {
disconnect.call(self, client)
},
max: self.poolCfg.maxConnections,
min: self.poolCfg.minConnections,
idleTimeoutMillis: self.poolCfg.maxIdleTime
})
}
process.on('exit', function () {
//be nice & close our connections on exit
if (self.pool) {
self.pool.drain()
} else if (self.client) {
disconnect(self.client)
}
return
})
}
Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype);
var isConnecting = false;
ConnectorManager.prototype.query = function(sql, callee, options) {
if (!this.isConnected && !this.pool) {
this.connect()
}
if (this.useQueue) {
var queueItem = {
query: new Query(this.client, this.sequelize, callee, options || {}),
sql: sql
};
enqueue.call(this, queueItem, options);
return queueItem.query;
}
var self = this, query = new Query(this.client, this.sequelize, callee, options || {});
this.pendingQueries++;
query.done(function() {
self.pendingQueries--;
if (self.pool) self.pool.release(query.client);
else {
if (self.pendingQueries === 0) {
setTimeout(function() {
self.pendingQueries === 0 && self.disconnect.call(self);
}, 100);
}
}
});
if (!this.pool) {
query.run(sql);
}
else {
this.pool.acquire(function(err, client) {
if (err) return query.emit('error', err);
query.client = client;
query.run(sql);
return;
}, undefined, options.type);
}
return query;
};
ConnectorManager.prototype.connect = function() {
var self = this;
// in case database is slow to connect, prevent orphaning the client
if (this.isConnecting || this.pool) {
return;
}
connect.call(self, function(err, client) {
self.client = client;
return;
});
return;
};
ConnectorManager.prototype.disconnect = function() {
if (this.client) disconnect.call(this, this.client);
return;
};
// private
var disconnect = function(client) {
var self = this;
if (!this.useQueue) {
this.client = null;
}
client.end(function() {
if (!self.useQueue) {
return client.destroy();
}
var intervalObj = null
var cleanup = function () {
var retryCt = 0
// make sure to let client finish before calling destroy
if (self && self.hasQueuedItems) {
return
}
// needed to prevent mariasql connection leak
client.destroy()
if (self && self.client) {
self.client = null
}
clearInterval(intervalObj)
}
intervalObj = setInterval(cleanup, 10)
cleanup()
return
})
}
var connect = function(done, config) {
config = config || this.config
var connection = new mariasql();
this.isConnecting = true
connection.connect({
host: config.host,
port: config.port,
user: config.username,
password: config.password,
db: config.database,
ssl: config.ssl || undefined
// timezone: 'Z' // unsupported by mariasql
})
connection.on('connect', function() {
connection.query("SET time_zone = '+0:00'");
// client.setMaxListeners(self.maxConcurrentQueries)
this.isConnecting = false
done(null, connection)
})
}
var enqueue = function(queueItem, options) {
options = options || {}
if (this.activeQueue.length < this.maxConcurrentQueries) {
this.activeQueue.push(queueItem)
if (this.pool) {
var self = this
this.pool.acquire(function(err, client) {
if (err) {
queueItem.query.emit('error', err)
return
}
//we set the client here, asynchronously, when getting a pooled connection
//allowing the ConnectorManager.query method to remain synchronous
queueItem.query.client = client
queueItem.client = client
execQueueItem.call(self, queueItem)
return
}, undefined, options.type)
} else {
execQueueItem.call(this, queueItem)
}
} else {
this.queue.push(queueItem)
}
}
var dequeue = function(queueItem) {
//return the item's connection to the pool
if (this.pool) {
this.pool.release(queueItem.client)
}
this.activeQueue = without(this.activeQueue, queueItem)
}
var transferQueuedItems = function(count) {
for(var i = 0; i < count; i++) {
var queueItem = this.queue.shift();
if (queueItem) {
enqueue.call(this, queueItem)
}
}
}
var afterQuery = function(queueItem) {
var self = this
dequeue.call(this, queueItem)
transferQueuedItems.call(this, this.maxConcurrentQueries - this.activeQueue.length)
disconnectIfNoConnections.call(this)
}
var execQueueItem = function(queueItem) {
var self = this
queueItem.query
.success(function(){ afterQuery.call(self, queueItem) })
.error(function(){ afterQuery.call(self, queueItem) })
queueItem.query.run(queueItem.sql, queueItem.client)
}
ConnectorManager.prototype.__defineGetter__('hasQueuedItems', function() {
return (this.queue.length > 0) || (this.activeQueue.length > 0) || (this.client && this.client._queries && (this.client._queries.length > 0))
})
// legacy
ConnectorManager.prototype.__defineGetter__('hasNoConnections', function() {
return !this.hasQueuedItems
})
ConnectorManager.prototype.__defineGetter__('isConnected', function() {
return this.client != null
})
var disconnectIfNoConnections = function() {
var self = this
this.disconnectTimeoutId && clearTimeout(this.disconnectTimeoutId)
this.disconnectTimeoutId = setTimeout(function() {
self.isConnected && !self.hasQueuedItems && self.disconnect()
}, 100)
}
return ConnectorManager
})()
var Utils = require("../../utils")
, DataTypes = require("../../data-types")
, util = require("util")
module.exports = (function() {
var QueryGenerator = {
createTableQuery: function(tableName, attributes, options) {
options = Utils._.extend({
engine: 'InnoDB',
charset: null
}, options || {})
var query = "CREATE TABLE IF NOT EXISTS <%= table %> (<%= attributes%>) ENGINE=<%= engine %> <%= charset %>"
, primaryKeys = []
, attrStr = []
for (var attr in attributes) {
if (attributes.hasOwnProperty(attr)) {
var dataType = attributes[attr]
if (Utils._.includes(dataType, 'PRIMARY KEY')) {
primaryKeys.push(attr)
attrStr.push(QueryGenerator.addQuotes(attr) + " " + dataType.replace(/PRIMARY KEY/, ''))
} else {
attrStr.push(QueryGenerator.addQuotes(attr) + " " + dataType)
}
}
}
var values = {
table: QueryGenerator.addQuotes(tableName),
attributes: attrStr.join(", "),
engine: options.engine,
charset: (options.charset ? "DEFAULT CHARSET=" + options.charset : "")
}
, pkString = primaryKeys.map(function(pk) { return QueryGenerator.addQuotes(pk) }).join(", ")
if (pkString.length > 0) {
values.attributes += ", PRIMARY KEY (" + pkString + ")"
}
return Utils._.template(query)(values).trim() + ";"
},
dropTableQuery: function(tableName, options) {
options = options || {}
var query = "DROP TABLE IF EXISTS <%= table %>;"
return Utils._.template(query)({
table: QueryGenerator.addQuotes(tableName)
})
},
renameTableQuery: function(before, after) {
var query = "RENAME TABLE `<%= before %>` TO `<%= after %>`;"
return Utils._.template(query)({ before: before, after: after })
},
showTablesQuery: function() {
return 'SHOW TABLES;'
},
addColumnQuery: function(tableName, attributes) {
var query = "ALTER TABLE `<%= tableName %>` ADD <%= attributes %>;"
, attrString = []
for (var attrName in attributes) {
var definition = attributes[attrName]
attrString.push(Utils._.template('`<%= attrName %>` <%= definition %>')({
attrName: attrName,
definition: definition
}))
}
return Utils._.template(query)({ tableName: tableName, attributes: attrString.join(', ') })
},
removeColumnQuery: function(tableName, attributeName) {
var query = "ALTER TABLE `<%= tableName %>` DROP `<%= attributeName %>`;"
return Utils._.template(query)({ tableName: tableName, attributeName: attributeName })
},
changeColumnQuery: function(tableName, attributes) {
var query = "ALTER TABLE `<%= tableName %>` CHANGE <%= attributes %>;"
var attrString = []
for (attrName in attributes) {
var definition = attributes[attrName]
attrString.push(Utils._.template('`<%= attrName %>` `<%= attrName %>` <%= definition %>')({
attrName: attrName,
definition: definition
}))
}
return Utils._.template(query)({ tableName: tableName, attributes: attrString.join(', ') })
},
renameColumnQuery: function(tableName, attrBefore, attributes) {
var query = "ALTER TABLE `<%= tableName %>` CHANGE <%= attributes %>;"
var attrString = []
for (var attrName in attributes) {
var definition = attributes[attrName]
attrString.push(Utils._.template('`<%= before %>` `<%= after %>` <%= definition %>')({
before: attrBefore,
after: attrName,
definition: definition
}))
}
return Utils._.template(query)({ tableName: tableName, attributes: attrString.join(', ') })
},
selectQuery: function(tableName, options) {
var query = "SELECT <%= attributes %> FROM <%= table %>"
, table = null
options = options || {}
options.table = table = Array.isArray(tableName) ? tableName.map(function(tbl){ return QueryGenerator.addQuotes(tbl) }).join(", ") : QueryGenerator.addQuotes(tableName)
options.attributes = options.attributes && options.attributes.map(function(attr){
if(Array.isArray(attr) && attr.length == 2) {
return [attr[0], QueryGenerator.addQuotes(attr[1])].join(' as ')
} else {
return attr.indexOf(Utils.TICK_CHAR) < 0 ? QueryGenerator.addQuotes(attr) : attr
}
}).join(", ")
options.attributes = options.attributes || '*'
if (options.include) {
var optAttributes = [options.table + '.*']
options.include.forEach(function(include) {
var attributes = Object.keys(include.daoFactory.attributes).map(function(attr) {
var template = Utils._.template("`<%= as %>`.`<%= attr %>` AS `<%= as %>.<%= attr %>`")
return template({ as: include.as, attr: attr })
})
optAttributes = optAttributes.concat(attributes)
var joinQuery = " LEFT OUTER JOIN `<%= table %>` AS `<%= as %>` ON `<%= tableLeft %>`.`<%= attrLeft %>` = `<%= tableRight %>`.`<%= attrRight %>`"
query += Utils._.template(joinQuery)({
table: include.daoFactory.tableName,
as: include.as,
tableLeft: ((include.association.associationType === 'BelongsTo') ? include.as : tableName),
attrLeft: 'id',
tableRight: ((include.association.associationType === 'BelongsTo') ? tableName : include.as),
attrRight: include.association.identifier
})
})
options.attributes = optAttributes.join(', ')
}
if (options.where) {
options.where = this.getWhereConditions(options.where, tableName)
query += " WHERE <%= where %>"
}
if (options.group) {
options.group = Array.isArray(options.group) ? options.group.map(function(grp){return QueryGenerator.addQuotes(grp)}).join(', ') : QueryGenerator.addQuotes(options.group)
query += " GROUP BY <%= group %>"
}
if (options.order) {
query += " ORDER BY <%= order %>"
}
if (options.limit && !(options.include && (options.limit === 1))) {
if (options.offset) {
query += " LIMIT <%= offset %>, <%= limit %>"
} else {
query += " LIMIT <%= limit %>"
}
}
query += ";"
return Utils._.template(query)(options)
},
insertQuery: function(tableName, attrValueHash) {
attrValueHash = Utils.removeNullValuesFromHash(attrValueHash, this.options.omitNull)
var query = "INSERT INTO <%= table %> (<%= attributes %>) VALUES (<%= values %>);"
var replacements = {
table: QueryGenerator.addQuotes(tableName),
attributes: Object.keys(attrValueHash).map(function(attr){return QueryGenerator.addQuotes(attr)}).join(","),
values: Utils._.values(attrValueHash).map(function(value){
return Utils.escape((value instanceof Date) ? Utils.toSqlDate(value) : value)
}).join(",")
}
return Utils._.template(query)(replacements)
},
updateQuery: function(tableName, attrValueHash, where) {
attrValueHash = Utils.removeNullValuesFromHash(attrValueHash, this.options.omitNull)
var query = "UPDATE <%= table %> SET <%= values %> WHERE <%= where %>;"
, values = []
for (var key in attrValueHash) {
var value = attrValueHash[key]
, _value = (value instanceof Date) ? Utils.toSqlDate(value) : value
values.push(QueryGenerator.addQuotes(key) + "=" + Utils.escape(_value))
}
var replacements = {
table: QueryGenerator.addQuotes(tableName),
values: values.join(","),
where: QueryGenerator.getWhereConditions(where)
}
return Utils._.template(query)(replacements)
},
deleteQuery: function(tableName, where, options) {
options = options || {}
options.limit = options.limit || 1
var query = "DELETE FROM <%= table %> WHERE <%= where %> LIMIT <%= limit %>;"
var replacements = {
table: QueryGenerator.addQuotes(tableName),
where: QueryGenerator.getWhereConditions(where),
limit: Utils.escape(options.limit)
}
return Utils._.template(query)(replacements)
},
incrementQuery: function (tableName, attrValueHash, where) {
attrValueHash = Utils.removeNullValuesFromHash(attrValueHash, this.options.omitNull)
var query = "UPDATE <%= table %> SET <%= values %> WHERE <%= where %> ;"
, values = []
for (var key in attrValueHash) {
var value = attrValueHash[key]
, _value = (value instanceof Date) ? Utils.toSqlDate(value) : value
values.push(QueryGenerator.addQuotes(key) + "=" + QueryGenerator.addQuotes(key) + " + " +Utils.escape(_value))
}
var replacements = {
table: QueryGenerator.addQuotes(tableName),
values: values.join(","),
where: QueryGenerator.getWhereConditions(where)
}
return Utils._.template(query)(replacements)
},
addIndexQuery: function(tableName, attributes, options) {
var transformedAttributes = attributes.map(function(attribute) {
if(typeof attribute === 'string') {
return attribute
} else {
var result = ""
if (!attribute.attribute) {
throw new Error('The following index attribute has no attribute: ' + util.inspect(attribute))
}
result += attribute.attribute
if (attribute.length) {
result += '(' + attribute.length + ')'
}
if (attribute.order) {
result += ' ' + attribute.order
}
return result
}
})
var onlyAttributeNames = attributes.map(function(attribute) {
return (typeof attribute === 'string') ? attribute : attribute.attribute
})
options = Utils._.extend({
indicesType: null,
indexName: Utils._.underscored(tableName + '_' + onlyAttributeNames.join('_')),
parser: null
}, options || {})
return Utils._.compact([
"CREATE", options.indicesType, "INDEX", options.indexName,
(options.indexType ? ('USING ' + options.indexType) : undefined),
"ON", tableName, '(' + transformedAttributes.join(', ') + ')',
(options.parser ? "WITH PARSER " + options.parser : undefined)
]).join(' ')
},
showIndexQuery: function(tableName, options) {
var sql = "SHOW INDEX FROM <%= tableName %><%= options %>"
return Utils._.template(sql)({
tableName: tableName,
options: (options || {}).database ? ' FROM ' + options.database : ''
})
},
removeIndexQuery: function(tableName, indexNameOrAttributes) {
var sql = "DROP INDEX <%= indexName %> ON <%= tableName %>"
, indexName = indexNameOrAttributes
if (typeof indexName !== 'string') {
indexName = Utils._.underscored(tableName + '_' + indexNameOrAttributes.join('_'))
}
return Utils._.template(sql)({ tableName: tableName, indexName: indexName })
},
getWhereConditions: function(smth, tableName) {
var result = null
function isNumeric(n) {
return !isNaN(parseFloat(n)) && isFinite(n);
}
if (Utils.isHash(smth)) {
smth = Utils.prependTableNameToHash(tableName, smth)
result = this.hashToWhereConditions(smth)
} else if (typeof smth === 'number') {
smth = Utils.prependTableNameToHash(tableName, { 'id': smth })
result = this.hashToWhereConditions(smth)
} else if (typeof smth === "string") {
if (!isNumeric(smth)) {
result = smth
} else {
smth = Utils.prependTableNameToHash(tableName, { 'id': smth })
result = this.hashToWhereConditions(smth)
}
} else if (Array.isArray(smth)) {
result = Utils.format(smth)
}
return result
},
hashToWhereConditions: function(hash) {
var result = []
for (var key in hash) {
var value = hash[key]
//handle qualified key names
var _key = key.split('.').map(function(col){return QueryGenerator.addQuotes(col)}).join(".")
, _value = null
if (Array.isArray(value)) {
// is value an array?
if (value.length == 0) { value = [null] }
_value = "(" + value.map(function(subValue) {
return Utils.escape(subValue);
}).join(',') + ")"
result.push([_key, _value].join(" IN "))
} else if ((value) && (typeof value == 'object')) {
// is value an object?
//using as sentinel for join column => value
_value = value.join.split('.').map(function(col){ return QueryGenerator.addQuotes(col) }).join(".")
result.push([_key, _value].join("="))
} else {
_value = Utils.escape(value)
result.push((_value == 'NULL') ? _key + " IS NULL" : [_key, _value].join("="))
}
}
return result.join(" AND ")
},
attributesToSQL: function(attributes) {
var result = {}
for (var name in attributes) {
var dataType = attributes[name]
if (Utils.isHash(dataType)) {
var template = "<%= type %>"
, replacements = { type: dataType.type }
if (dataType.type.toString() === DataTypes.ENUM.toString()) {
if (Array.isArray(dataType.values) && (dataType.values.length > 0)) {
replacements.type = "ENUM(" + Utils._.map(dataType.values, function(value) {
return Utils.escape(value)
}).join(", ") + ")"
} else {
throw new Error('Values for ENUM haven\'t been defined.')
}
}
if (dataType.hasOwnProperty('allowNull') && (!dataType.allowNull)) {
template += " NOT NULL"
}
if (dataType.autoIncrement) {
template += " auto_increment"
}
if ((dataType.defaultValue != undefined) && (dataType.defaultValue != DataTypes.NOW)) {
template += " DEFAULT <%= defaultValue %>"
replacements.defaultValue = Utils.escape(dataType.defaultValue)
}
if (dataType.unique) {
template += " UNIQUE"
}
if (dataType.primaryKey) {
template += " PRIMARY KEY"
}
result[name] = Utils._.template(template)(replacements)
} else {
result[name] = dataType
}
}
return result
},
findAutoIncrementField: function(factory) {
var fields = []
for (var name in factory.attributes) {
if (factory.attributes.hasOwnProperty(name)) {
var definition = factory.attributes[name]
if (definition && (definition.indexOf('auto_increment') > -1)) {
fields.push(name)
}
}
}
return fields
},
addQuotes: function(s, quoteChar) {
return Utils.addTicks(s, quoteChar)
},
removeQuotes: function(s, quoteChar) {
return Utils.removeTicks(s, quoteChar)
}
}
return Utils._.extend(Utils._.clone(require("../query-generator")), QueryGenerator)
})()
var Utils = require("../../utils")
, AbstractQuery = require('../abstract/query')
module.exports = (function() {
var Query = function(client, sequelize, callee, options) {
this.client = client
this.callee = callee
this.sequelize = sequelize
this.options = Utils._.extend({
logging: console.log,
plain: false,
raw: false
}, options || {})
this.checkLoggingOption()
}
Utils.inherit(Query, AbstractQuery)
Query.prototype.run = function(sql) {
this.sql = sql
if (this.options.logging !== false) {
this.options.logging('Executing: ' + this.sql)
}
var resultSet = [];
this.client.query(this.sql)
.on('result', function(results) {
results.on('row', function(row) {
resultSet.push(row);
})
.on('error', function(err) {
this.emit('error', err, this.callee)
})
.on('end', function(info) {
//console.log(info)
});
})
.on('error', function(err) {
console.log( stack )
//this.emit('error', err, this.callee)
})
.on('end', function() {
this.emit('sql', this.sql)
this.emit('success', this.formatResults(resultSet))
}.bind(this))
return this
}
return Query
})()
...@@ -77,6 +77,7 @@ module.exports = (function() { ...@@ -77,6 +77,7 @@ module.exports = (function() {
queue: true, queue: true,
native: false, native: false,
replication: false, replication: false,
ssl: undefined,
pool: {} pool: {}
}, options || {}) }, options || {})
...@@ -95,6 +96,7 @@ module.exports = (function() { ...@@ -95,6 +96,7 @@ module.exports = (function() {
protocol: this.options.protocol, protocol: this.options.protocol,
queue : this.options.queue, queue : this.options.queue,
native : this.options.native, native : this.options.native,
ssl : this.options.ssl,
replication: this.options.replication, replication: this.options.replication,
maxConcurrentQueries: this.options.maxConcurrentQueries maxConcurrentQueries: this.options.maxConcurrentQueries
} }
......
...@@ -44,6 +44,7 @@ ...@@ -44,6 +44,7 @@
"sqlite3": "~2.1.5", "sqlite3": "~2.1.5",
"mysql": "~2.0.0-alpha7", "mysql": "~2.0.0-alpha7",
"pg": "~0.10.2", "pg": "~0.10.2",
"mariasql": "~0.1.18",
"buster": "~0.6.0", "buster": "~0.6.0",
"watchr": "~2.2.0", "watchr": "~2.2.0",
"yuidocjs": "~0.3.36" "yuidocjs": "~0.3.36"
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!