Idempotency Patterns in Microservices: Solving Duplicate Event Processing in Kafka

In distributed systems, the guarantee of "exactly-once" processing is a myth when side effects are involved. While Kafka Streams offers exactly-once semantics for internal state, this guarantee evaporates the moment your service needs to make an external HTTP call (e.g., charging a credit card via Stripe) or write to a database that isn't part of the transaction log.

The default delivery semantic for Kafka is At-Least-Once. This means your system is architecturally destined to process duplicate messages. Without an idempotency strategy, you risk data corruption and financial loss.

The Root Cause: Why Duplicates Occur

To solve duplicate processing, we must understand where the duplicates originate. They primarily stem from two failure modes in the acknowledgement and offset-commit protocols:

  1. Producer Retries (Network Jitter): A producer sends a message to the broker. The broker writes it successfully, but the network acknowledgement (ACK) fails to reach the producer. The producer, assuming failure, retries. Result: Two identical messages in the log.
  2. Consumer Offset Commit Failure: Your microservice processes a message and executes a database write. Immediately after, the service crashes or the network partitions before it can send the offset commit back to Kafka. When the service restarts (or a rebalance triggers), it reads the same offset again. Result: The logic executes twice.
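
Note that cause #1 can be mitigated at the source: Kafka's idempotent producer deduplicates producer-to-broker retries using sequence numbers. Cause #2 cannot, which is why consumer-side deduplication remains mandatory. A minimal sketch, assuming kafkajs and an already-configured client:

// Given a configured Kafka client instance, the idempotent producer
// deduplicates broker-side retries (cause #1). It does nothing about
// consumer redelivery (cause #2).
const producer = kafka.producer({ idempotent: true });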

The Solution: Idempotent Consumers

We cannot prevent duplicate delivery. We must prevent duplicate execution. This requires implementing an Idempotency Key pattern using an atomic distributed store (Redis).

The strategy relies on three distinct states for a message:

  1. New: We have never seen this key.
  2. Processing: We are currently working on this key (a lock).
  3. Completed: We have finished work for this key (a tombstone).
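
In Redis terms, "New" is simply the absence of the key; only the other two states are stored explicitly. A minimal type sketch (the state names match the values used in the code below):

// "New" is represented by the key being absent from Redis entirely.
type IdempotencyState = 'PROCESSING' | 'COMPLETED';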

The Architecture

We will use Redis because it supports atomic operations (SET NX) necessary to handle race conditions that occur during consumer rebalancing.

  1. Extraction: The producer generates a unique idempotencyKey (UUID) and injects it into the Kafka Record Headers.
  2. Acquisition: The consumer attempts to atomically set this key in Redis with a status of PROCESSING.
  3. Execution: If acquired, execute business logic.
  4. Finalization: Update the key in Redis to COMPLETED.
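
A minimal sketch of step 1 on the producer side, assuming kafkajs (the client id, broker address, and payload shape are illustrative; the topic and header name match the consumer below):

import { Kafka } from 'kafkajs';
import { randomUUID } from 'crypto';

const kafka = new Kafka({ clientId: 'order-service', brokers: ['localhost:9092'] });
// idempotent: true also deduplicates producer-to-broker retries (see above)
const producer = kafka.producer({ idempotent: true });

async function publishPayment(userId: string, amount: number): Promise<void> {
  await producer.send({
    topic: 'payment-events',
    messages: [{
      key: userId, // partition by user to preserve per-user ordering
      value: JSON.stringify({ userId, amount }),
      headers: { 'idempotency-key': randomUUID() }, // step 1: inject the key
    }],
  });
}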

Implementation

Below is a robust implementation using Node.js, TypeScript, kafkajs, and ioredis.

1. The Idempotency Guard

This class manages the atomic locking mechanism. We use a short TTL (Time To Live) for the lock to prevent deadlocks if a consumer crashes mid-process, and a long TTL for the completion tombstone.

import Redis from 'ioredis';

// Configuration
const REDIS_CONNECTION_STRING = process.env.REDIS_URL || 'redis://localhost:6379';
const LOCK_TTL_SECONDS = 30; // Time to process message before lock expires
const RETENTION_TTL_SECONDS = 86400; // 24 hours retention for completed keys

export class IdempotencyGuard {
  private redis: Redis;

  constructor() {
    this.redis = new Redis(REDIS_CONNECTION_STRING);
  }

  /**
   * Attempts to acquire a lock for processing.
   * Returns true if processing should proceed.
   * Returns false if the event was already completed or is locked by another worker.
   */
  async shouldProcess(key: string): Promise<boolean> {
    const lockKey = `idemp:${key}`;

    // 1. Try to acquire the lock atomically.
    // SET key value EX <seconds> NX -- set with expiry, only if the key does not exist
    const result = await this.redis.set(lockKey, 'PROCESSING', 'EX', LOCK_TTL_SECONDS, 'NX');

    // If 'OK', we own the lock. Proceed.
    if (result === 'OK') {
      return true;
    }

    // 2. If we couldn't set it, check the current state.
    const currentValue = await this.redis.get(lockKey);

    if (currentValue === 'COMPLETED') {
      console.log(`[Idempotency] Skipped ${key}: Already completed.`);
      return false; 
    }

    // 3. If state is 'PROCESSING', it means another consumer instance (or thread)
    // is currently working on it, or a previous attempt crashed.
    // In a high-throughput system, we back off and let the next retry handle it
    // if the lock expires.
    console.log(`[Idempotency] Skipped ${key}: Currently processing (race condition or rebalance).`);
    return false; 
  }

  /**
   * Marks the key as successfully completed.
   * Overwrites the lock with a long-lived tombstone.
   */
  async markComplete(key: string): Promise<void> {
    const lockKey = `idemp:${key}`;
    await this.redis.set(lockKey, 'COMPLETED', 'EX', RETENTION_TTL_SECONDS);
  }

  /**
   * Releases the lock in case of application error, allowing a retry.
   */
  async releaseLock(key: string): Promise<void> {
    const lockKey = `idemp:${key}`;
    // Delete only if the key is still in PROCESSING state. A Lua script makes
    // the check-and-delete atomic, so a COMPLETED tombstone is never wiped by
    // a late error path (which would re-open the door to a double charge).
    const script = `
      if redis.call('GET', KEYS[1]) == 'PROCESSING' then
        return redis.call('DEL', KEYS[1])
      end
      return 0`;
    await this.redis.eval(script, 1, lockKey);
  }
}

2. The Kafka Consumer Service

Here is how we integrate the guard into the consumer loop. Note the error handling in the catch block, which ensures locks are released correctly when exceptions occur.

import { Kafka, EachMessagePayload } from 'kafkajs';
import { IdempotencyGuard } from './IdempotencyGuard';

const kafka = new Kafka({
  clientId: 'payment-service',
  brokers: ['pkc-1234.us-east-1.aws.confluent.cloud:9092'],
});

const consumer = kafka.consumer({ groupId: 'payment-processors-group' });
const idempotency = new IdempotencyGuard();

// Mock side-effect function
async function chargeCreditCard(userId: string, amount: number) {
  // Simulate API call
  return new Promise(resolve => setTimeout(resolve, 200));
}

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'payment-events', fromBeginning: false });

  await consumer.run({
    // Disable auto-commit to ensure we only commit after successful processing
    autoCommit: false, 
    eachMessage: async ({ topic, partition, message, heartbeat }: EachMessagePayload) => {
      
      // 1. Extract Idempotency Key (Assume Producer sends it in headers)
      const idempotencyKey = message.headers?.['idempotency-key']?.toString();
      
      if (!idempotencyKey) {
        // DLQ strategy: Events without keys are dangerous. 
        // Log error and commit to skip, or send to Dead Letter Queue.
        console.error('Missing idempotency key. Skipping.');
        await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
        return;
      }

      let lockAcquired = false;
      try {
        // 2. Check Idempotency (try to acquire the PROCESSING lock)
        lockAcquired = await idempotency.shouldProcess(idempotencyKey);

        if (!lockAcquired) {
          // If we can't process (completed or locked), we treat the message as
          // handled and move the offset forward.
          await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
          return;
        }

        // 3. Decode Payload
        const payload = JSON.parse(message.value?.toString() || '{}');
        console.log(`Processing payment for key: ${idempotencyKey}`);

        // 4. Execute Side Effects
        await chargeCreditCard(payload.userId, payload.amount);

        // 5. Mark Idempotent Record as Complete
        await idempotency.markComplete(idempotencyKey);

        // 6. Commit Offset to Kafka
        await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
        
        // Signal liveness to the broker; for long-running side effects,
        // call heartbeat() periodically during processing as well.
        await heartbeat();

      } catch (error) {
        console.error(`Error processing message ${idempotencyKey}:`, error);

        // 7. On Error: Release the lock, but only if this instance acquired it,
        // so we never delete a lock (or tombstone) owned by another worker.
        // This allows the message to be retried immediately on redelivery.
        if (lockAcquired) {
          await idempotency.releaseLock(idempotencyKey);
        }

        // Do NOT commit the offset. Kafka will redeliver this message.
        throw error;
      }
    },
  });
};

run().catch(console.error);

How It Works

This implementation addresses the root causes of duplication:

  1. Atomic SET NX: This is the most critical line: redis.set(lockKey, 'PROCESSING', 'EX', LOCK_TTL_SECONDS, 'NX'). If two consumers (due to a rebalance race condition) try to process the same message simultaneously, Redis guarantees only one receives "OK". The other receives null and skips execution.
  2. Transient Lock vs. Permanent Tombstone:
    • Lock (30s): If the consumer crashes during chargeCreditCard, the lock expires in 30 seconds. Kafka will eventually redeliver the uncommitted message. The retry will find the lock gone and proceed.
    • Tombstone (24h): Once marked COMPLETED, if the Producer resent the message (Reason #1 in Root Cause), we find the tombstone and return immediately without charging the card.
  3. Manual Offset Management: By turning off autoCommit, we ensure that we never tell Kafka "we're done" until Redis confirms the tombstone is written.
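
One caveat the lock TTL introduces: if a legitimate side effect takes longer than 30 seconds, the lock expires while the first consumer is still working, and a redelivered copy could start a second execution. A hypothetical extendLock method on the guard (not part of the implementation above) could renew the lock periodically, e.g. alongside heartbeat():

// Hypothetical addition to IdempotencyGuard: renew the lock while still working.
// Extends the TTL only if the key is still in PROCESSING state.
async extendLock(key: string): Promise<void> {
  const script = `
    if redis.call('GET', KEYS[1]) == 'PROCESSING' then
      return redis.call('EXPIRE', KEYS[1], ARGV[1])
    end
    return 0`;
  await this.redis.eval(script, 1, `idemp:${key}`, LOCK_TTL_SECONDS);
}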

Conclusion

Idempotency is not a feature you add later; it is an architectural requirement for Event-Driven Systems. Relying on Kafka's internal delivery guarantees is insufficient when external side effects are involved. By utilizing a shared atomic store like Redis with specific locking patterns, you ensure that "At-Least-Once" delivery results in "Exactly-Once" execution.