Skip to main content

Node.js Streams: Handling Backpressure to Prevent OOM Crashes in Large ETL Jobs

 You have built an ETL pipeline. It reads a 5GB CSV file, transforms the rows, and inserts them into a database or writes to a new format. In your local development environment with a sample dataset, it runs perfectly. You deploy to production, feed it the real dataset, and 45 seconds later, the process dies.

FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory

This is the classic "Fast Producer, Slow Consumer" problem. In Node.js, if you read data faster than you can write it, the excess data has to go somewhere. Without flow control, that "somewhere" is your RAM.

Here is the root cause of the crash and the exact pattern to manage backpressure manually using modern Node.js APIs.

The Root Cause: The Internal Buffer and highWaterMark

Node.js streams are not just event emitters; they are buffer managers.

Every Writable stream has an internal buffer (writableBuffer). The size limit of this buffer is determined by the highWaterMark (default is 16KB for standard streams, 64KB for file streams).

When you call stream.write(chunk), Node appends that chunk to the internal buffer.

  1. The Check: Node checks if the total buffer size exceeds highWaterMark.
  2. The Signal: stream.write() returns a boolean.
    • true: The buffer is below the limit. You can keep pushing.
    • false: The buffer is full. Stop writing.

The OOM Crash Mechanism: Most developers ignore the return value of write(). They use a .on('data') handler or a simple loop to push data as fast as the file system reads it. Since disk reads (SSD) are vastly faster than network writes (Database/API), the writableBuffer grows infinitely.

Eventually, the buffer consumes the entire V8 heap (default ~512MB - 2GB depending on version/flags), and the Garbage Collector gives up.

The Fix: Respecting the Boolean and drain

To prevent OOM, we must respect the return value of write(). If it returns false, we must pause the producer and wait for the drain event. The drain event fires when the internal buffer has emptied effectively, signaling it is safe to resume writing.

Below is a complete, modern implementation using ES ModulesAsync Iterators, and Node.js 20+ patterns. This example simulates a fast file reader and a slow writable stream to demonstrate the backpressure mechanics explicitly.

The Solution

import { createReadStream } from 'node:fs';
import { Writable } from 'node:stream';
import { once } from 'node:events';
import { stat } from 'node:fs/promises';
import { resolve } from 'node:path';

// 1. Simulate a Slow Writable (e.g., a Database Insert Stream)
class SlowDatabaseStream extends Writable {
  constructor(options) {
    // Set a explicit highWaterMark to demonstrate backpressure triggering
    // In real DBs, this might be higher, but the logic remains.
    super({ ...options, highWaterMark: 16384 }); // 16KB
  }

  _write(chunk, encoding, callback) {
    // Simulate latency (e.g., DB network round-trip)
    // 10ms delay per chunk is huge for a computer, creating instant backpressure
    setTimeout(() => {
      // "Save" the data (no-op here)
      callback();
    }, 10);
  }
}

/**
 * 2. The Robust ETL Processor
 * Uses Async Iterators to read + Manual Flow Control for writing
 */
async function runETL(sourcePath) {
  console.time('ETL Job');
  
  // Validate source
  const stats = await stat(sourcePath);
  console.log(`Processing file size: ${(stats.size / 1024 / 1024).toFixed(2)} MB`);

  const source = createReadStream(sourcePath, { 
    encoding: 'utf8',
    highWaterMark: 64 * 1024 // Read 64KB chunks
  });

  const dest = new SlowDatabaseStream();
  
  let totalBytesProcessed = 0;
  let drainCount = 0;

  try {
    // 3. Modern Iteration: for await...of
    // This allows us to pause the reading loop naturally using await
    for await (const chunk of source) {
      
      // A. Attempt to write
      const canWrite = dest.write(chunk);

      // Update metrics
      totalBytesProcessed += chunk.length;

      // B. BACKPRESSURE CHECK
      // If write returns false, the internal buffer is full.
      if (!canWrite) {
        drainCount++;
        // C. Pause execution here until 'drain' is emitted.
        // This stops the loop, which effectively stops reading from the source.
        await once(dest, 'drain');
      }
    }

    // D. End the stream gracefully
    dest.end();
    
    // E. Wait for the final finish event
    await once(dest, 'finish');

    console.log('\n--- ETL Complete ---');
    console.log(`Total Bytes: ${totalBytesProcessed}`);
    console.log(`Backpressure Pauses (Drains): ${drainCount}`);
    console.timeEnd('ETL Job');

  } catch (err) {
    console.error('Pipeline failed:', err);
    process.exit(1);
  }
}

// Execution
// Ensure you have a large file named 'large_dataset.csv' in the directory
// You can generate one via: `mkfile -n 500m large_dataset.csv` (MacOS/Linux)
const FILE_PATH = resolve(process.cwd(), 'large_dataset.csv');

// Check if file exists to avoid confusion in the example
stat(FILE_PATH)
  .then(() => runETL(FILE_PATH))
  .catch(() => {
    console.error("Error: 'large_dataset.csv' not found. Please create a dummy large file to test.");
    console.log("Run: dd if=/dev/zero of=large_dataset.csv bs=1024 count=100000"); // 100MB approx
  });

The Explanation

Why does this prevent the crash?

  1. Coupling Read and Write Speeds: By using for await...of on the readable stream, we convert the stream into an async iterable. The reading process (the loop) only advances when the loop body completes.
  2. The once Promise: When dest.write(chunk) returns false, we encounter the line await once(dest, 'drain'). This creates a Promise that only resolves when the SlowDatabaseStream emits the 'drain' event.
  3. Halting the Loop: While we await that Promise, the for loop is suspended. Consequently, we stop asking the source stream for more data.
  4. Propagating the Pause: Because we stop consuming the readable stream, the readable stream's internal buffer eventually fills up. When that happens, Node stops reading from the file system descriptor. The backpressure propagates all the way from the slow database, through your code, back to the disk.

This keeps memory usage constant (roughly source.highWaterMark + dest.highWaterMark), regardless of whether the file is 500MB or 500GB.

Conclusion

While stream.pipeline handles backpressure automatically and is recommended for simple pass-throughs, understanding the write() -> false -> drain cycle is mandatory for custom ETL logic.

If you are manipulating data inside a loop or aggregating chunks before writing, you cannot rely on auto-piping. You must manually verify the buffer state. Failing to handle drain is the number one cause of instability in Node.js data processing services. Respect the boolean, and your application will remain stable under any load.