In distributed systems, silent data loss is often more dangerous than a loud crash. A common architectural pattern involves using Redis Streams with Consumer Groups to distribute workload across microservices.
However, a specific failure mode exists that often goes undetected until reconciliation reports show missing data: The Stuck Pending Entry.
If a consumer pulls a message using XREADGROUP but crashes (OOM, network partition, unhandled exception) before executing XACK, that message remains in the Pending Entries List (PEL). It is not lost, but it is effectively frozen. Other consumers will not process it because Redis knows it was delivered to the now-dead consumer, and the stream cursor has moved on.
This post details how to implement a rigorous recovery mechanism using the modern XAUTOCLAIM command (available in Redis 6.2+) to reclaim and process these stalled messages.
The Root Cause: The PEL Lifecycle
To fix this, one must understand how Redis tracks message ownership.
- Consumption: When a service calls XREADGROUP GROUP mygroup CONSUMER worker-1 STREAMS mystream >, Redis delivers new messages and immediately adds them to the Pending Entries List (PEL).
- Ownership: The message inside the PEL is assigned to worker-1. It has a last-delivered-time and a delivery-count.
- Acknowledgment: Ideally, worker-1 processes the job and calls XACK. Redis removes the ID from the PEL.
- The Failure: If worker-1 crashes before acknowledging (step 3), the message sits in the PEL forever. Subsequent XREADGROUP calls with the special > ID only ask for new messages (never delivered to anyone). The stalled message is skipped by every other healthy consumer.
A minimal sketch of the happy path (steps 1 through 3) is shown below.
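This sketch assumes ioredis and placeholder names (mystream, mygroup, worker-1); it is illustrative, not a hardened consumer.
import Redis from 'ioredis';

const redis = new Redis();

async function consumeOnce(): Promise<void> {
  // Steps 1 & 2: ask for never-delivered messages; Redis records them in the PEL, owned by worker-1.
  const results = await redis.xreadgroup(
    'GROUP', 'mygroup', 'worker-1',
    'COUNT', 10,
    'STREAMS', 'mystream', '>'
  ) as [string, [string, string[]][]][] | null;

  if (!results) return; // nothing new to read

  const [, messages] = results[0];
  for (const [id, fields] of messages) {
    // ...idempotent business logic using `fields` goes here...

    // Step 3: acknowledge, which removes the entry from the PEL.
    await redis.xack('mystream', 'mygroup', id);
  }
}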
The Solution: Automated Claiming with XAUTOCLAIM
Historically, recovering these messages required a complex two-step dance: XPENDING to find idle messages, followed by XCLAIM to take ownership.
Redis 6.2 introduced XAUTOCLAIM, which atomically finds messages that have been idle longer than a specific duration, claims them for the calling consumer, and returns them for immediate processing.
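For comparison, the older pattern looked roughly like the snippet below (a sketch using ioredis; the stream, group, and consumer names are placeholders, and error handling is omitted):
// Step 1: XPENDING lists pending entries; each row is [id, owner, idleTimeMs, deliveryCount].
const pending = await redis.xpending('mystream', 'mygroup', '-', '+', 10) as
  [string, string, number, number][];

// Step 2: filter on idle time ourselves, then XCLAIM the survivors.
const idleIds = pending.filter(([, , idleMs]) => idleMs > 60000).map(([id]) => id);

if (idleIds.length > 0) {
  // XCLAIM re-checks min-idle-time server-side, so an entry another worker already
  // claimed (resetting its idle clock) is simply omitted from the reply.
  const claimed = await redis.xclaim('mystream', 'mygroup', 'recovery-worker', 60000, ...idleIds);
  // ...process and XACK each claimed message...
}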
Below is a robust implementation in TypeScript using ioredis. This implementation represents a "Recovery Worker" that runs alongside your standard consumers or as a scheduled sidecar process.
implementation.ts
import Redis from 'ioredis';
// Configuration constants
const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379';
const STREAM_KEY = 'orders:events';
const CONSUMER_GROUP = 'order-processing-group';
const RECOVERY_CONSUMER_NAME = 'recovery-worker-01';
// Minimum time (ms) a message must remain unacknowledged before we claim it.
// Set this higher than your longest expected processing time.
const MIN_IDLE_TIME_MS = 60000; // 60 seconds
const BATCH_SIZE = 10;
const redis = new Redis(REDIS_URL);
interface StreamMessage {
id: string;
data: Record<string, string>;
}
/**
* Parses the raw array response from ioredis into a usable object.
*/
function parseMessage(rawMessage: any[]): StreamMessage {
const [id, fields] = rawMessage;
const data: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) {
data[fields[i]] = fields[i + 1];
}
return { id, data };
}
/**
* Processes a claimed message.
* In a real scenario, this would trigger your idempotent business logic.
*/
async function processMessage(message: StreamMessage): Promise<void> {
console.log(`[Recovery] Processing stalled message ${message.id}`, message.data);
// Simulate processing time
await new Promise(resolve => setTimeout(resolve, 50));
// Critical: Acknowledge the message so it leaves the PEL
await redis.xack(STREAM_KEY, CONSUMER_GROUP, message.id);
console.log(`[Recovery] Acknowledged ${message.id}`);
}
/**
* The core recovery loop.
* Iterates through the PEL looking for messages stuck longer than MIN_IDLE_TIME_MS.
*/
async function runRecoveryCycle() {
let cursor = '0-0'; // Start at the beginning of the PEL
let cycleMessagesProcessed = 0;
try {
// Ensure group exists before attempting recovery (optional safety check)
try {
await redis.xgroup('CREATE', STREAM_KEY, CONSUMER_GROUP, '0', 'MKSTREAM');
} catch (err: any) {
if (!err.message.includes('BUSYGROUP')) throw err;
}
console.log('Starting recovery cycle...');
do {
// XAUTOCLAIM signature: key, group, consumer, min-idle-time, start-id, count
// Returns: [next-cursor, [messages]]
const result = await redis.xautoclaim(
STREAM_KEY,
CONSUMER_GROUP,
RECOVERY_CONSUMER_NAME,
MIN_IDLE_TIME_MS,
cursor,
'COUNT',
BATCH_SIZE
);
// result[0] is the new cursor (stream ID) for the next iteration
// result[1] is an array of messages successfully claimed
const [nextCursor, rawMessages] = result;
// Update cursor for next loop iteration
cursor = nextCursor as string;
const messages = (rawMessages as any[]).map(parseMessage);
if (messages.length > 0) {
console.log(`Claimed ${messages.length} messages.`);
// Process claimed messages in parallel or sequence
await Promise.allSettled(messages.map(processMessage));
cycleMessagesProcessed += messages.length;
}
} while (cursor !== '0-0'); // 0-0 indicates the end of the PEL scan
console.log(`Recovery cycle complete. Recovered: ${cycleMessagesProcessed}`);
} catch (error) {
console.error('Fatal error in recovery cycle:', error);
} finally {
// In a cron-job scenario, you might disconnect here.
// In a long-running worker, you would set a timeout for the next cycle.
await redis.quit();
}
}
// Execute
runRecoveryCycle();
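If you prefer to run this as a long-lived sidecar rather than a one-shot cron job, drop the redis.quit() from the finally block and loop with a pause between cycles. A minimal sketch (the 30-second interval is an assumption to tune for your workload):
// Long-running variant: keep the connection open and repeat the cycle on an interval.
const RECOVERY_INTERVAL_MS = 30_000;

async function runForever(): Promise<void> {
  while (true) {
    await runRecoveryCycle(); // with the redis.quit() call removed from its finally block
    await new Promise(resolve => setTimeout(resolve, RECOVERY_INTERVAL_MS));
  }
}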
Why This Works
1. Atomic Ownership Transfer
When XAUTOCLAIM runs, it scans the PEL. If it finds a message where idle_time > MIN_IDLE_TIME_MS, it performs three actions atomically:
- Resets the idle timer.
- Increments the delivery count.
- Changes the owner of the message to RECOVERY_CONSUMER_NAME.
This prevents a "thundering herd" scenario where multiple recovery workers try to process the same stuck message simultaneously. Only the worker that successfully claimed it receives the message payload.
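You can observe the transfer with the extended form of XPENDING, which reports the current owner and delivery count for a single entry. In the snippet below, claimedId is a stand-in for the ID of a message returned by XAUTOCLAIM, and the constants come from the implementation above:
// Each row of the extended XPENDING reply is [id, owner, idleTimeMs, deliveryCount].
const [entry] = await redis.xpending(STREAM_KEY, CONSUMER_GROUP, claimedId, claimedId, 1) as
  [string, string, number, number][];

if (entry) {
  const [, owner, idleMs, deliveryCount] = entry;
  // After a successful claim, owner is RECOVERY_CONSUMER_NAME, the idle time is near
  // zero, and deliveryCount has grown by one.
  console.log(`owner=${owner} idle=${idleMs}ms deliveries=${deliveryCount}`);
}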
2. Cursor-Based Iteration
The PEL can be large. Just as you scan keys incrementally with SCAN instead of calling KEYS, you should not ask Redis for the entire PEL in one request. XAUTOCLAIM returns a cursor (a message ID); we feed that cursor back into the next call until Redis returns 0-0, so the entire list of pending messages is scanned in bounded chunks.
3. Handling Poison Pills
While the code above solves the "stuck" message problem, it introduces a new risk: The Poison Pill. If a message causes the consumer to crash every time it is processed, the recovery worker will claim it, crash, restart, claim it again, and crash again.
To mitigate this, you must inspect the delivery count. XAUTOCLAIM does not include it in the entries it returns, so checking it requires a separate XPENDING call (or the much heavier XINFO STREAM ... FULL).
Here is the logic you should inject into processMessage:
// Extension for Poison Pill handling (insert at the top of processMessage).
// XPENDING's extended form returns raw arrays: [id, owner, idleTimeMs, deliveryCount].
const [pendingInfo] = await redis.xpending(STREAM_KEY, CONSUMER_GROUP, message.id, message.id, 1) as
  [string, string, number, number][];
// If this message has been delivered more than 5 times, treat it as toxic.
if (pendingInfo && pendingInfo[3] > 5) {
  console.error(`Message ${message.id} is a poison pill. Moving to DLQ.`);
  await moveToDeadLetterQueue(message); // your own routine; see the sketch below
  await redis.xack(STREAM_KEY, CONSUMER_GROUP, message.id); // Remove it from the PEL so it is never claimed again
  return;
}
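moveToDeadLetterQueue is your own routine; a common approach (an assumption, not something defined earlier in this post) is to XADD the payload to a dedicated DLQ stream for offline inspection:
// Hypothetical DLQ helper: copy the toxic payload into a separate stream.
const DLQ_STREAM_KEY = 'orders:events:dlq'; // assumed naming convention

async function moveToDeadLetterQueue(message: StreamMessage): Promise<void> {
  await redis.xadd(
    DLQ_STREAM_KEY,
    '*',                        // let Redis assign the entry ID
    'original_id', message.id,  // keep a pointer back to the source entry
    ...Object.entries(message.data).flat()
  );
}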
Conclusion
Redis Streams are reliable, but consumer applications are not. Without an active strategy for monitoring the Pending Entries List, your data integrity rests entirely on the stability of your consumers.
Implementing an XAUTOCLAIM loop is the standard pattern for ensuring at-least-once delivery guarantees in the face of random consumer failures. Treat the PEL as a database of "work in progress" that requires regular garbage collection.
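A cheap way to keep that garbage collection honest is the summary form of XPENDING, run from a health check or metrics exporter. A sketch, reusing the constants above (the alert threshold is an assumption):
// Summary form reply: [totalPending, smallestId, greatestId, [[consumer, countAsString], ...]]
const [totalPending, , , perConsumer] = await redis.xpending(STREAM_KEY, CONSUMER_GROUP) as
  [number, string | null, string | null, [string, string][] | null];

if (totalPending > 1000) {
  console.warn(`PEL backlog is ${totalPending} entries`, perConsumer);
}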