// This module contains async synchronization primitives export class MutexAcquireFailure extends Error {} /** * Basic synchronization primitive that only allows a single coroutine to access a resource * at a time. */ export class Mutex { private locking = Promise.resolve(); private isLocked = false; /** * Get the current state of the mutex. * Unlike multithreaded synchronization primitives, this can be queried safely without having to acquire * the mutex, as asyncio multitasking is cooperative, meaning you don't need to worry about another task * interrupting your function unless your function returns to the event loop via an 'await' statement. */ get locked() { return this.isLocked; } /** * Acquire the mutex. This returns a function to release the mutex, which must be called * or the application will deadlock. * * It is preferred to use the with() method instead of calling acquire() directly, which provides * safeguards to ensure that the lock will always be released, however some logic and code flows * will require the flexibility that calling acquire() directly gives. */ async acquire() { this.isLocked = true; let unlockNext: () => void; const willLock = new Promise((resolve) => (unlockNext = resolve)); willLock.then(() => (this.isLocked = false)); const willUnlock = this.locking.then(() => unlockNext); this.locking = this.locking.then(() => willLock); return willUnlock; } /** * Executes the given function with the mutex acquired, and releases the mutex when the function exits. * * This is the preferred way to utilize a Mutex, as it ensures that it will not be left in a locked state. */ async with(func: () => T): Promise { const release = await this.acquire(); try { let result = func(); while (result instanceof Promise) { result = await result; } return result; } finally { release(); } } /** * Executes the given function with the mutex acquired, throwing a MutexAcquireFailure exception if the * mutex was already acquired. */ async immediate(func: () => T): Promise { if (this.locked) { throw new MutexAcquireFailure("Mutex was already locked"); } return this.with(func); } }