Description
N8N Kafka Batch Consumer Node
A custom N8N node for consuming Kafka messages in batches using KafkaJS.
Features
- Batch Message Consumption: Collect a configurable number of messages before processing
- Flexible Authentication: Support for SASL (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512) and SSL/TLS
- Comprehensive Error Handling: Graceful error handling with proper resource cleanup
- JSON Parsing: Automatic JSON parsing with fallback to string
- Timeout Management: Configurable read timeout with partial batch support
- N8N Integration: Standard N8N node with credential support
Installation
npm install
npm run build
Configuration Parameters
Required Parameters
localhost:9092)Optional Parameters
false)30000)Options
60000)true)Understanding Timeouts
The node uses two different timeout configurations that serve distinct purposes:
Session Timeout (Kafka/Broker-side)
– The consumer must send heartbeats to the broker within this time
– If the broker doesn’t receive heartbeats for sessionTimeout milliseconds, it considers the consumer “dead” and triggers a rebalancing (reassigning partitions to other consumers in the group)
Read Timeout (Application-side)
– If batchSize messages arrive before the timeout → returns immediately
– If the timeout expires first → returns collected messages (partial batch)
Why Both Are Needed
Best Practice: Keep Session Timeout ≥ Read Timeout to avoid broker disconnections while waiting for messages. However, KafkaJS sends heartbeats automatically in the background, so the consumer stays alive even during longer Read Timeouts.
Credentials
The node supports optional Kafka credentials with the following features:
SASL Authentication
SSL/TLS Configuration
Usage Example
1. Add the “Kafka Batch Consumer” node to your workflow
2. Configure the broker addresses and topic
3. Set the desired batch size
4. Optionally configure credentials for authentication
5. Run the workflow to consume messages
Output Format
Each message is returned as an INodeExecutionData object with the following structure:
{
json: {
topic: string,
partition: number,
offset: string,
key: string | null,
value: any,
timestamp: string,
headers: Record
}
}
Testing
The project includes comprehensive Jest tests covering:
Run tests:
npm test
Run tests with coverage:
npm run test:coverage
Coverage target: 80% minimum
Development
Build
npm run build
Lint
npm run lint
Test
npm test
License
MIT