asyncQueue
AsyncQueuer Class
While the Queuer provides synchronous queueing with timing controls, the AsyncQueuer is designed specifically for handling concurrent asynchronous operations. It implements what is traditionally known as a "task pool" or "worker pool" pattern, allowing multiple operations to be processed simultaneously while maintaining control over concurrency and timing. The implementation is mostly copied from Swimmer, Tanner's original task pooling utility that has been serving the JavaScript community since 2017.
Async queueing extends the basic queueing concept by adding concurrent processing capabilities. Instead of processing one item at a time, an async queuer can process multiple items simultaneously while still maintaining order and control over the execution. This is particularly useful when dealing with I/O operations, network requests, or any tasks that spend most of their time waiting rather than consuming CPU.
Async Queueing (concurrency: 2, wait: 2 ticks)
Timeline: [1 second per tick]
Calls:       ⬇️  ⬇️  ⬇️  ⬇️  ⬇️  ⬇️  ⬇️
Queue:       [ABC]    [C]      [CDE]    [E]      []
Active:      [A,B]    [B,C]    [C,D]    [D,E]    [E]
Completed:    -        A        B        C        D,E
             [=================================================================]
             ^ Unlike regular queueing, multiple items
               can be processed concurrently

             [Items queue up]   [Process 2 at once]   [Complete]
              when busy          with wait between     all items
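To make the task pool idea concrete, here is a minimal, dependency-free sketch of the pattern: a fixed number of workers repeatedly pull async tasks from a shared list and run them, so at most a set number of tasks are in flight at any moment. The function and variable names here are illustrative and are not part of TanStack Pacer.

// Minimal worker-pool sketch (illustrative only; not TanStack Pacer's implementation)
async function runPool<T>(
  tasks: Array<() => Promise<T>>,
  concurrency: number,
): Promise<T[]> {
  const pending = [...tasks] // tasks waiting to start, taken from the front (FIFO)
  const results: T[] = [] // results collected in completion order

  async function worker() {
    while (pending.length > 0) {
      const task = pending.shift()! // take the next task
      results.push(await task()) // only `concurrency` workers exist, so at most that many run at once
    }
  }

  // Start the workers and wait for them to drain the task list
  await Promise.all(Array.from({ length: concurrency }, () => worker()))
  return results
}

// Example: at most 2 of these tasks run at the same time
runPool(
  [1, 2, 3, 4, 5].map((n) => async () => n * 2),
  2,
).then((doubled) => console.log(doubled))

The AsyncQueuer builds on this same idea and adds wait timing, start/stop control, success and error callbacks, and the ordering strategies described below.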
Async queueing is particularly effective for I/O-heavy workloads, such as network requests or file operations, where you want several tasks in flight at once while still capping how many run concurrently and controlling the order in which they start.
The AsyncQueuer is very versatile and can be used in many situations. It is only a poor fit when you don't plan to take advantage of its features: if you don't need every queued execution to go through, use [Throttling](../guides/throttling) instead; if you don't need concurrent processing, use [Queueing](../guides/queueing) instead.
TanStack Pacer provides async queueing through the simple asyncQueue function and the more powerful AsyncQueuer class.
The asyncQueue function provides a simple way to create an always-running async queue:
import { asyncQueue } from '@tanstack/pacer'
// Create a queue that processes up to 2 items concurrently
const processItems = asyncQueue<string>({
concurrency: 2,
onUpdate: (queuer) => {
console.log('Active tasks:', queuer.getActiveItems().length)
}
})
// Add async tasks to be processed
processItems(async () => {
const result = await fetchData(1)
return result
})
processItems(async () => {
const result = await fetchData(2)
return result
})
The asyncQueue function is somewhat limited, as it is just a wrapper around the AsyncQueuer class that only exposes the addItem method. For more control over the queue, use the AsyncQueuer class directly.
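As a rough mental model (an approximation, not the library's actual source), asyncQueue can be thought of as doing something like this:

import { AsyncQueuer } from '@tanstack/pacer'

// Illustrative approximation only -- not the library's actual implementation.
// asyncQueue roughly builds an always-running AsyncQueuer and returns its addItem method.
function asyncQueueSketch<TValue>(options: { concurrency?: number; wait?: number }) {
  const queuer = new AsyncQueuer<TValue>({ ...options, started: true })
  return queuer.addItem.bind(queuer)
}

// Behaves like the asyncQueue example above
const enqueue = asyncQueueSketch<string>({ concurrency: 2 })
enqueue(async () => 'hello')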
The AsyncQueuer class provides complete control over async queue behavior:
import { AsyncQueuer } from '@tanstack/pacer'
const queue = new AsyncQueuer<string>({
concurrency: 2, // Process 2 items at once
wait: 1000, // Wait 1 second between starting new items
started: true // Start processing immediately
})
// Add error and success handlers
queue.onError((error) => {
console.error('Task failed:', error)
})
queue.onSuccess((result) => {
console.log('Task completed:', result)
})
// Add async tasks
queue.addItem(async () => {
const result = await fetchData(1)
return result
})
queue.addItem(async () => {
const result = await fetchData(2)
return result
})
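Since addItem returns a promise (the error-handling example later in this guide chains .catch onto it), you can also await a queued task's result directly where that is convenient; fetchData here is the same placeholder used above:

// Await the result of a queued task directly (fetchData is a placeholder as above)
const result = await queue.addItem(async () => {
  return await fetchData(3)
})
console.log('Task 3 result:', result)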
The AsyncQueuer supports different queueing strategies to handle various processing requirements. Each strategy determines how tasks are added and processed from the queue.
FIFO queues process tasks in the exact order they were added, making them ideal for maintaining sequence:
const queue = new AsyncQueuer<string>({
addItemsTo: 'back', // default
getItemsFrom: 'front', // default
concurrency: 2
})
queue.addItem(async () => 'first') // [first]
queue.addItem(async () => 'second') // [first, second]
// Processes: first and second concurrently
LIFO stacks process the most recently added tasks first, useful for prioritizing newer tasks:
const stack = new AsyncQueuer<string>({
addItemsTo: 'back',
getItemsFrom: 'back', // Process newest items first
concurrency: 2
})
stack.addItem(async () => 'first') // [first]
stack.addItem(async () => 'second') // [first, second]
// Processes: second first, then first
Priority queues process tasks based on their assigned priority values, ensuring important tasks are handled first. There are two ways to specify priorities:
const priorityQueue = new AsyncQueuer<string>({
concurrency: 2
})
// Create tasks with static priority values
const lowPriorityTask = Object.assign(
async () => 'low priority result',
{ priority: 1 }
)
const highPriorityTask = Object.assign(
async () => 'high priority result',
{ priority: 3 }
)
const mediumPriorityTask = Object.assign(
async () => 'medium priority result',
{ priority: 2 }
)
// Add tasks in any order - they'll be processed by priority (higher numbers first)
priorityQueue.addItem(lowPriorityTask)
priorityQueue.addItem(highPriorityTask)
priorityQueue.addItem(mediumPriorityTask)
// Processes: high and medium concurrently, then low
const dynamicPriorityQueue = new AsyncQueuer<string>({
concurrency: 2,
getPriority: (task) => {
// Calculate priority based on task properties or other factors
// Higher numbers have priority
return calculateTaskPriority(task)
}
})
// Add tasks - priority will be calculated dynamically
dynamicPriorityQueue.addItem(async () => {
const result = await processTask('low')
return result
})
dynamicPriorityQueue.addItem(async () => {
const result = await processTask('high')
return result
})
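If your tasks carry their own metadata, getPriority can simply read it, which is one hypothetical way to combine the two approaches shown above; the priority property and the fallback value of 0 here are assumptions made for illustration:

const metadataPriorityQueue = new AsyncQueuer<string>({
  concurrency: 2,
  // Read a `priority` value attached to the task function, defaulting to 0 (assumed fallback)
  getPriority: (task) => (task as any).priority ?? 0,
})

// Attach priority metadata to the task itself, as in the static example above
metadataPriorityQueue.addItem(
  Object.assign(async () => 'urgent result', { priority: 5 })
)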
Priority queues are essential when the order in which tasks were added should not determine the order in which they run, and more important or time-sensitive work needs to be processed ahead of less urgent work.
The AsyncQueuer provides comprehensive error handling capabilities to ensure robust task processing. You can handle errors at both the queue level and individual task level:
// Handle errors globally
const queue = new AsyncQueuer<string>({
onError: (error) => {
console.error('Task failed:', error)
},
onSuccess: (result) => {
console.log('Task succeeded:', result)
},
onSettled: (result) => {
if (result instanceof Error) {
console.log('Task failed:', result)
} else {
console.log('Task succeeded:', result)
}
}
})
// Handle errors per task
queue.addItem(async () => {
throw new Error('Task failed')
}).catch(error => {
console.error('Individual task error:', error)
})
The AsyncQueuer provides several methods for monitoring and controlling queue state:
// Queue inspection
queue.peek() // View next item without removing it
queue.size() // Get current queue size
queue.isEmpty() // Check if queue is empty
queue.isFull() // Check if queue has reached maxSize
queue.getAllItems() // Get copy of all queued items
queue.getActiveItems() // Get currently processing items
queue.getPendingItems() // Get items waiting to be processed
// Queue manipulation
queue.clear() // Remove all items
queue.reset() // Reset to initial state
queue.getExecutionCount() // Get number of processed items
// Processing control
queue.start() // Begin processing items
queue.stop() // Pause processing
queue.isRunning() // Check if queue is processing
queue.isIdle() // Check if queue is empty and not processing
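These methods can be combined for simple operational tasks, for example checking progress and waiting for the queue to drain; the 1-second polling interval below is arbitrary and used only for illustration:

// Pause processing, inspect the current state, then resume if there is work left
queue.stop()
console.log('Pending:', queue.getPendingItems().length, 'Active:', queue.getActiveItems().length)
if (!queue.isEmpty()) {
  queue.start()
}

// Poll until everything has been processed (1-second interval chosen arbitrarily)
const timer = setInterval(() => {
  console.log('Processed so far:', queue.getExecutionCount())
  if (queue.isIdle()) {
    clearInterval(timer)
    console.log('Queue has fully drained')
  }
}, 1000)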
Each framework adapter builds convenient hooks and functions around the async queuer classes. Hooks like useAsyncQueuer or useAsyncQueuerState are small wrappers that can cut down on the boilerplate needed in your own code for some common use cases.
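As a purely hypothetical sketch of what that can look like in React, the example below assumes the React adapter is published as @tanstack/react-pacer and that useAsyncQueuer accepts the same options object as the AsyncQueuer class and returns the queuer instance; check the adapter documentation for the actual package name and hook signature.

import { useAsyncQueuer } from '@tanstack/react-pacer' // package name assumed; see the adapter docs

// Hypothetical usage sketch: assumes useAsyncQueuer takes the same options object
// as the AsyncQueuer class and returns the queuer instance.
function UploadButton({ uploadFile }: { uploadFile: () => Promise<string> }) {
  const queue = useAsyncQueuer<string>({ concurrency: 2, started: true })

  return (
    <button onClick={() => queue.addItem(uploadFile)}>
      Queue upload
    </button>
  )
}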