不要怂,就是干,撸起袖子干!

Commit c1e7317a by Andy Edwards Committed by GitHub

refactor(lib): asyncify remaining methods (#12173)

1 parent e7d5f0ac
......@@ -415,7 +415,7 @@ class BelongsToMany extends Association {
*
* @returns {Promise<Array<Model>>}
*/
get(instance, options) {
async get(instance, options) {
options = Utils.cloneDeep(options) || {};
const through = this.through;
......@@ -483,7 +483,7 @@ class BelongsToMany extends Association {
*
* @returns {Promise<number>}
*/
count(instance, options) {
async count(instance, options) {
const sequelize = this.target.sequelize;
options = Utils.cloneDeep(options);
......@@ -494,7 +494,9 @@ class BelongsToMany extends Association {
options.raw = true;
options.plain = true;
return this.get(instance, options).then(result => parseInt(result.count, 10));
const result = await this.get(instance, options);
return parseInt(result.count, 10);
}
/**
......@@ -506,7 +508,7 @@ class BelongsToMany extends Association {
*
* @returns {Promise<boolean>}
*/
has(sourceInstance, instances, options) {
async has(sourceInstance, instances, options) {
if (!Array.isArray(instances)) {
instances = [instances];
}
......@@ -535,10 +537,10 @@ class BelongsToMany extends Association {
]
};
return this.get(sourceInstance, options).then(associatedObjects =>
_.differenceWith(instancePrimaryKeys, associatedObjects,
(a, b) => _.isEqual(a[this.targetKey], b[this.targetKey])).length === 0
);
const associatedObjects = await this.get(sourceInstance, options);
return _.differenceWith(instancePrimaryKeys, associatedObjects,
(a, b) => _.isEqual(a[this.targetKey], b[this.targetKey])).length === 0;
}
/**
......@@ -553,7 +555,7 @@ class BelongsToMany extends Association {
*
* @returns {Promise}
*/
set(sourceInstance, newAssociatedObjects, options) {
async set(sourceInstance, newAssociatedObjects, options) {
options = options || {};
const sourceKey = this.sourceKey;
......@@ -640,12 +642,13 @@ class BelongsToMany extends Association {
return Promise.all(promises);
};
return this.through.model.findAll(_.defaults({ where, raw: true }, options))
.then(currentRows => updateAssociations(currentRows))
.catch(error => {
if (error instanceof EmptyResultError) return updateAssociations([]);
throw error;
});
try {
const currentRows = await this.through.model.findAll(_.defaults({ where, raw: true }, options));
return await updateAssociations(currentRows);
} catch (error) {
if (error instanceof EmptyResultError) return updateAssociations([]);
throw error;
}
}
/**
......@@ -660,7 +663,7 @@ class BelongsToMany extends Association {
*
* @returns {Promise}
*/
add(sourceInstance, newInstances, options) {
async add(sourceInstance, newInstances, options) {
// If newInstances is null or undefined, no-op
if (!newInstances) return Promise.resolve();
......@@ -734,13 +737,14 @@ class BelongsToMany extends Association {
return Promise.all(promises);
};
return association.through.model.findAll(_.defaults({ where, raw: true }, options))
.then(currentRows => updateAssociations(currentRows))
.then(([associations]) => associations)
.catch(error => {
if (error instanceof EmptyResultError) return updateAssociations();
throw error;
});
try {
const currentRows = await association.through.model.findAll(_.defaults({ where, raw: true }, options));
const [associations] = await updateAssociations(currentRows);
return associations;
} catch (error) {
if (error instanceof EmptyResultError) return updateAssociations();
throw error;
}
}
/**
......@@ -777,7 +781,7 @@ class BelongsToMany extends Association {
*
* @returns {Promise}
*/
create(sourceInstance, values, options) {
async create(sourceInstance, values, options) {
const association = this;
options = options || {};
......@@ -797,9 +801,10 @@ class BelongsToMany extends Association {
}
// Create the related model instance
return association.target.create(values, options).then(newAssociatedObject =>
sourceInstance[association.accessors.add](newAssociatedObject, _.omit(options, ['fields'])).then(() => newAssociatedObject)
);
const newAssociatedObject = await association.target.create(values, options);
await sourceInstance[association.accessors.add](newAssociatedObject, _.omit(options, ['fields']));
return newAssociatedObject;
}
verifyAssociationAlias(alias) {
......
......@@ -25,7 +25,7 @@ class ConnectionManager extends AbstractConnectionManager {
parserStore.clear();
}
connect(config) {
async connect(config) {
const connectionConfig = {
server: config.host,
authentication: {
......@@ -58,57 +58,59 @@ class ConnectionManager extends AbstractConnectionManager {
Object.assign(connectionConfig.options, config.dialectOptions.options);
}
return new Promise((resolve, reject) => {
const connection = new this.lib.Connection(connectionConfig);
connection.queue = new AsyncQueue();
connection.lib = this.lib;
const connectHandler = error => {
connection.removeListener('end', endHandler);
connection.removeListener('error', errorHandler);
if (error) return reject(error);
debug('connection acquired');
resolve(connection);
};
const endHandler = () => {
connection.removeListener('connect', connectHandler);
connection.removeListener('error', errorHandler);
reject(new Error('Connection was closed by remote server'));
};
const errorHandler = error => {
connection.removeListener('connect', connectHandler);
connection.removeListener('end', endHandler);
reject(error);
};
connection.once('error', errorHandler);
connection.once('end', endHandler);
connection.once('connect', connectHandler);
/*
* Permanently attach this event before connection is even acquired
* tedious sometime emits error even after connect(with error).
*
* If we dont attach this even that unexpected error event will crash node process
*
* E.g. connectTimeout is set higher than requestTimeout
*/
connection.on('error', error => {
switch (error.code) {
case 'ESOCKET':
case 'ECONNRESET':
this.pool.destroy(connection);
try {
return await new Promise((resolve, reject) => {
const connection = new this.lib.Connection(connectionConfig);
connection.queue = new AsyncQueue();
connection.lib = this.lib;
const connectHandler = error => {
connection.removeListener('end', endHandler);
connection.removeListener('error', errorHandler);
if (error) return reject(error);
debug('connection acquired');
resolve(connection);
};
const endHandler = () => {
connection.removeListener('connect', connectHandler);
connection.removeListener('error', errorHandler);
reject(new Error('Connection was closed by remote server'));
};
const errorHandler = error => {
connection.removeListener('connect', connectHandler);
connection.removeListener('end', endHandler);
reject(error);
};
connection.once('error', errorHandler);
connection.once('end', endHandler);
connection.once('connect', connectHandler);
/*
* Permanently attach this event before connection is even acquired
* tedious sometime emits error even after connect(with error).
*
* If we dont attach this even that unexpected error event will crash node process
*
* E.g. connectTimeout is set higher than requestTimeout
*/
connection.on('error', error => {
switch (error.code) {
case 'ESOCKET':
case 'ECONNRESET':
this.pool.destroy(connection);
}
});
if (config.dialectOptions && config.dialectOptions.debug) {
connection.on('debug', debugTedious.log.bind(debugTedious));
}
});
if (config.dialectOptions && config.dialectOptions.debug) {
connection.on('debug', debugTedious.log.bind(debugTedious));
}
}).catch(error => {
} catch (error) {
if (!error.code) {
throw new sequelizeErrors.ConnectionError(error);
}
......@@ -139,13 +141,13 @@ class ConnectionManager extends AbstractConnectionManager {
default:
throw new sequelizeErrors.ConnectionError(error);
}
});
}
}
disconnect(connection) {
async disconnect(connection) {
// Don't disconnect a connection that is already disconnected
if (connection.closed) {
return Promise.resolve();
return;
}
connection.queue.close();
......
......@@ -20,47 +20,32 @@
@private
*/
const removeColumn = function(qi, tableName, attributeName, options) {
const removeColumn = async function(qi, tableName, attributeName, options) {
options = Object.assign({ raw: true }, options || {});
const findConstraintSql = qi.QueryGenerator.getDefaultConstraintQuery(tableName, attributeName);
return qi.sequelize.query(findConstraintSql, options)
.then(([results]) => {
if (!results.length) {
// No default constraint found -- we can cleanly remove the column
return;
}
const dropConstraintSql = qi.QueryGenerator.dropConstraintQuery(tableName, results[0].name);
return qi.sequelize.query(dropConstraintSql, options);
})
.then(() => {
const findForeignKeySql = qi.QueryGenerator.getForeignKeyQuery(tableName, attributeName);
return qi.sequelize.query(findForeignKeySql, options);
})
.then(([results]) => {
if (!results.length) {
// No foreign key constraints found, so we can remove the column
return;
}
const dropForeignKeySql = qi.QueryGenerator.dropForeignKeyQuery(tableName, results[0].constraint_name);
return qi.sequelize.query(dropForeignKeySql, options);
})
.then(() => {
//Check if the current column is a primaryKey
const primaryKeyConstraintSql = qi.QueryGenerator.getPrimaryKeyConstraintQuery(tableName, attributeName);
return qi.sequelize.query(primaryKeyConstraintSql, options);
})
.then(([result]) => {
if (!result.length) {
return;
}
const dropConstraintSql = qi.QueryGenerator.dropConstraintQuery(tableName, result[0].constraintName);
return qi.sequelize.query(dropConstraintSql, options);
})
.then(() => {
const removeSql = qi.QueryGenerator.removeColumnQuery(tableName, attributeName);
return qi.sequelize.query(removeSql, options);
});
const [results0] = await qi.sequelize.query(findConstraintSql, options);
if (results0.length) {
// No default constraint found -- we can cleanly remove the column
const dropConstraintSql = qi.QueryGenerator.dropConstraintQuery(tableName, results0[0].name);
await qi.sequelize.query(dropConstraintSql, options);
}
const findForeignKeySql = qi.QueryGenerator.getForeignKeyQuery(tableName, attributeName);
const [results] = await qi.sequelize.query(findForeignKeySql, options);
if (results.length) {
// No foreign key constraints found, so we can remove the column
const dropForeignKeySql = qi.QueryGenerator.dropForeignKeyQuery(tableName, results[0].constraint_name);
await qi.sequelize.query(dropForeignKeySql, options);
}
//Check if the current column is a primaryKey
const primaryKeyConstraintSql = qi.QueryGenerator.getPrimaryKeyConstraintQuery(tableName, attributeName);
const [result] = await qi.sequelize.query(primaryKeyConstraintSql, options);
if (result.length) {
const dropConstraintSql = qi.QueryGenerator.dropConstraintQuery(tableName, result[0].constraintName);
await qi.sequelize.query(dropConstraintSql, options);
}
const removeSql = qi.QueryGenerator.removeColumnQuery(tableName, attributeName);
return qi.sequelize.query(removeSql, options);
};
module.exports = {
......
......@@ -25,13 +25,12 @@ class ConnectionManager extends AbstractConnectionManager {
this.refreshTypeParser(dataTypes);
}
_onProcessExit() {
const promises = Object.getOwnPropertyNames(this.connections)
.map(connection => promisify(callback => this.connections[connection].close(callback))());
return Promise
.all(promises)
.then(() => super._onProcessExit.call(this));
async _onProcessExit() {
await Promise.all(
Object.getOwnPropertyNames(this.connections)
.map(connection => promisify(callback => this.connections[connection].close(callback))())
);
return super._onProcessExit.call(this);
}
// Expose this as a method so that the parsing may be updated when the user has added additional, custom types
......@@ -43,7 +42,7 @@ class ConnectionManager extends AbstractConnectionManager {
parserStore.clear();
}
getConnection(options) {
async getConnection(options) {
options = options || {};
options.uuid = options.uuid || 'default';
options.storage = this.sequelize.options.storage || this.sequelize.options.host || ':memory:';
......@@ -55,7 +54,7 @@ class ConnectionManager extends AbstractConnectionManager {
options.readWriteMode = dialectOptions && dialectOptions.mode || defaultReadWriteMode;
if (this.connections[options.inMemory || options.uuid]) {
return Promise.resolve(this.connections[options.inMemory || options.uuid]);
return this.connections[options.inMemory || options.uuid];
}
if (!options.inMemory && (options.readWriteMode & this.lib.OPEN_CREATE) !== 0) {
......@@ -63,7 +62,7 @@ class ConnectionManager extends AbstractConnectionManager {
fs.mkdirSync(path.dirname(options.storage), { recursive: true });
}
return new Promise((resolve, reject) => {
const connection = await new Promise((resolve, reject) => {
this.connections[options.inMemory || options.uuid] = new this.lib.Database(
options.storage,
options.readWriteMode,
......@@ -73,18 +72,19 @@ class ConnectionManager extends AbstractConnectionManager {
resolve(this.connections[options.inMemory || options.uuid]);
}
);
}).then(connection => {
if (this.sequelize.config.password) {
// Make it possible to define and use password for sqlite encryption plugin like sqlcipher
connection.run(`PRAGMA KEY=${this.sequelize.escape(this.sequelize.config.password)}`);
}
if (this.sequelize.options.foreignKeys !== false) {
// Make it possible to define and use foreign key constraints unless
// explicitly disallowed. It's still opt-in per relation
connection.run('PRAGMA FOREIGN_KEYS=ON');
}
return connection;
});
if (this.sequelize.config.password) {
// Make it possible to define and use password for sqlite encryption plugin like sqlcipher
connection.run(`PRAGMA KEY=${this.sequelize.escape(this.sequelize.config.password)}`);
}
if (this.sequelize.options.foreignKeys !== false) {
// Make it possible to define and use foreign key constraints unless
// explicitly disallowed. It's still opt-in per relation
connection.run('PRAGMA FOREIGN_KEYS=ON');
}
return connection;
}
releaseConnection(connection, force) {
......
......@@ -177,20 +177,18 @@ exports.addConstraint = addConstraint;
* @private
* @returns {Promise}
*/
function getForeignKeyReferencesForTable(qi, tableName, options) {
async function getForeignKeyReferencesForTable(qi, tableName, options) {
const database = qi.sequelize.config.database;
const query = qi.QueryGenerator.getForeignKeysQuery(tableName, database);
return qi.sequelize.query(query, options)
.then(result => {
return result.map(row => ({
tableName,
columnName: row.from,
referencedTableName: row.table,
referencedColumnName: row.to,
tableCatalog: database,
referencedTableCatalog: database
}));
});
const result = await qi.sequelize.query(query, options);
return result.map(row => ({
tableName,
columnName: row.from,
referencedTableName: row.table,
referencedColumnName: row.to,
tableCatalog: database,
referencedTableCatalog: database
}));
}
exports.getForeignKeyReferencesForTable = getForeignKeyReferencesForTable;
......@@ -216,7 +216,7 @@ class Query extends AbstractQuery {
return result;
}
run(sql, parameters) {
async run(sql, parameters) {
const conn = this.connection;
this.sql = sql;
const method = this.getDatabaseMethod();
......@@ -231,69 +231,67 @@ class Query extends AbstractQuery {
}
return new Promise((resolve, reject) => {
return new Promise((resolve, reject) => conn.serialize(async () => {
const columnTypes = {};
conn.serialize(() => {
const executeSql = () => {
if (sql.startsWith('-- ')) {
return resolve();
}
const query = this;
// cannot use arrow function here because the function is bound to the statement
function afterExecute(executionError, results) {
try {
complete();
// `this` is passed from sqlite, we have no control over this.
// eslint-disable-next-line no-invalid-this
resolve(query._handleQueryResponse(this, columnTypes, executionError, results));
return;
} catch (error) {
reject(error);
}
const executeSql = () => {
if (sql.startsWith('-- ')) {
return resolve();
}
const query = this;
// cannot use arrow function here because the function is bound to the statement
function afterExecute(executionError, results) {
try {
complete();
// `this` is passed from sqlite, we have no control over this.
// eslint-disable-next-line no-invalid-this
resolve(query._handleQueryResponse(this, columnTypes, executionError, results));
return;
} catch (error) {
reject(error);
}
}
if (method === 'exec') {
// exec does not support bind parameter
conn[method](sql, afterExecute);
} else {
if (!parameters) parameters = [];
conn[method](sql, parameters, afterExecute);
}
return null;
};
if (method === 'exec') {
// exec does not support bind parameter
conn[method](sql, afterExecute);
} else {
if (!parameters) parameters = [];
conn[method](sql, parameters, afterExecute);
}
return null;
};
if (this.getDatabaseMethod() === 'all') {
let tableNames = [];
if (this.options && this.options.tableNames) {
tableNames = this.options.tableNames;
} else if (/FROM `(.*?)`/i.exec(this.sql)) {
tableNames.push(/FROM `(.*?)`/i.exec(this.sql)[1]);
}
if (this.getDatabaseMethod() === 'all') {
let tableNames = [];
if (this.options && this.options.tableNames) {
tableNames = this.options.tableNames;
} else if (/FROM `(.*?)`/i.exec(this.sql)) {
tableNames.push(/FROM `(.*?)`/i.exec(this.sql)[1]);
}
// If we already have the metadata for the table, there's no need to ask for it again
tableNames = tableNames.filter(tableName => !(tableName in columnTypes) && tableName !== 'sqlite_master');
// If we already have the metadata for the table, there's no need to ask for it again
tableNames = tableNames.filter(tableName => !(tableName in columnTypes) && tableName !== 'sqlite_master');
if (!tableNames.length) {
return executeSql();
}
return Promise.all(tableNames.map(tableName =>
new Promise(resolve => {
tableName = tableName.replace(/`/g, '');
columnTypes[tableName] = {};
conn.all(`PRAGMA table_info(\`${tableName}\`)`, (err, results) => {
if (!err) {
for (const result of results) {
columnTypes[tableName][result.name] = result.type;
}
}
resolve();
});
}))).then(executeSql);
if (!tableNames.length) {
return executeSql();
}
return executeSql();
});
});
await Promise.all(tableNames.map(tableName =>
new Promise(resolve => {
tableName = tableName.replace(/`/g, '');
columnTypes[tableName] = {};
conn.all(`PRAGMA table_info(\`${tableName}\`)`, (err, results) => {
if (!err) {
for (const result of results) {
columnTypes[tableName][result.name] = result.type;
}
}
resolve();
});
})));
}
return executeSql();
}));
}
parseConstraintsFromSql(sql) {
......@@ -419,24 +417,23 @@ class Query extends AbstractQuery {
}
}
handleShowIndexesQuery(data) {
async handleShowIndexesQuery(data) {
// Sqlite returns indexes so the one that was defined last is returned first. Lets reverse that!
return Promise.all(data.reverse().map(item => {
return Promise.all(data.reverse().map(async item => {
item.fields = [];
item.primary = false;
item.unique = !!item.unique;
item.constraintName = item.name;
return this.run(`PRAGMA INDEX_INFO(\`${item.name}\`)`).then(columns => {
for (const column of columns) {
item.fields[column.seqno] = {
attribute: column.name,
length: undefined,
order: undefined
};
}
const columns = await this.run(`PRAGMA INDEX_INFO(\`${item.name}\`)`);
for (const column of columns) {
item.fields[column.seqno] = {
attribute: column.name,
length: undefined,
order: undefined
};
}
return item;
});
return item;
}));
}
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!