不要怂,就是干,撸起袖子干!

Commit c77b1f3a by Harry Yu Committed by GitHub

fix(mysql, mariadb): release connection on deadlocks (#12841)

1 parent 83be13fa
...@@ -54,8 +54,25 @@ class Query extends AbstractQuery { ...@@ -54,8 +54,25 @@ class Query extends AbstractQuery {
await this.logWarnings(results); await this.logWarnings(results);
} }
} catch (err) { } catch (err) {
// MariaDB automatically rolls-back transactions in the event of a deadlock // MariaDB automatically rolls-back transactions in the event of a
// deadlock.
//
// Even though we shouldn't need to do this, we initiate a manual
// rollback. Without the rollback, the next transaction using the
// connection seems to retain properties of the previous transaction
// (e.g. isolation level) and not work as expected.
//
// For example (in our tests), a follow-up READ_COMMITTED transaction
// doesn't work as expected unless we explicitly rollback the
// transaction: it would fail to read a value inserted outside of that
// transaction.
if (options.transaction && err.errno === 1213) { if (options.transaction && err.errno === 1213) {
try {
await options.transaction.rollback();
} catch (err) {
// Ignore errors - since MariaDB automatically rolled back, we're
// not that worried about this redundant rollback failing.
}
options.transaction.finished = 'rollback'; options.transaction.finished = 'rollback';
} }
......
...@@ -1091,12 +1091,15 @@ class Sequelize { ...@@ -1091,12 +1091,15 @@ class Sequelize {
await transaction.commit(); await transaction.commit();
return await result; return await result;
} catch (err) { } catch (err) {
if (!transaction.finished) { try {
try { if (!transaction.finished) {
await transaction.rollback(); await transaction.rollback();
} catch (err0) { } else {
// ignore // release the connection, even if we don't need to rollback
await transaction.cleanup();
} }
} catch (err0) {
// ignore
} }
throw err; throw err;
} }
...@@ -1104,7 +1107,10 @@ class Sequelize { ...@@ -1104,7 +1107,10 @@ class Sequelize {
} }
/** /**
* Use CLS with Sequelize. * Use CLS (Continuation Local Storage) with Sequelize. With Continuation
* Local Storage, all queries within the transaction callback will
* automatically receive the transaction object.
*
* CLS namespace provided is stored as `Sequelize._cls` * CLS namespace provided is stored as `Sequelize._cls`
* *
* @param {object} ns CLS namespace * @param {object} ns CLS namespace
......
...@@ -56,15 +56,11 @@ class Transaction { ...@@ -56,15 +56,11 @@ class Transaction {
throw new Error(`Transaction cannot be committed because it has been finished with state: ${this.finished}`); throw new Error(`Transaction cannot be committed because it has been finished with state: ${this.finished}`);
} }
this._clearCls();
try { try {
return await this.sequelize.getQueryInterface().commitTransaction(this, this.options); return await this.sequelize.getQueryInterface().commitTransaction(this, this.options);
} finally { } finally {
this.finished = 'commit'; this.finished = 'commit';
if (!this.parent) { this.cleanup();
this.cleanup();
}
for (const hook of this._afterCommitHooks) { for (const hook of this._afterCommitHooks) {
await hook.apply(this, [this]); await hook.apply(this, [this]);
} }
...@@ -85,20 +81,23 @@ class Transaction { ...@@ -85,20 +81,23 @@ class Transaction {
throw new Error('Transaction cannot be rolled back because it never started'); throw new Error('Transaction cannot be rolled back because it never started');
} }
this._clearCls();
try { try {
return await this return await this
.sequelize .sequelize
.getQueryInterface() .getQueryInterface()
.rollbackTransaction(this, this.options); .rollbackTransaction(this, this.options);
} finally { } finally {
if (!this.parent) { this.cleanup();
this.cleanup();
}
} }
} }
/**
* Called to acquire a connection to use and set the correct options on the connection.
* We should ensure all of the environment that's set up is cleaned up in `cleanup()` below.
*
* @param {boolean} useCLS Defaults to true: Use CLS (Continuation Local Storage) with Sequelize. With CLS, all queries within the transaction callback will automatically receive the transaction object.
* @returns {Promise}
*/
async prepareEnvironment(useCLS) { async prepareEnvironment(useCLS) {
let connectionPromise; let connectionPromise;
...@@ -162,6 +161,11 @@ class Transaction { ...@@ -162,6 +161,11 @@ class Transaction {
} }
cleanup() { cleanup() {
// Don't release the connection if there's a parent transaction or
// if we've already cleaned up
if (this.parent || this.connection.uuid === undefined) return;
this._clearCls();
const res = this.sequelize.connectionManager.releaseConnection(this.connection); const res = this.sequelize.connectionManager.releaseConnection(this.connection);
this.connection.uuid = undefined; this.connection.uuid = undefined;
return res; return res;
......
...@@ -424,70 +424,105 @@ if (current.dialect.supports.transactions) { ...@@ -424,70 +424,105 @@ if (current.dialect.supports.transactions) {
if (dialect === 'mysql' || dialect === 'mariadb') { if (dialect === 'mysql' || dialect === 'mariadb') {
describe('deadlock handling', () => { describe('deadlock handling', () => {
it('should treat deadlocked transaction as rollback', async function() { // Create the `Task` table and ensure it's initialized with 2 rows
const Task = this.sequelize.define('task', { const getAndInitializeTaskModel = async sequelize => {
const Task = sequelize.define('task', {
id: { id: {
type: Sequelize.INTEGER, type: Sequelize.INTEGER,
primaryKey: true primaryKey: true
} }
}); });
// This gets called twice simultaneously, and we expect at least one of the calls to encounter a await sequelize.sync({ force: true });
// deadlock (which effectively rolls back the active transaction). await Task.create({ id: 0 });
// We only expect createTask() to insert rows if a transaction is active. If deadlocks are handled await Task.create({ id: 1 });
// properly, it should only execute a query if we're actually inside a real transaction. If it does return Task;
// execute a query, we expect the newly-created rows to be destroyed when we forcibly rollback by };
// throwing an error.
// tl;dr; This test is designed to ensure that this function never inserts and commits a new row. // Lock the row with id of `from`, and then try to update the row
const update = async (from, to) => this.sequelize.transaction(async transaction => { // with id of `to`
try { const update = async (sequelize, Task, from, to) => {
await sequelize
.transaction(async transaction => {
try { try {
await Task.findAll({ try {
where: { await Task.findAll({
id: { where: { id: { [Sequelize.Op.eq]: from } },
[Sequelize.Op.eq]: from lock: transaction.LOCK.UPDATE,
transaction
});
await delay(10);
await Task.update(
{ id: to },
{
where: { id: { [Sequelize.Op.ne]: to } },
lock: transaction.LOCK.UPDATE,
transaction
} }
}, );
lock: 'UPDATE', } catch (e) {
transaction console.log(e.message);
}); }
await delay(10); await Task.create({ id: 2 }, { transaction });
await Task.update({ id: to }, {
where: {
id: {
[Sequelize.Op.ne]: to
}
},
lock: transaction.LOCK.UPDATE,
transaction
});
} catch (e) { } catch (e) {
console.log(e.message); console.log(e.message);
} }
await Task.create({ id: 2 }, { transaction }); throw new Error('Rollback!');
} catch (e) { })
console.log(e.message); .catch(() => {});
} };
throw new Error('Rollback!');
}).catch(() => {});
await this.sequelize.sync({ force: true }); it('should treat deadlocked transaction as rollback', async function() {
await Task.create({ id: 0 }); const Task = await getAndInitializeTaskModel(this.sequelize);
await Task.create({ id: 1 });
await Promise.all([ // This gets called twice simultaneously, and we expect at least one of the calls to encounter a
update(1, 0), // deadlock (which effectively rolls back the active transaction).
update(0, 1) // We only expect createTask() to insert rows if a transaction is active. If deadlocks are handled
]); // properly, it should only execute a query if we're actually inside a real transaction. If it does
// execute a query, we expect the newly-created rows to be destroyed when we forcibly rollback by
// throwing an error.
// tl;dr; This test is designed to ensure that this function never inserts and commits a new row.
await Promise.all([update(this.sequelize, Task, 1, 0), update(this.sequelize, Task, 0, 1)]);
const count = await Task.count(); const count = await Task.count();
// If we were actually inside a transaction when we called `Task.create({ id: 2 })`, no new rows should be added. // If we were actually inside a transaction when we called `Task.create({ id: 2 })`, no new rows should be added.
expect(count).to.equal(2, 'transactions were fully rolled-back, and no new rows were added'); expect(count).to.equal(2, 'transactions were fully rolled-back, and no new rows were added');
}); });
it('should release the connection for a deadlocked transaction', async function() {
const Task = await getAndInitializeTaskModel(this.sequelize);
// 1 of 2 queries should deadlock and be rolled back by InnoDB
this.sinon.spy(this.sequelize.connectionManager, 'releaseConnection');
await Promise.all([update(this.sequelize, Task, 1, 0), update(this.sequelize, Task, 0, 1)]);
// Verify that both of the connections were released
expect(this.sequelize.connectionManager.releaseConnection.callCount).to.equal(2);
// Verify that a follow-up READ_COMMITTED works as expected.
// For unknown reasons, we need to explicitly rollback on MariaDB,
// even though the transaction should've automatically been rolled
// back.
// Otherwise, this READ_COMMITTED doesn't work as expected.
const User = this.sequelize.define('user', {
username: Support.Sequelize.STRING
});
await this.sequelize.sync({ force: true });
await this.sequelize.transaction(
{ isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED },
async transaction => {
const users0 = await User.findAll({ transaction });
expect(users0).to.have.lengthOf(0);
await User.create({ username: 'jan' }); // Create a User outside of the transaction
const users = await User.findAll({ transaction });
expect(users).to.have.lengthOf(1); // We SHOULD see the created user inside the transaction
}
);
});
}); });
} }
...@@ -576,13 +611,16 @@ if (current.dialect.supports.transactions) { ...@@ -576,13 +611,16 @@ if (current.dialect.supports.transactions) {
await expect( await expect(
this.sequelize.sync({ force: true }).then(() => { this.sequelize.sync({ force: true }).then(() => {
return this.sequelize.transaction({ isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED }, async transaction => { return this.sequelize.transaction(
const users0 = await User.findAll({ transaction }); { isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED },
await expect( users0 ).to.have.lengthOf(0); async transaction => {
await User.create({ username: 'jan' }); // Create a User outside of the transaction const users0 = await User.findAll({ transaction });
const users = await User.findAll({ transaction }); expect(users0).to.have.lengthOf(0);
return expect( users ).to.have.lengthOf(1); // We SHOULD see the created user inside the transaction await User.create({ username: 'jan' }); // Create a User outside of the transaction
}); const users = await User.findAll({ transaction });
expect(users).to.have.lengthOf(1); // We SHOULD see the created user inside the transaction
}
);
}) })
).to.eventually.be.fulfilled; ).to.eventually.be.fulfilled;
}); });
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!