The error is all too familiar. Your ETL script runs perfectly on your local machine with a 50MB sample file. You deploy it to production to process a 10GB CSV dump, and 30 seconds later, the process dies:
FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory
This is not a memory leak in the traditional sense. It is a flow control failure. Specifically, your application is reading data from the source faster than it can write it to the destination.
The Root Cause: Unbounded Buffering
In Node.js, streams are designed to transport data in chunks. The catch is that every chunk that has been read but not yet written has to sit in memory in the meantime.
When you read from a fast source (like fs.createReadStream reading from an SSD) and write to a slow destination (like a database insert over the network or fs.createWriteStream to a slow disk), a bottleneck forms.
Internally, Writable streams have a highWaterMark property (defaulting to 16KB). This is the threshold where the stream signals it is "full." When you call stream.write(chunk), the method returns a boolean:
true: The internal buffer is below the limit. Keep sending.
false: The buffer is full. Stop sending until the buffer drains.
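You can observe that return value directly. Here is a quick sketch (the file name and buffer sizes are arbitrary, chosen only to make the numbers obvious):

import { createWriteStream } from 'node:fs';

// 64 KiB instead of the 16 KiB default, so the threshold is easy to see.
const out = createWriteStream('./out.bin', { highWaterMark: 64 * 1024 });

console.log(out.writableHighWaterMark);           // 65536
console.log(out.write(Buffer.alloc(16 * 1024)));  // true: buffered bytes still below the limit
console.log(out.write(Buffer.alloc(128 * 1024))); // false: buffered bytes now exceed the limit
out.end();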
The Crash Mechanism: If your code ignores this false return value, which is common in naive event-listener patterns such as source.on('data', chunk => destination.write(chunk)), Node.js will dutifully keep buffering the incoming chunks in RAM. The read stream keeps pushing chunks onto the heap, and because the slow writer still holds a reference to every unflushed chunk, the garbage collector cannot reclaim them. Eventually, the V8 heap limit is breached and the process crashes.
This mechanism of signaling the source to pause is called Backpressure.
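For illustration, here is a minimal sketch of the pattern that ignores backpressure entirely (./copy.csv is just a placeholder destination). Point it at a large enough input and it reproduces the crash above.

import { createReadStream, createWriteStream } from 'node:fs';

const source = createReadStream('./large-dataset.csv');
const destination = createWriteStream('./copy.csv');

// Anti-pattern: the boolean returned by write() is discarded, so nothing
// ever tells the readable side to slow down. Every chunk the file can't
// flush yet accumulates in the writable's internal buffer on the heap.
source.on('data', (chunk) => {
  destination.write(chunk);
});
source.on('end', () => destination.end());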
The Fix: Modern Async Iterators and Pipelines
The historical solution was to listen for the drain event manually. This resulted in complex, brittle code.
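Roughly, that manual version looks like this (same placeholder files as above). It works, but every stage in a longer pipeline needs its own pause/resume/drain bookkeeping plus error wiring, and it is easy to get subtly wrong.

import { createReadStream, createWriteStream } from 'node:fs';

const source = createReadStream('./large-dataset.csv');
const destination = createWriteStream('./copy.csv');

source.on('data', (chunk) => {
  // write() returning false means the buffer has hit highWaterMark.
  if (!destination.write(chunk)) {
    source.pause();                                   // stop the flow of 'data' events
    destination.once('drain', () => source.resume()); // resume once the buffer flushes
  }
});
source.on('end', () => destination.end());
source.on('error', (err) => destination.destroy(err));
destination.on('error', (err) => source.destroy(err));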
The modern, production-grade solution utilizes stream.pipeline combined with Async Generators. This approach abstracts the backpressure management entirely, allowing Node.js to automatically pause the reading stream when the writing stream is saturated.
Below is a complete implementation of a robust CSV processor that operates with constant memory usage, regardless of input size.
The Implementation
We will create a script that reads a large dataset, parses it, transforms it (anonymizing email addresses), and writes it to a file. We simulate a "slow" consumer to demonstrate backpressure stability.
Prerequisites: Node.js v18+. Filename: etl-pipeline.mjs
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { Transform } from 'node:stream';
import { createInterface } from 'node:readline';
// 1. CONSTANTS
const INPUT_FILE = './large-dataset.csv'; // Assume this is 10GB
const OUTPUT_FILE = './clean-dataset.jsonl';
const SIMULATED_DB_DELAY_MS = 5; // Simulate slow I/O
/**
 * 2. THE TRANSFORMER
 * An Async Generator acts as the processing logic.
 * It yields data only when the consumer is ready.
 */
async function* processLine(sourceStream) {
  // readline creates an async iterator over line breaks
  const lines = createInterface({
    input: sourceStream,
    crlfDelay: Infinity,
  });

  let isHeader = true;
  let headers = [];

  for await (const line of lines) {
    // Skip empty lines
    if (!line.trim()) continue;

    const columns = line.split(',');

    // Handle CSV Header
    if (isHeader) {
      headers = columns.map(h => h.trim());
      isHeader = false;
      continue;
    }

    // Map columns to object
    const row = {};
    headers.forEach((header, index) => {
      row[header] = columns[index]?.trim();
    });

    // 3. BUSINESS LOGIC (Transformation)
    // Example: Anonymize email if present
    if (row.email) {
      const [user, domain] = row.email.split('@');
      row.email = `${user[0]}***@${domain}`;
    }

    // Convert to JSONL string
    const outputChunk = JSON.stringify(row) + '\n';
    yield outputChunk;
  }
}
/**
 * 4. THE SLOW CONSUMER (Simulation)
 * A Transform stream wrapping a file Writable that artificially slows
 * down writing to force backpressure to propagate up the chain.
 */
const createSlowWriteStream = (filepath) => {
  const fileStream = createWriteStream(filepath);

  return new Transform({
    async transform(chunk, encoding, callback) {
      // Simulate network/DB latency
      await new Promise(resolve => setTimeout(resolve, SIMULATED_DB_DELAY_MS));

      // Push data to the actual file
      const canPush = fileStream.write(chunk);
      if (!canPush) {
        // If the file buffer is full, wait for drain before accepting more
        fileStream.once('drain', callback);
      } else {
        callback();
      }
    },
    flush(callback) {
      fileStream.end(callback);
    }
  });
};
// 5. THE PIPELINE EXECUTION
async function runETL() {
  console.time('ETL Duration');
  console.log(`Starting ETL process. PID: ${process.pid}`);

  try {
    // Check initial memory usage
    const initialMem = process.memoryUsage().heapUsed / 1024 / 1024;
    console.log(`Initial Heap: ${initialMem.toFixed(2)} MB`);

    const source = createReadStream(INPUT_FILE, { encoding: 'utf8' });
    const destination = createSlowWriteStream(OUTPUT_FILE);

    // pipeline handles error propagation and stream cleanup automatically.
    // The source is passed into the generator manually, so the pipeline
    // treats the resulting async iterator as the readable end of the chain.
    await pipeline(
      processLine(source),
      destination
    );

    console.log('Pipeline succeeded.');
  } catch (err) {
    console.error('Pipeline failed:', err);
    // Set the exit code rather than calling process.exit(), so the
    // finally block below still runs before the process terminates.
    process.exitCode = 1;
  } finally {
    const finalMem = process.memoryUsage().heapUsed / 1024 / 1024;
    console.log(`Final Heap: ${finalMem.toFixed(2)} MB`);
    console.timeEnd('ETL Duration');
  }
}

runETL();
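If you do not have a multi-gigabyte CSV handy, a throwaway seed script can generate one. The snippet below is a hypothetical helper (the seed-data.mjs name, row count, and column layout are assumptions, not part of the pipeline itself); it also goes through pipeline, so even the seeding step respects backpressure. Adjust the row count to taste.

// seed-data.mjs (hypothetical helper): stream test rows into large-dataset.csv.
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

async function* rows(count) {
  yield 'id,name,email\n';
  for (let i = 0; i < count; i++) {
    yield `${i},user${i},user${i}@example.com\n`;
  }
}

await pipeline(rows(5_000_000), createWriteStream('./large-dataset.csv'));
console.log('Seed file written.');

Run the seed script, then node etl-pipeline.mjs. To convince yourself the memory profile really is flat, start the ETL with a deliberately small heap, for example node --max-old-space-size=128 etl-pipeline.mjs; it still completes, because only a handful of chunks are in memory at any moment.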
Why This Works
The magic lies in the interaction between stream.pipeline, for await, and yield.
Iterators Pull, They Don't Push: Unlike the traditional EventEmitter model, where the read stream emits data as fast as possible ("push"), async iterators operate on a "pull" model. The loop for await (const line of lines) only requests the next chunk once the previous iteration has completed. (A standalone sketch of this pull behavior follows below.)
Pipeline Management: The pipeline utility manages the flow. When the destination stream (our simulated slow writer) fills its internal buffer, it signals the pipeline to stop pulling data from the processLine iterator.
Propagation: Because processLine suspends at the yield statement, it stops iterating over lines. Consequently, the readline interface stops consuming from the createReadStream, which pauses and stops issuing further disk reads.
The result is a system where the speed is dictated entirely by the slowest link in the chain (the writer), not the fastest.
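To see the pull model in isolation, here is a minimal, self-contained sketch (the 100 ms delay is an arbitrary stand-in for slow downstream work):

import { Readable } from 'node:stream';

const readable = Readable.from(['alpha', 'beta', 'gamma']);

for await (const chunk of readable) {
  // The next chunk is not pulled from the stream until this iteration,
  // including the await below, has completed.
  await new Promise((resolve) => setTimeout(resolve, 100));
  console.log(chunk);
}

Save it as an .mjs file (top-level await requires an ES module) and each value prints roughly 100 ms apart, because the stream only hands over data when the loop asks for it.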
Conclusion
Memory crashes in Node.js data processing are rarely caused by the sheer volume of data; they are caused by the rate at which that data enters memory relative to the rate at which it leaves.
By abandoning legacy data event listeners and adopting node:stream/promises with Async Generators, you opt in to automatic backpressure management. Your application stays stable whether the input is 10MB or 10TB, maintaining a flat memory profile throughout the process.