Node.js Streams: Solving OOM Crashes with Proper Backpressure Handling

The error is all too familiar. Your ETL script runs perfectly on your local machine with a 50MB sample file. You deploy it to production to process a 10GB CSV dump, and 30 seconds later, the process dies:

FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory

This is not a memory leak in the traditional sense. It is a flow control failure. Specifically, your application is reading data from the source faster than it can write it to the destination.

The Root Cause: Unbounded Buffering

In Node.js, streams are designed to transport data. However, data takes up memory while it is being transported.

When you read from a fast source (like fs.createReadStream reading from an SSD) and write to a slow destination (like a database insert over the network or fs.createWriteStream to a slow disk), a bottleneck forms.

Internally, Writable streams have a highWaterMark property (defaulting to 16KB). This is the threshold where the stream signals it is "full." When you call stream.write(chunk), the method returns a boolean:

  • true: The internal buffer is below the limit. Keep sending.
  • false: The buffer is full. Stop sending until the buffer drains.

The Crash Mechanism: If your code ignores this false return value (common in naive event-listener patterns such as source.on('data', chunk => target.write(chunk))), Node.js will dutifully keep queuing the incoming chunks in the Writable's internal buffer. Those chunks are still referenced, so the garbage collector cannot reclaim them, and the heap grows with every chunk the read stream pushes. Eventually, the V8 heap limit is breached, and the process crashes.
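
Condensed, the anti-pattern looks like this (a rough sketch with placeholder filenames); every chunk the read stream emits is handed straight to write(), and the return value is thrown away:

import { createReadStream, createWriteStream } from 'node:fs';

// Push-based copy that ignores backpressure signals.
const source = createReadStream('./large-dataset.csv');
const target = createWriteStream('./slow-destination.csv');

source.on('data', (chunk) => {
  // write() returns false once the target buffer passes its highWaterMark,
  // but nothing here reacts to that, so chunks keep piling up in memory.
  target.write(chunk);
});
source.on('end', () => target.end());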

This mechanism of signaling the source to pause is called Backpressure.

The Fix: Modern Async Iterators and Pipelines

The historical solution was to listen for the drain event manually. This resulted in complex, brittle code.
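
A minimal sketch of that manual approach (placeholder filenames, error handling omitted): pause the source whenever write() returns false, and resume it only after the destination emits drain.

import { createReadStream, createWriteStream } from 'node:fs';

const source = createReadStream('./large-dataset.csv');
const target = createWriteStream('./slow-destination.csv');

source.on('data', (chunk) => {
  if (!target.write(chunk)) {
    // Destination buffer is full: stop reading until it drains.
    source.pause();
    target.once('drain', () => source.resume());
  }
});
source.on('end', () => target.end());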

The modern, production-grade solution utilizes stream.pipeline combined with Async Generators. This approach abstracts the backpressure management entirely, allowing Node.js to automatically pause the reading stream when the writing stream is saturated.

Below is a complete implementation of a robust CSV processor that operates with constant memory usage, regardless of input size.

The Implementation

We will create a script that reads a large dataset, parses it, transforms it (anonymizing email addresses), and writes it to a file. We simulate a "slow" consumer to demonstrate backpressure stability.
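
If you do not have a 10GB file on hand, a throwaway helper along these lines (hypothetical filename generate-data.mjs) can produce a suitably large input:

// generate-data.mjs: writes synthetic rows to large-dataset.csv.
import { createWriteStream } from 'node:fs';
import { once } from 'node:events';

const out = createWriteStream('./large-dataset.csv');
out.write('id,name,email\n');

for (let i = 0; i < 5_000_000; i++) {
  const ok = out.write(`${i},user${i},user${i}@example.com\n`);
  if (!ok) await once(out, 'drain'); // Respect backpressure here too.
}
out.end();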

Prerequisites: Node.js v18+. Filename: etl-pipeline.mjs

import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { Transform } from 'node:stream';
import { createInterface } from 'node:readline';

// 1. CONSTANTS
const INPUT_FILE = './large-dataset.csv'; // Assume this is 10GB
const OUTPUT_FILE = './clean-dataset.jsonl';
const SIMULATED_DB_DELAY_MS = 5; // Simulate slow I/O

/**
 * 2. THE TRANSFORMER
 * An Async Generator acts as the processing logic.
 * It yields data only when the consumer is ready.
 */
async function* processLine(sourceStream) {
  // readline creates an async iterator over line breaks
  const lines = createInterface({
    input: sourceStream,
    crlfDelay: Infinity,
  });

  let isHeader = true;
  let headers = [];

  for await (const line of lines) {
    // Skip empty lines
    if (!line.trim()) continue;

    const columns = line.split(',');

    // Handle CSV Header
    if (isHeader) {
      headers = columns.map(h => h.trim());
      isHeader = false;
      continue;
    }

    // Map columns to object
    const row = {};
    headers.forEach((header, index) => {
      row[header] = columns[index]?.trim();
    });

    // 3. BUSINESS LOGIC (Transformation)
    // Example: Anonymize email if present
    if (row.email) {
      const [user, domain] = row.email.split('@');
      row.email = `${user[0]}***@${domain}`;
    }

    // Convert to JSONL string
    const outputChunk = JSON.stringify(row) + '\n';
    
    yield outputChunk;
  }
}

/**
 * 4. THE SLOW CONSUMER (Simulation)
 * A Transform stream that artificially delays each chunk before forwarding
 * it to the underlying file stream, forcing backpressure to propagate up the chain.
 */
const createSlowWriteStream = (filepath) => {
  const fileStream = createWriteStream(filepath);
  
  return new Transform({
    async transform(chunk, encoding, callback) {
      // Simulate network/DB latency
      await new Promise(resolve => setTimeout(resolve, SIMULATED_DB_DELAY_MS));
      
      // Push data to the actual file
      const canPush = fileStream.write(chunk);
      
      if (!canPush) {
        // If file buffer is full, wait for drain
        fileStream.once('drain', callback);
      } else {
        callback();
      }
    },
    flush(callback) {
      fileStream.end(callback);
    }
  });
};

// 5. THE PIPELINE EXECUTION
async function runETL() {
  console.time('ETL Duration');
  console.log(`Starting ETL process. PID: ${process.pid}`);
  
  try {
    // Check initial memory usage
    const initialMem = process.memoryUsage().heapUsed / 1024 / 1024;
    console.log(`Initial Heap: ${initialMem.toFixed(2)} MB`);

    const source = createReadStream(INPUT_FILE, { encoding: 'utf8' });
    const destination = createSlowWriteStream(OUTPUT_FILE);

    // pipeline handles backpressure, error propagation, and stream cleanup
    // automatically. We invoke the generator with the read stream ourselves,
    // so pipeline treats the resulting async iterator as the readable side
    // of the chain and the slow transform as the destination.
    await pipeline(
      processLine(source),
      destination
    );

    console.log('Pipeline succeeded.');
  } catch (err) {
    console.error('Pipeline failed:', err);
    process.exitCode = 1; // Let the finally block run before the process exits.
  } finally {
    const finalMem = process.memoryUsage().heapUsed / 1024 / 1024;
    console.log(`Final Heap: ${finalMem.toFixed(2)} MB`);
    console.timeEnd('ETL Duration');
  }
}

runETL();
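
As a variant, stream.pipeline can do the wiring itself: when a generator function (rather than the result of calling it) is passed as a stage, pipeline invokes it with the previous stage as its first argument. A sketch of the equivalent call inside runETL:

// Equivalent wiring: pipeline hands the read stream to processLine,
// so the explicit processLine(source) call is not needed.
await pipeline(
  createReadStream(INPUT_FILE, { encoding: 'utf8' }),
  processLine,
  createSlowWriteStream(OUTPUT_FILE)
);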

Why This Works

The magic lies in the interaction between stream.pipeline, for await, and yield.

  1. Iterators Pull, They Don't Push: Unlike the traditional EventEmitter model where the read stream emits data as fast as possible ("Push"), Async Iterators function on a "Pull" model. The loop for await (const line of lines) only requests the next chunk of data when the previous iteration completes.

  2. Pipeline Management: The pipeline utility manages the flow. When the destination stream (our simulated slow writer) has its internal buffer filled, it signals the pipeline to stop pulling data from the processLine iterator.

  3. Propagation: Because processLine suspends execution at the yield statement, it stops iterating over lines. Consequently, the readline interface stops consuming from createReadStream, the read stream's internal buffer fills to its highWaterMark, and Node.js stops issuing further reads from the file until the downstream buffers drain.

The result is a system where the speed is dictated entirely by the slowest link in the chain (the writer), not the fastest.
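
If you want to see this for yourself, a small optional monitor (a sketch, not part of the listing above) can be added near the top of runETL. With backpressure working, the sampled heap size hovers around a roughly constant value instead of climbing toward the V8 limit.

// Sample heap usage once per second while the pipeline runs.
const heapMonitor = setInterval(() => {
  const heapMB = process.memoryUsage().heapUsed / 1024 / 1024;
  console.log(`Heap: ${heapMB.toFixed(2)} MB`);
}, 1000);
heapMonitor.unref(); // Don't keep the process alive just for this timer.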

Conclusion

Memory crashes in Node.js data processing are rarely caused by the sheer volume of data; they are caused by the rate at which that data is loaded into memory relative to how quickly it is written back out.

By abandoning legacy data event listeners and adopting node:stream/promises with Async Generators, you opt in to automatic backpressure management. This keeps your application stable whether it processes 10MB or 10TB of data, maintaining a flat memory profile throughout.