In distributed payment and inventory systems, exactly-once processing is a myth. We operate in a world of at-least-once delivery. If the broker never receives an acknowledgment (ACK), whether because of a network partition or a consumer crash (or, in Kafka's case, because the consumer never commits its offset), it will redeliver that message.
If your consumer logic isn't idempotent, you just charged a customer twice or decremented the same inventory twice. Worse, if messages arrive out of order (e.g., "Order Cancelled" arrives before "Order Created" due to partition rebalancing), your system state becomes corrupt.
This post details a rigorous implementation of the Idempotency Key Pattern using Redis (for atomic locking) and PostgreSQL (for consistent state), ensuring data integrity regardless of duplicate deliveries or race conditions.
The Root Cause: Why "At-Least-Once" Breaks Data
The core issue lies in the gap between Side Effect Execution and Broker Acknowledgment.
Consider this standard consumer flow:
1. Read the message from RabbitMQ/Kafka.
2. Update the database (the side effect).
3. Send the ACK to the broker.
If the consumer crashes or the network hangs exactly between Step 2 and Step 3, the database transaction commits, but the broker assumes failure. The broker redelivers the message to a new consumer instance. The new consumer executes Step 2 again.
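Here is a minimal sketch of that naive flow using amqplib; the "payments" queue and the chargeCustomer helper are hypothetical stand-ins for the side effect in Step 2.

import { connect } from "amqplib";
// Hypothetical side effect for Step 2: commits the charge to the database.
declare function chargeCustomer(event: unknown): Promise<void>;
async function runNaiveConsumer(): Promise<void> {
  const conn = await connect(process.env.AMQP_URL ?? "amqp://localhost");
  const channel = await conn.createChannel();
  await channel.assertQueue("payments");
  await channel.consume("payments", async (msg) => {
    if (!msg) return;
    const event = JSON.parse(msg.content.toString());
    await chargeCustomer(event); // Step 2: the database write commits here
    // A crash on this line leaves the charge committed but the message un-ACKed,
    // so the broker redelivers it and Step 2 runs again on another instance.
    channel.ack(msg);            // Step 3
  });
}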
Furthermore, relying solely on database constraints (like UNIQUE keys) often fails in high-throughput systems because:
- Race Conditions: Two consumers picking up duplicates simultaneously can attempt the same check-then-act flow at virtually the same instant (see the sketch after this list).
- Business Logic Complexity: Not all duplicates result in primary key violations (e.g., UPDATE accounts SET balance = balance - 100 succeeds every time it runs).
- Out-of-Order Delivery: Processing version 2 before version 1 requires Optimistic Concurrency Control (OCC), not just uniqueness checks.
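The race-condition bullet deserves a concrete illustration. Below is a sketch of the check-then-act anti-pattern, assuming a hypothetical processed_events marker table: two consumers holding the same duplicate can both pass the SELECT before either of them writes the marker row.

import { Pool } from "pg";
const db = new Pool({ connectionString: process.env.DATABASE_URL });
// Hypothetical side effect the consumer is supposed to apply exactly once.
declare function applySideEffect(event: { eventId: string }): Promise<void>;
// NAIVE check-then-act: not safe under concurrent consumers.
async function handleOnce(event: { eventId: string }): Promise<void> {
  const seen = await db.query(
    "SELECT 1 FROM processed_events WHERE event_id = $1",
    [event.eventId]
  );
  if (seen.rowCount === 0) {
    // Two consumers can BOTH reach this point for the same eventId,
    // because neither has inserted the marker row yet.
    await applySideEffect(event);
    await db.query("INSERT INTO processed_events (event_id) VALUES ($1)", [event.eventId]);
  }
}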
The Fix: Atomic Idempotency Barriers
We solve this with a multi-stage barrier:
- Atomic Lock (Redis): Prevent concurrent processing of the same message ID.
- Optimistic Locking (PostgreSQL): Prevent overwriting newer state with older data.
- Completion Marker: Persist the result to handle retries instantly.
Tech Stack
- Runtime: Node.js / TypeScript (ES2024)
- State Store: PostgreSQL
- Lock Manager: Redis (utilizing Lua scripts for atomicity)
1. The Redis Lock (Lua Script)
We cannot use standard GET then SET logic in Redis; it creates a race condition. We must use a Lua script to ensure the check-and-lock operation is atomic.
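For contrast, here is what the naive (racy) version would look like: both consumers can observe a missing key before either of them writes it.

import Redis from "ioredis";
const redis = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379");
// NAIVE (racy): there is a window between GET and SET in which a second
// consumer handling the same eventId can also "acquire" the lock.
async function naiveAcquire(eventId: string): Promise<boolean> {
  const status = await redis.get(`idempotency:${eventId}`);
  if (status !== null) return false; // someone else holds or finished it
  // ...but a concurrent caller can reach this exact point at the same time.
  await redis.set(`idempotency:${eventId}`, "PROCESSING", "EX", 60);
  return true; // both callers can return true
}

A single SET with NX and EX would also be atomic, but it cannot distinguish a lock that is merely held (PROCESSING) from one that has already finished (COMPLETED), which is why the script below returns three distinct statuses.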
This script handles three states, returned as distinct statuses:
- ACQUIRED: The key did not exist; the lock is yours. Process the event.
- LOCKED: Another instance is currently processing this event (the key holds PROCESSING).
- ALREADY_PROCESSED: The event was already successfully processed (the key holds COMPLETED).
import Redis from "ioredis";
const redis = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379");
// LUA Script for atomicity
// ARGV[1] = expiration in seconds (TTL)
// ARGV[2] = value to set (e.g., "PROCESSING")
const ACQUIRE_LOCK_SCRIPT = `
local status = redis.call("GET", KEYS[1])
if status == "COMPLETED" then
return "ALREADY_PROCESSED"
elseif status == "PROCESSING" then
return "LOCKED"
else
-- Lock doesn't exist, acquire it
redis.call("SET", KEYS[1], ARGV[2], "EX", ARGV[1])
return "ACQUIRED"
end
`;
export type LockStatus = "ACQUIRED" | "LOCKED" | "ALREADY_PROCESSED";
export async function acquireIdempotencyLock(
key: string,
ttlSeconds: number = 60
): Promise<LockStatus> {
const result = await redis.eval(
ACQUIRE_LOCK_SCRIPT,
1,
`idempotency:${key}`,
ttlSeconds,
"PROCESSING"
);
return result as LockStatus;
}
export async function markEventCompleted(key: string, ttlSeconds: number = 86400) {
// Overwrite "PROCESSING" with "COMPLETED" and extend TTL
await redis.set(`idempotency:${key}`, "COMPLETED", "EX", ttlSeconds);
}
2. The Robust Consumer Implementation
This consumer handles duplicate deliveries, concurrent consumers racing on the same event, and out-of-order messages.
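The queries assume an accounts table carrying a running balance plus the last applied sequence number. A minimal sketch of such a table (the column types here are just one reasonable choice):

import { Pool } from "pg";
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
// Hypothetical schema: last_sequence_id records the highest event sequence
// applied to the account and backs the optimistic-concurrency check below.
await pool.query(`
  CREATE TABLE IF NOT EXISTS accounts (
    id               TEXT    PRIMARY KEY,
    balance          NUMERIC NOT NULL DEFAULT 0,
    last_sequence_id BIGINT  NOT NULL DEFAULT 0
  );
`);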
import { Pool } from "pg";
import { acquireIdempotencyLock, markEventCompleted } from "./redisLock";
const db = new Pool({ connectionString: process.env.DATABASE_URL });
interface PaymentEvent {
eventId: string; // Unique UUID for the message (Idempotency Key)
accountId: string;
amount: number;
sequenceId: number; // Monotonic counter or Timestamp for ordering
type: "DEBIT" | "CREDIT";
}
/**
* Processes a payment event, ensuring effectively-once application via idempotency.
*/
export async function processPaymentEvent(event: PaymentEvent): Promise<void> {
const { eventId, accountId, amount, sequenceId } = event;
// 1. ATOMIC BARRIER: Check Redis
const lockStatus = await acquireIdempotencyLock(eventId);
if (lockStatus === "ALREADY_PROCESSED") {
console.info(`[${eventId}] Event already processed. Skipping.`);
return; // Safe to ACK
}
if (lockStatus === "LOCKED") {
// Another consumer is processing this RIGHT NOW.
// We throw an error to NACK the message so it's requeued.
// It will be processed later or skipped if the other consumer finishes.
throw new Error(`[${eventId}] Concurrent processing detected. Requeueing.`);
}
// 2. DATABASE TRANSACTION (The Side Effect)
const client = await db.connect();
try {
await client.query("BEGIN");
// 3. OPTIMISTIC LOCKING: Handle Out-of-Order Arrival
// We only update if the current account version is strictly lower than the event sequence.
// This prevents an old message (seq 10) overwriting a newer state (seq 11).
const updateQuery = `
UPDATE accounts
SET
balance = balance + $1,
last_sequence_id = $2
WHERE id = $3
AND last_sequence_id < $2
RETURNING id;
`;
// Adjust logic based on DEBIT/CREDIT
const signedAmount = event.type === "DEBIT" ? -amount : amount;
const res = await client.query(updateQuery, [
signedAmount,
sequenceId,
accountId
]);
if (res.rowCount === 0) {
// 4. HANDLE STALE EVENTS
// If rowCount is 0, either account doesn't exist OR sequence is old.
// We check existence to differentiate.
const check = await client.query("SELECT last_sequence_id FROM accounts WHERE id = $1", [accountId]);
if (check.rows.length && check.rows[0].last_sequence_id >= sequenceId) {
console.warn(`[${eventId}] Stale event (seq ${sequenceId}). Current DB is ${check.rows[0].last_sequence_id}. Ignoring.`);
// We consider this "processed" because it's obsolete.
} else {
throw new Error(`Account ${accountId} not found.`);
}
}
await client.query("COMMIT");
// 5. FINALIZE IDEMPOTENCY
await markEventCompleted(eventId);
console.info(`[${eventId}] Successfully processed.`);
} catch (error) {
await client.query("ROLLBACK");
// If the DB transaction failed, we let the lock expire (TTL).
// The message will be redelivered and retried.
console.error(`[${eventId}] Transaction failed:`, error);
throw error; // Triggers NACK/Retry
} finally {
client.release();
}
}
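Wiring this into a broker consumer is the last step. Below is a sketch using kafkajs, assuming events are published as the JSON PaymentEvent shape above and that processPaymentEvent lives in a hypothetical ./paymentConsumer module; throwing from eachMessage keeps the offset uncommitted, so the message is delivered again and retried.

import { Kafka } from "kafkajs";
import { processPaymentEvent } from "./paymentConsumer"; // hypothetical module path
const kafka = new Kafka({ clientId: "payments-worker", brokers: ["localhost:9092"] });
const consumer = kafka.consumer({ groupId: "payments" });
async function main(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({ topic: "payment-events" });
  await consumer.run({
    eachMessage: async ({ message }) => {
      if (!message.value) return;
      const event = JSON.parse(message.value.toString());
      // Errors thrown here (including the LOCKED case) propagate to kafkajs,
      // which does not commit the offset, so the event is processed again later.
      await processPaymentEvent(event);
    },
  });
}
main().catch((err) => {
  console.error("Consumer crashed:", err);
  process.exit(1);
});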
The Explanation: Why This Works
1. Handling the "Double Delivery" Race Condition
Without the atomic check-and-lock in Redis (the Lua script above), if two consumers receive the same message simultaneously, both might query the database, see that the event hasn't been applied, and apply the transaction. The Lua script guarantees that only one caller receives "ACQUIRED". The other receives "LOCKED", NACKs the message, and retries later, preventing the DB race condition entirely.
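To see that resolution in action, racing two deliveries of the same (hypothetical) event against processPaymentEvent behaves like this:

import { processPaymentEvent } from "./paymentConsumer"; // hypothetical module path
const event = {
  eventId: "evt-42", // same idempotency key delivered twice
  accountId: "acct-1",
  amount: 100,
  sequenceId: 7,
  type: "CREDIT" as const,
};
// Both calls hit the Lua script with the same key: one wins ACQUIRED and commits,
// the other sees LOCKED, throws, and is requeued. On redelivery it hits
// ALREADY_PROCESSED and is skipped.
const results = await Promise.allSettled([
  processPaymentEvent(event),
  processPaymentEvent(event),
]);
console.log(results.map((r) => r.status)); // e.g. [ 'fulfilled', 'rejected' ]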
2. Handling the "Consumer Crash" (Zombie Lock)
If the consumer acquires the lock but crashes before calling markEventCompleted, the key remains in Redis with value "PROCESSING". Because we set a TTL (e.g., 60 seconds) on the lock acquisition:
- The broker detects the crash (connection loss) and redelivers the message.
- The Redis key expires automatically after 60s.
- The new consumer acquires a fresh lock and retries the transaction.
3. Handling Out-of-Order Messages
In distributed systems, Message B (sequence 100) might arrive before Message A (sequence 99). If we simply applied changes blindly, Message B would bring the account up to date, and the late-arriving Message A would then be applied on top of it, corrupting the state with data that belongs to an older point in the stream. The SQL clause AND last_sequence_id < $2 implements Optimistic Concurrency Control. It ensures that we never apply an update that regresses the state of the entity, effectively turning late-arriving messages into no-ops.
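Concretely, suppose those two events arrive out of order (the IDs and amounts here are hypothetical). The first call applies sequence 100; the second, stale call becomes a no-op:

import { processPaymentEvent } from "./paymentConsumer"; // hypothetical module path
// Applied: sequence 100 is newer than anything the account has seen so far.
await processPaymentEvent({
  eventId: "evt-b", accountId: "acct-1", amount: 50, sequenceId: 100, type: "CREDIT",
});
// Skipped: the UPDATE's "last_sequence_id < 99" predicate matches zero rows,
// so the consumer logs a stale-event warning and still marks evt-a completed,
// letting the broker ACK it safely.
await processPaymentEvent({
  eventId: "evt-a", accountId: "acct-1", amount: 50, sequenceId: 99, type: "DEBIT",
});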
Conclusion
Idempotency is not a "nice-to-have" feature in distributed systems; it is an architectural requirement. You cannot rely on the network to deliver messages exactly once, and you cannot rely on packet ordering across different partitions.
By combining an atomic distributed lock (Redis Lua) with database-level optimistic locking (SQL), you create a system that is resilient to duplicates, retries, and concurrency, guaranteeing data integrity in even the most chaotic network conditions.