You have deployed a standard ETL job or a file upload service. It works flawlessly with 100MB files on your local machine. But in production, under load, or when processing a 5GB CSV, the pod crashes. You check the logs and see the dreaded V8 signature:
FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory
The knee-jerk reaction is to increase --max-old-space-size. However, if your logic relies on source.pipe(dest), throwing RAM at the problem is a band-aid, not a fix. The issue isn't just the file size; it is likely how your stream pipeline handles error propagation and lifecycle management when things go wrong.
The Root Cause: Why .pipe() Leaks Memory
The classic readable.pipe(writable) method manages backpressure—it pauses the readable stream when the writable stream’s high-water mark is reached. However, .pipe() has a fatal flaw regarding error propagation and stream destruction.
If the writable stream emits an error (e.g., a database connection timeout or a disk write error), the readable stream is not automatically destroyed. It remains open, and often, it continues to buffer data internally, waiting for the destination to drain.
Because the destination has errored, it never emits the 'drain' event. The readable stream keeps filling its internal buffer until the V8 heap is exhausted. Furthermore, standard .pipe() chains require verbose boilerplate to handle errors on every single stream in the chain to prevent file descriptor leaks.
// The dangerous pattern
source
  .pipe(transform)     // If this errors...
  .pipe(destination);  // ...source might stay open and leak memory.
In the example above, if destination fails, source is unaware. It lingers as a zombie stream, holding onto memory references that the Garbage Collector cannot reclaim.
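For comparison, keeping a bare .pipe() chain safe requires wiring that cleanup by hand. A minimal sketch (pipeWithCleanup is a hypothetical helper, not a Node API):
// Every stream needs its own error handler, and every handler has to
// tear the other streams down explicitly.
function pipeWithCleanup(source, transform, destination) {
  const destroyAll = (err) => {
    source.destroy(err);
    transform.destroy(err);
    destination.destroy(err);
  };
  source.on('error', destroyAll);
  transform.on('error', destroyAll);
  destination.on('error', destroyAll);
  return source.pipe(transform).pipe(destination);
}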
The Solution: stream.pipeline
Node.js introduced stream.pipeline (and its promise-based sibling in node:stream/promises) to solve this specific architectural flaw.
pipeline pipes streams together but adds a critical layer of management: it attaches error handlers to every stream in the chain. If any stream in the pipeline fails, pipeline automatically destroys all other streams in the chain, releasing file handles and memory immediately.
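In its simplest callback form, the whole dangerous chain from above collapses to a single call. A minimal sketch, assuming the same source, transform, and destination streams:
import { pipeline } from 'node:stream';

// One callback receives the first error from any stream in the chain;
// by the time it fires, every stream has already been destroyed.
pipeline(source, transform, destination, (err) => {
  if (err) {
    console.error('Pipeline failed:', err);
  } else {
    console.log('Pipeline succeeded.');
  }
});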
Implementation
Below is a robust, production-ready implementation of a streaming ETL process using node:stream/promises and async generators. The script reads a large gzipped CSV, parses it line by line, transforms each row into JSON, and writes the result to a new location without leaking memory, even if the disk runs out of space midway.
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGunzip } from 'node:zlib';
import { unlink } from 'node:fs/promises';

/**
 * Simulates a complex transformation or validation step.
 * Using an async generator allows for easy backpressure handling
 * and integration with modern stream pipelines.
 */
async function* csvLineTransformer(sourceStream) {
  let buffer = '';
  for await (const chunk of sourceStream) {
    buffer += chunk; // Buffer chunks are coerced to UTF-8 strings here
    const lines = buffer.split('\n');
    // Keep the last partial line in the buffer
    buffer = lines.pop();
    for (const line of lines) {
      if (!line.trim()) continue;
      // Simulate heavy processing logic
      const columns = line.split(',');
      const processed = {
        id: columns[0],
        value: Number(columns[1]) * 100,
        timestamp: new Date().toISOString()
      };
      yield JSON.stringify(processed) + '\n';
    }
  }
  // Process remaining buffer
  if (buffer.trim()) {
    yield JSON.stringify({ raw: buffer, note: 'partial_end' }) + '\n';
  }
}

async function runETLJob(inputFile, outputFile) {
  console.time('ETL Job');

  // 1. Create Streams
  // An explicit highWaterMark controls the read buffer size (64 KiB is also the fs default)
  const source = createReadStream(inputFile, { highWaterMark: 64 * 1024 });
  const gunzip = createGunzip();
  const destination = createWriteStream(outputFile);

  // 2. AbortController for external cancellation (e.g., HTTP request timeout)
  const ac = new AbortController();

  try {
    console.log(`Starting processing of ${inputFile}...`);

    // 3. The Pipeline
    // This replaces source.pipe(gunzip).pipe(transform).pipe(dest)
    await pipeline(
      source,
      gunzip,
      csvLineTransformer, // Async generator functions are valid transforms in pipeline
      destination,
      { signal: ac.signal }
    );

    console.log('Pipeline succeeded.');
  } catch (err) {
    // 4. Error Handling
    // If ANY stream fails, we land here, and ALL streams are already destroyed.
    console.error('Pipeline failed:', err.message);

    // Cleanup partial output to prevent corrupt data downstream
    try {
      await unlink(outputFile);
      console.log('Cleaned up partial output file.');
    } catch (cleanupErr) {
      // Ignore cleanup errors (the file might not exist)
    }

    process.exit(1);
  } finally {
    console.timeEnd('ETL Job');
  }
}

// Execution
// Ensure you have a 'large_data.csv.gz' to test with, or the script will catch an ENOENT error.
runETLJob('./large_data.csv.gz', './processed_output.jsonl');
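If you need a fixture to run this against, the same pipeline API can generate one. A throwaway, standalone sketch (rows is just an illustrative generator name):
import { pipeline } from 'node:stream/promises';
import { createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

// Async generators are valid pipeline sources too: emit synthetic "id,value" rows.
async function* rows(count) {
  for (let i = 0; i < count; i++) {
    yield `${i},${Math.random()}\n`;
  }
}

await pipeline(rows(1_000_000), createGzip(), createWriteStream('./large_data.csv.gz'));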
Why This Fixes The OOM Error
1. Unified Lifecycle Management
In the code above, the pipeline function acts as a supervisor.
- Scenario: The disk fills up while writing to destination.
- Result: destination emits an ENOSPC ("no space left on device") error.
- Pipeline Action: pipeline catches this and immediately calls .destroy() on source, gunzip, and the generator.
- Outcome: source stops reading from the file system. The internal buffers in gunzip are discarded. The memory is freed for Garbage Collection immediately.
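You can watch this teardown happen with a toy pipeline whose sink always fails. A minimal sketch, with failingSink standing in for a full disk:
import { pipeline } from 'node:stream/promises';
import { Readable, Writable } from 'node:stream';

// A sink that errors on the first write, simulating a full disk.
const failingSink = new Writable({
  write(chunk, encoding, callback) {
    callback(new Error('ENOSPC: no space left on device'));
  }
});

const source = Readable.from(['a', 'b', 'c']);

try {
  await pipeline(source, failingSink);
} catch (err) {
  console.error(err.message);    // "ENOSPC: no space left on device"
  console.log(source.destroyed); // true -- pipeline destroyed the source for us
}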
2. Async Iterator Support
Notice we passed csvLineTransformer directly into pipeline. In modern Node.js, pipeline supports Async Iterators/Generators natively. This eliminates the need to import Transform from stream and manually implement _transform and _flush methods, which are notorious sources of implementation errors (like forgetting to call the callback).
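For contrast, the hand-rolled equivalent of a single line-splitting step looks roughly like this. A sketch only (LineTransform is an illustrative name), shown to make clear what the generator saves you from:
import { Transform } from 'node:stream';

class LineTransform extends Transform {
  constructor() {
    super();
    this.tail = '';
  }
  _transform(chunk, encoding, callback) {
    const lines = (this.tail + chunk).split('\n');
    this.tail = lines.pop(); // keep the trailing partial line
    for (const line of lines) {
      if (line.trim()) this.push(line + '\n');
    }
    callback(); // forgetting this call stalls the entire pipeline
  }
  _flush(callback) {
    if (this.tail.trim()) this.push(this.tail + '\n'); // without _flush, the last line is silently dropped
    callback();
  }
}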
3. Backpressure is Implicit
The for await (const chunk of sourceStream) loop in the generator respects backpressure naturally. If the generator yields data faster than destination can write, pipeline simply stops pulling from the generator: the yield does not resume until the destination has drained, which suspends the loop and, in turn, the upstream read stream.
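You can see that pausing with a deliberately slow sink. A self-contained sketch (slowSink and fastProducer are illustrative names):
import { pipeline } from 'node:stream/promises';
import { Writable } from 'node:stream';
import { setTimeout as sleep } from 'node:timers/promises';

// A sink that takes 50ms per chunk, with a tiny buffer so backpressure kicks in immediately.
const slowSink = new Writable({
  highWaterMark: 1,
  write(chunk, encoding, callback) {
    sleep(50).then(() => callback());
  }
});

async function* fastProducer() {
  for (let i = 0; i < 10; i++) {
    console.log(`producing chunk ${i}`);
    yield `chunk ${i}\n`; // suspends here until the sink is ready for more
  }
}

// The logs show production being paced by the sink instead of racing ahead.
await pipeline(fastProducer, slowSink);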
Conclusion
If you are writing Node.js services dealing with file uploads, CSV parsing, or image processing, standard .pipe() usage is technical debt. It is fragile and leaks resources upon partial failure.
Refactor your streams to use stream.pipeline (the callback API) or the promise-based pipeline from node:stream/promises. Either way, when one part of your system fails, the rest cleans up gracefully, keeping your heap usage deterministic and your servers alive.