Skip to main content

Node.js Streams: Solving Heap Out of Memory with `stream.pipeline`

 You have deployed a standard ETL job or a file upload service. It works flawlessly with 100MB files on your local machine. But in production, under load, or when processing a 5GB CSV, the pod crashes. You check the logs and see the dreaded V8 signature:

FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory

The immediate knee-jerk reaction is to increase max-old-space-size. However, if your logic relies on source.pipe(dest), throwing RAM at the problem is a band-aid, not a fix. The issue isn't just the file size; it is likely how your stream pipeline handles error propagation and lifecycle management when things go wrong.

The Root Cause: Why .pipe() Leaks Memory

The classic readable.pipe(writable) method manages backpressure—it pauses the readable stream when the writable stream’s high-water mark is reached. However, .pipe() has a fatal flaw regarding error propagation and stream destruction.

If the writable stream emits an error (e.g., a database connection timeout or a disk write error), the readable stream is not automatically destroyed. It remains open, and often, it continues to buffer data internally, waiting for the destination to drain.

Because the destination has errored, it never emits the 'drain' event. The readable stream keeps filling its internal buffer until the V8 heap is exhausted. Furthermore, standard .pipe() chains require verbose boilerplate to handle errors on every single stream in the chain to prevent file descriptor leaks.

// The dangerous pattern
source
  .pipe(transform) // If this errors...
  .pipe(destination); // ...source might stay open and leak memory.

In the example above, if destination fails, source is unaware. It acts as a zombie process holding onto memory references that the Garbage Collector cannot touch.

The Solution: stream.pipeline

Node.js introduced stream.pipeline (and its promise-based sibling in node:stream/promises) to solve this specific architectural flaw.

pipeline pipes streams together but adds a critical layer of management: it attaches error handlers to every stream in the chain. If any stream in the pipeline fails, pipeline automatically destroys all other streams in the chain, releasing file handles and memory immediately.

Implementation

Below is a robust, production-ready implementation of a streaming ETL process using node:stream/promises and async generators. This script reads a large Gzipped CSV, parses it, sanitizes data, and writes it to a new location without leaking memory, even if the disk runs out of space midway.

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGunzip } from 'node:zlib';
import { Transform } from 'node:stream';
import { open } from 'node:fs/promises';

/**
 * Simulates a complex transformation or validation step.
 * Using an async generator allows for easy backpressure handling
 * and integration with modern stream pipelines.
 */
async function* csvLineTransformer(sourceStream) {
  let buffer = '';
  
  for await (const chunk of sourceStream) {
    buffer += chunk;
    const lines = buffer.split('\n');
    // Keep the last partial line in the buffer
    buffer = lines.pop();

    for (const line of lines) {
      if (!line.trim()) continue;
      
      // Simulate heavy processing logic
      const columns = line.split(',');
      const processed = {
        id: columns[0],
        value: Number(columns[1]) * 100,
        timestamp: new Date().toISOString()
      };

      yield JSON.stringify(processed) + '\n';
    }
  }
  
  // Process remaining buffer
  if (buffer.trim()) {
    yield JSON.stringify({ raw: buffer, note: 'partial_end' }) + '\n';
  }
}

async function runETLJob(inputFile, outputFile) {
  console.time('ETL Job');
  
  // 1. Create Streams
  // explicit highWaterMark controls buffer size (default is 64kb)
  const source = createReadStream(inputFile, { highWaterMark: 64 * 1024 });
  const gunzip = createGunzip();
  const destination = createWriteStream(outputFile);

  // 2. AbortController for external cancellation (e.g., HTTP request timeout)
  const ac = new AbortController();

  try {
    console.log(`Starting processing of ${inputFile}...`);

    // 3. The Pipeline
    // This replaces source.pipe(gunzip).pipe(transform).pipe(dest)
    await pipeline(
      source,
      gunzip,
      csvLineTransformer, // Async generators are valid stream sources in pipeline
      destination,
      { signal: ac.signal }
    );

    console.log('Pipeline succeeded.');
  } catch (err) {
    // 4. Error Handling
    // If ANY stream fails, we land here, and ALL streams are already destroyed.
    console.error('Pipeline failed:', err.message);
    
    // Cleanup partial output to prevent corrupt data downstream
    try {
      const fileHandle = await open(outputFile);
      await fileHandle.close(); // Ensure handle is free before unlink
      // In a real app, use fs.unlink(outputFile) here
      console.log('Cleaned up partial output file.');
    } catch (cleanupErr) {
      // Ignore cleanup errors (file might not exist)
    }
    
    process.exit(1);
  } finally {
    console.timeEnd('ETL Job');
  }
}

// Execution
// Ensure you have a 'large_data.csv.gz' to test, or the script will catch the ENOENT.
runETLJob('./large_data.csv.gz', './processed_output.jsonl');

Why This Fixes The OOM Error

1. Unified Lifecycle Management

In the code above, the pipeline function acts as a supervisor.

  • Scenario: The disk fills up while writing to destination.
  • Result: destination emits an EACCES or ENOSPC error.
  • Pipeline Action: pipeline catches this, immediately calls .destroy() on sourcegunzip, and the generator.
  • Outcome: The source stops reading from the file system. The internal buffers in gunzip are discarded. The memory is freed for Garbage Collection immediately.

2. Async Iterator Support

Notice we passed csvLineTransformer directly into pipeline. In modern Node.js, pipeline supports Async Iterators/Generators natively. This eliminates the need to import Transform from stream and manually implement _transform and _flush methods, which are notorious sources of implementation errors (like forgetting to call the callback).

3. Backpressure is Implicit

The for await (const chunk of sourceStream) loop in the generator respects backpressure naturally. If the generator yields data faster than destination can write, the await on the yield (implicitly handled by the pipeline execution engine) pauses the execution of the loop, effectively pausing the read stream upstream.

Conclusion

If you are writing Node.js services dealing with file uploads, CSV parsing, or image processing, standard .pipe() usage is technical debt. It is fragile and leaks resources upon partial failure.

Refactor your streams to use stream.pipeline (Callback API) or import { pipeline } from 'node:stream/promises'. It ensures that when one part of your system fails, the rest cleans up gracefully, keeping your heap usage deterministic and your servers alive.