# Queue
The queue system handles asynchronous processing of webhooks and background jobs. When an event triggers a webhook, the event is enqueued as a job and processed asynchronously by worker tasks.
## How It Works
- An event occurs (channel occupied, member joined, client event, etc.).
- The webhook integration creates a job with the event payload.
- The job is pushed to the configured queue backend.
- Worker tasks pull jobs from the queue and dispatch webhook HTTP requests.
This decouples event handling from webhook delivery, preventing slow or failing webhook endpoints from blocking the WebSocket server.
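The decoupling described above can be sketched as a producer/consumer pair. This is an illustrative simulation using Python's standard `queue` module, not Sockudo's actual implementation; names like `enqueue_webhook` are made up for the example:

```python
import json
import queue
import threading

job_queue = queue.Queue()   # stand-in for the configured queue backend
delivered = []              # records what the worker dispatched

def enqueue_webhook(event_name, payload):
    """Event-handler side: package the event as a job and return immediately."""
    job_queue.put(json.dumps({"event": event_name, "payload": payload}))

def worker():
    """Worker side: pull jobs and dispatch them, independently of the event source."""
    while True:
        job = json.loads(job_queue.get())
        # In Sockudo, this step is an HTTP POST to the webhook endpoint.
        delivered.append(job["event"])
        job_queue.task_done()

threading.Thread(target=worker, daemon=True).start()
enqueue_webhook("channel_occupied", {"channel": "presence-chat"})
job_queue.join()    # the event handler itself never blocked on delivery
print(delivered)    # → ['channel_occupied']
```

A slow `worker` here would delay `delivered`, but never `enqueue_webhook`: that is the property that keeps a failing webhook endpoint from blocking the WebSocket server.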
## Drivers
| Driver | Best For | Persistence | External Dependency |
|---|---|---|---|
| memory | Development, testing | None (lost on restart) | None |
| redis | Production with a single Redis instance | Yes | Redis 6+ |
| redis-cluster | Production with Redis Cluster | Yes | Redis Cluster 6+ |
| sqs | AWS-native, serverless | Yes | AWS SQS |
| none | Disable webhooks entirely | N/A | None |
Set the driver via environment variable:

```bash
QUEUE_DRIVER=redis
```

or in the JSON config:

```json
{ "queue": { "driver": "redis" } }
```
## Memory Queue
In-memory queue with no external dependencies. Jobs are lost on restart.
```json
{ "queue": { "driver": "memory" } }
```
- Capacity: 100,000 jobs max (FIFO eviction when full).
- Processing: Background task polls every 500ms, spawns async tasks per job.
- Use case: Development and testing only.
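The capacity and FIFO-eviction behavior can be sketched like this. `MemoryQueue` is illustrative only (Sockudo's in-memory queue also polls and spawns a task per job, which is omitted here); Python's `deque` with `maxlen` drops the oldest entry when full, matching the documented eviction:

```python
from collections import deque

class MemoryQueue:
    """Bounded in-memory FIFO mirroring the documented capacity/eviction behavior."""
    def __init__(self, capacity=100_000):
        self.jobs = deque(maxlen=capacity)  # oldest job is dropped when full

    def push(self, job):
        self.jobs.append(job)  # FIFO eviction happens implicitly at capacity

    def pop(self):
        return self.jobs.popleft() if self.jobs else None

q = MemoryQueue(capacity=3)
for i in range(5):
    q.push(f"job-{i}")
print([q.pop() for _ in range(3)])  # → ['job-2', 'job-3', 'job-4']
```

Note that the two oldest jobs were silently discarded, which is why this driver is unsuitable for production.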
## Redis Queue
Uses Redis lists with `BLPOP` for efficient blocking queue consumption.
```json
{
  "queue": {
    "driver": "redis",
    "redis": {
      "concurrency": 5,
      "prefix": "sockudo_queue:"
    }
  }
}
```
| Setting | Env Var | Default | Description |
|---|---|---|---|
| concurrency | QUEUE_REDIS_CONCURRENCY | 5 | Number of concurrent worker tasks |
| prefix | QUEUE_REDIS_PREFIX | sockudo_queue: | Redis key prefix |
| url_override | — | null | Override Redis URL (otherwise uses database.redis) |
| cluster_mode | — | false | Use cluster-aware connections |
### How Workers Process Jobs
Each worker runs an infinite loop:
- `BLPOP` on the queue key with a short timeout.
- Deserialize the job payload (JSON).
- Dispatch the webhook HTTP request.
- Move to the next job.
Workers run concurrently: with `concurrency: 5`, up to 5 jobs are processed simultaneously.
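The worker loop can be sketched as follows. This is a simulation, not Sockudo's code: an `asyncio.Queue` stands in for Redis (a real worker issues `BLPOP` against the key described below), and appending to `results` stands in for the HTTP dispatch:

```python
import asyncio
import json

async def worker(queue, results, worker_id):
    """One worker task: block for a job, deserialize it, dispatch, repeat."""
    while True:
        raw = await queue.get()   # stands in for BLPOP with a short timeout
        if raw is None:           # sentinel: shut this worker down
            queue.task_done()
            return
        job = json.loads(raw)     # deserialize the job payload (JSON)
        results.append((worker_id, job["event"]))  # stands in for the HTTP request
        queue.task_done()

async def main(concurrency=5):
    queue, results = asyncio.Queue(), []
    workers = [asyncio.create_task(worker(queue, results, i))
               for i in range(concurrency)]
    for i in range(8):
        queue.put_nowait(json.dumps({"event": f"event-{i}"}))
    for _ in workers:             # one shutdown sentinel per worker
        queue.put_nowait(None)
    await queue.join()
    return results

results = asyncio.run(main())
print(len(results))  # → 8
```

With `concurrency=5`, five worker tasks drain the queue cooperatively; jobs may finish out of order across workers, which is acceptable for the non-FIFO drivers.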
### Redis Key Format

```
{prefix}queue:{queue_name}
```

For example: `sockudo_queue:queue:webhooks`
## Redis Cluster Queue
Same processing model as the Redis queue, but uses Redis Cluster connections.
```json
{
  "queue": {
    "driver": "redis-cluster",
    "redis_cluster": {
      "concurrency": 5,
      "prefix": "sockudo_queue:",
      "nodes": ["redis://10.0.0.1:7000", "redis://10.0.0.2:7001"],
      "request_timeout_ms": 5000
    }
  }
}
```
| Setting | Env Var | Default | Description |
|---|---|---|---|
| concurrency | REDIS_CLUSTER_QUEUE_CONCURRENCY | 5 | Concurrent workers |
| prefix | REDIS_CLUSTER_QUEUE_PREFIX | sockudo_queue: | Key prefix |
| nodes | REDIS_CLUSTER_NODES | ["redis://127.0.0.1:6379"] | Cluster seed nodes |
| request_timeout_ms | — | 5000 | Request timeout in ms |
## SQS Queue
AWS SQS-based queue with long polling. Supports both standard and FIFO queues.
```json
{
  "queue": {
    "driver": "sqs",
    "sqs": {
      "region": "us-east-1",
      "concurrency": 5,
      "fifo": false
    }
  }
}
```
### Configuration
| Setting | Env Var | Default | Description |
|---|---|---|---|
| region | QUEUE_SQS_REGION | us-east-1 | AWS region |
| queue_url_prefix | — | null | Custom queue URL prefix |
| visibility_timeout | QUEUE_SQS_VISIBILITY_TIMEOUT | 30 | Message visibility timeout (seconds) |
| endpoint_url | QUEUE_SQS_ENDPOINT_URL | null | Custom endpoint (for LocalStack) |
| max_messages | QUEUE_SQS_MAX_MESSAGES | 10 | Messages per poll |
| wait_time_seconds | QUEUE_SQS_WAIT_TIME_SECONDS | 5 | Long polling wait time (seconds) |
| concurrency | QUEUE_SQS_CONCURRENCY | 5 | Concurrent workers |
| fifo | QUEUE_SQS_FIFO | false | Use FIFO queue |
| message_group_id | — | default | Message group ID for FIFO queues |
### FIFO Queues
When `fifo: true`, Sockudo appends `.fifo` to the queue name and uses `message_group_id` for ordering. FIFO queues guarantee exactly-once processing and strict ordering within a message group.
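The naming and grouping rules can be illustrated like this. This is a sketch of the documented behavior, not Sockudo's actual code; `sqs_send_params` is a made-up helper, though `QueueName`, `MessageBody`, and `MessageGroupId` are the real SQS parameter names:

```python
def sqs_send_params(queue_name, body, fifo=False, message_group_id="default"):
    """Build SQS send parameters, adding .fifo and the group ID when FIFO is on."""
    params = {"QueueName": queue_name, "MessageBody": body}
    if fifo:
        params["QueueName"] = queue_name + ".fifo"
        params["MessageGroupId"] = message_group_id  # ordering scope within the queue
    return params

print(sqs_send_params("webhooks", "{}", fifo=True)["QueueName"])  # → webhooks.fifo
```

With a single `message_group_id` of `default`, all webhook jobs share one ordering scope, so SQS delivers them strictly in sequence.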
### Local Development
Use LocalStack for local SQS testing:
```bash
QUEUE_SQS_ENDPOINT_URL=http://localhost:4566
QUEUE_SQS_REGION=us-east-1
```
### Auto-Creation
Sockudo automatically creates SQS queues if they don't exist. Malformed messages are deleted automatically to prevent poison pill scenarios.
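The poison-pill handling amounts to the following pattern, sketched here with an illustrative `handle_message` helper (not Sockudo's code): a message whose body cannot be parsed is deleted rather than left to reappear after every visibility timeout:

```python
import json

def handle_message(message, delete):
    """Process one SQS message; delete malformed ones so they cannot loop forever."""
    try:
        job = json.loads(message["Body"])
    except (json.JSONDecodeError, KeyError, TypeError):
        delete(message)   # poison pill: remove it instead of retrying
        return None
    # ...dispatch the webhook here...
    delete(message)       # normal path: acknowledge by deleting
    return job

deleted = []
handle_message({"Body": "not json"}, deleted.append)
print(len(deleted))  # → 1
```

Without the deletion in the error path, a malformed message would become visible again after `visibility_timeout` seconds and be retried indefinitely.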
## Job Structure
Each queued job contains:
| Field | Description |
|---|---|
| app_key | The app key for webhook signing |
| app_id | The app ID |
| app_secret | The app secret for HMAC signature |
| payload.time_ms | Event timestamp in milliseconds |
| payload.events | Array of Pusher-compatible event objects |
| original_signature | Deduplication key |
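The following sketch shows how the `payload` fields and `app_secret` combine into an HMAC signature, assuming Pusher's standard webhook scheme (HMAC-SHA256 over the JSON body); the exact serialization in Sockudo may differ, and `sign_webhook` is an illustrative name:

```python
import hashlib
import hmac
import json

def sign_webhook(app_secret, time_ms, events):
    """Serialize a webhook body and sign it with HMAC-SHA256 of the app secret."""
    body = json.dumps({"time_ms": time_ms, "events": events},
                      separators=(",", ":"))
    signature = hmac.new(app_secret.encode(), body.encode(),
                         hashlib.sha256).hexdigest()
    return body, signature

body, sig = sign_webhook("secret", 1700000000000,
                         [{"name": "channel_occupied", "channel": "my-channel"}])
print(len(sig))  # → 64 (hex-encoded SHA-256 digest)
```

A receiver recomputes the same HMAC over the raw request body with the shared secret and compares it to the signature header to authenticate the webhook.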
## Choosing a Driver
| Scenario | Recommended Driver |
|---|---|
| Development / testing | memory |
| Single-node production | redis |
| Multi-node production | redis or redis-cluster |
| AWS-native deployment | sqs |
| Webhooks not needed | none |
If your deployment already runs Redis, use redis for the queue too: no additional infrastructure is needed.