70 lines
2.3 KiB
TypeScript
70 lines
2.3 KiB
TypeScript
// This module contains async synchronization primitives
|
|
|
|
/**
 * Error thrown when a mutex cannot be acquired immediately because it is already locked.
 */
export class MutexAcquireFailure extends Error {
  /**
   * @param args - Forwarded unchanged to the Error constructor (message and, where the
   * runtime library supports it, options).
   */
  constructor(...args: ConstructorParameters<typeof Error>) {
    super(...args);
    // Without this the instance reports its name as plain "Error" in logs and toString().
    this.name = "MutexAcquireFailure";
    // When compiled to ES5, calling super() on Error yields an object whose prototype is
    // Error.prototype; restore the subclass prototype so instanceof checks keep working.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}
|
|
|
|
/**
 * Basic synchronization primitive that only allows a single coroutine to access a resource
 * at a time.
 *
 * Waiters are granted the mutex in the order in which they called acquire().
 */
export class Mutex {
  /**
   * Tail of the wait queue. It settles once every acquisition requested so far has been
   * released, so a new caller only has to wait on this promise to take its turn.
   */
  private locking: Promise<void> = Promise.resolve();

  /**
   * Number of acquisitions that have been requested (currently held or still queued) and not
   * yet released. A single boolean is not enough here: when one holder releases while another
   * caller is queued, the mutex passes straight to that caller and must keep reporting locked.
   */
  private pending = 0;

  /**
   * Get the current state of the mutex.
   * Unlike multithreaded synchronization primitives, this can be queried safely without having to acquire
   * the mutex, as JavaScript's async multitasking is cooperative, meaning you don't need to worry about another task
   * interrupting your function unless your function returns to the event loop via an 'await' statement.
   *
   * @returns true while any caller holds the mutex or is queued waiting for it.
   */
  get locked(): boolean {
    return this.pending > 0;
  }

  /**
   * Acquire the mutex. This returns a function to release the mutex, which must be called
   * or the application will deadlock.
   *
   * It is preferred to use the with() method instead of calling acquire() directly, which provides
   * safeguards to ensure that the lock will always be released, however some logic and code flows
   * will require the flexibility that calling acquire() directly gives.
   *
   * @returns A promise that resolves, once the mutex is owned by the caller, to the release
   * function. Calling the release function more than once has no further effect.
   */
  async acquire(): Promise<() => void> {
    // Register the acquisition synchronously, before the first await, so that `locked` is
    // already true by the time control returns to the caller.
    this.pending += 1;

    let unlockNext: () => void;
    const willLock = new Promise<void>((resolve) => {
      unlockNext = resolve;
    });

    let released = false;
    const release = (): void => {
      // Guard against double release so the pending count cannot be decremented twice.
      if (released) {
        return;
      }
      released = true;
      this.pending -= 1;
      unlockNext();
    };

    // Append this acquisition to the queue, then wait for everything that was ahead of it.
    const previous = this.locking;
    this.locking = previous.then(() => willLock);
    await previous;
    return release;
  }

  /**
   * Executes the given function with the mutex acquired, and releases the mutex when the function exits.
   *
   * This is the preferred way to utilize a Mutex, as it ensures that it will not be left in a locked state.
   *
   * @param func - Work to run while holding the mutex. If it returns a promise or any other
   * thenable, the mutex stays held until that value settles.
   * @returns The (awaited) result of func. If func throws or its result rejects, the mutex is
   * released and the returned promise rejects with the same error.
   */
  async with<T>(func: () => T): Promise<T> {
    const release = await this.acquire();
    try {
      // `await` assimilates native promises and foreign thenables alike, so the lock is held
      // until the work has really finished. The awaited value is what callers observe through
      // Promise<T>, hence the widening to unknown before narrowing back to T.
      const result: unknown = await func();
      return result as T;
    } finally {
      release();
    }
  }

  /**
   * Executes the given function with the mutex acquired, throwing a MutexAcquireFailure exception if the
   * mutex was already acquired.
   *
   * @param func - Work to run while holding the mutex.
   * @returns The (awaited) result of func.
   * @throws MutexAcquireFailure (as a rejection of the returned promise) if the mutex is held
   * or has callers queued for it at the time of the call.
   */
  async immediate<T>(func: () => T): Promise<T> {
    if (this.locked) {
      throw new MutexAcquireFailure("Mutex was already locked");
    }
    // with() registers its acquisition synchronously, so no other task can slip in between
    // the check above and the acquisition.
    return this.with(func);
  }
}
|