Micro Batch Processing in Python, Java and TS/Node

Micro Batch Processing in Python, Java and TS/Node



Micro-batch processing is the practice of collecting data in small groups (“batches”) for the purposes of taking action on (processing) that data.

Micro-batch processor
Micro-batch processor

Imagine an application that produces many logs within seconds. If the Logs API is called that frequently to store each log, the logging service will be flooded with requests, which can eventually cause the application to crash.

Instead, group logs into batches and make a single call to the Logs API per second. This approach is more sustainable and robust.

Basically, whenever something needs to be processed asap, but also requires efficient management of resources, micro-batching is the way to go.

Next, let's look at some real world examples.

Examples:

Example 1: OpenSearch/LogStash

Logstash works in batches. Some code needs to be executed regardless of how many events are processed at a time within the pipeline worker. Instead of executing that code 100 times for 100 events, it’s more efficient to run it once for a batch of 100 events.

Example 2: opentelemetry

OpenTelemetry code example. Check out this code see some methods in it.

_addToBuffer(span: ReadableSpan) method adds item to a list and at the end it triggers the _maybeStartTimer method. 

if the batch size limit has reached _maybeStartTimer, it flushes. if the limit has not been reached, it schedules the next flush.

Example 3: Apache Spark

This code reads data as a stream from a given path and writes it somewhere else. Think of it as reading from a log file continuously as it gets updated.
spark.readStream
.format("delta")
.load("<delta_table_path>")
.writeStream
.format("delta")
.trigger(processingTime='5 seconds')   # Defines processing interval
.outputMode("append")
.option("checkpointLocation", "<checkpoint_path>")
.options(**writeConfig)
.start()

Apache spark uses micro batching to process data. trigger(processingTime='5 seconds') allow the data to be processed in interval and it is configurable. Instead of reading data every time the content gets updated, it does so only periodically.

Coding Examples

Let’s look at how different programming languages can be used to implement a micro-batch processor.

Coding Example 1 : Python

Python: using threading and time.sleep
import threading
import time
from queue import Queue, Empty

class MicroBatcher:
def __init__(self, process_func, interval=0.5, max_batch_size=100):
self.process_func = process_func # Function to handle the batch
self.interval = interval # Time-based trigger (seconds)
self.max_batch_size = max_batch_size # Size-based trigger
self.queue = Queue()
self.running = True

# Start the background worker thread
self.worker_thread = threading.Thread(target=self._worker, daemon=True)
self.worker_thread.start()

def add_item(self, item):
"""Producer calls this to add data to the buffer."""
self.queue.put(item)

def _worker(self):
"""Background loop that monitors the heartbeat."""
while self.running:
time.sleep(self.interval)
self.flush()

def flush(self):
"""Drains the queue and triggers processing."""
batch = []
try:
while len(batch) < self.max_batch_size:
batch.append(self.queue.get_nowait())
except Empty:
pass

if batch:
self.process_func(batch)

def stop(self):
self.running = False
self.flush()

# --- Usage Example ---
def my_database_writer(batch):
print(f"Writing batch of {len(batch)} to DB: {batch}")

batcher = MicroBatcher(my_database_writer, interval=0.5, max_batch_size=5)

Running the code: use python3.14t to use free threading

~/codes/batch-processor/python python3.14t micro_batch.py
Writing batch of 5 to DB: ['event_0', 'event_1', 'event_2', 'event_3', 'event_4'] Writing batch of 5 to DB: ['event_5', 'event_6', 'event_7', 'event_8', 'event_9']

Since python 3.13, Free-threaded execution allows for full utilisation of the available processing power by running threads in parallel on available CPU cores.

Code shows how a tread safe queue(self.queue = Queue()) and a daemon thread(self.worker_thread = threading.Thread(target=self._worker, daemon=True)) can be used to do micro batch processing.

threading.Thread(target=self._worker, daemon=True) creates a daemon thread to process the batches. _worker(self) flushes the queue periodically. flush(self) removes item from queue. flush uses get_nowait to remove items from the queue and it is thread-safe.

What if the batch processing takes longer than the interval ?
if self.flush() takes 2 seconds, the while loop will be blocked for 2 seconds. so the next run will happen after 2.5 seconds only. 


Coding Example 2 : Python sched

we can also use sched to implement the micro-batch processor in python

import threading import time import sched from queue import Queue, Empty class MicroBatcher: def __init__(self, process_func, interval=0.5, max_batch_size=100): self.process_func = process_func self.interval = interval self.max_batch_size = max_batch_size self.queue = Queue() self.running = True # Initialize the scheduler self.scheduler = sched.scheduler(time.time, time.sleep) self.next_run_time = 0 # Start the background worker thread self.worker_thread = threading.Thread(target=self._worker, daemon=True) self.worker_thread.start() def add_item(self, item): """Producer: Adds data to the queue. Safe across multiple cores.""" self.queue.put(item) def _worker(self): """Consumer: Manages the absolute timing grid.""" self.next_run_time = time.time() + self.interval self.scheduler.enterabs(self.next_run_time, 1, self._scheduled_flush) self.scheduler.run() def _scheduled_flush(self): """The Heartbeat: Ensures Fixed-Rate execution.""" if not self.running: return self.flush() # Calculate the NEXT target time based on the grid self.next_run_time += self.interval # Death Spiral Safety if self.next_run_time < time.time(): print("Warning: Batcher lagging. Resetting schedule.") self.next_run_time = time.time() + self.interval self.scheduler.enterabs(self.next_run_time, 1, self._scheduled_flush) def flush(self): batch = [] try: while len(batch) < self.max_batch_size: batch.append(self.queue.get_nowait()) except Empty: pass if batch: try: self.process_func(batch) except Exception as e: print(f"Error processing batch: {e}") def stop(self): self.running = False for event in self.scheduler.queue: try: self.scheduler.cancel(event) except ValueError: pass self.flush() # --- Usage Example --- def my_database_writer(batch): print(f"[{time.strftime('%H:%M:%S')}] Writing {len(batch)} items: {batch}") if __name__ == "__main__": batcher = MicroBatcher(my_database_writer, interval=0.5, max_batch_size=5) for i in range(12): batcher.add_item(f"event_{i}") time.sleep(0.1) time.sleep(3) batcher.stop()

Code execution output

$ python3.14t micro_batch_sched.py
[11:08:45] Writing 5 items: ['event_0', 'event_1', 'event_2', 'event_3', 'event_4']
[11:08:46] Writing 5 items: ['event_5', 'event_6', 'event_7', 'event_8', 'event_9']
[11:08:46] Writing 2 items: ['event_10', 'event_11']

This code uses a background thread and a scheduler. The moment you start the worker_thread, everything that happens inside _worker is bound to the background thread. Hence, the scheduling is handled by the background thread, and this will not block the program.

After the flush, self.next_run_time += self.interval calculates the time of the next run. if self.next_run_time < time.time() , the next run has already passed, if it is the case, update the self.next_run_time to time.time() + self.interval. 
why update self.next_run_time to time.time() + self.interval ? This logic ensures self.next_run_time is alway a future timestamp. if you pass a past timestamp as self.next_run_time to self.scheduler.enterabs,  _scheduled_flush would get executed multiple time within a short timeframe due to the , which could cause a spikes. 

Check how enterabs work to understand this further.

Using sched to implement this is not a good approach since the code is too complex. Coding Example 1 : Python is much simpler.

Coding Example 3 : Java

using LinkedBlockingQueue and ScheduledExecutorService

import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; public class MicroBatcher<T> { private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final int maxBatchSize; public MicroBatcher(int maxBatchSize, long intervalMs) { this.maxBatchSize = maxBatchSize; // Background thread triggers every intervalMs scheduler.scheduleAtFixedRate(this::flush, intervalMs, intervalMs, TimeUnit.MILLISECONDS); } public void addItem(T item) { System.out.println("Adding: " + item); queue.add(item); } private synchronized void flush() { if (queue.isEmpty()) return; List<T> batch = new ArrayList<>(); queue.drainTo(batch, maxBatchSize); if (!batch.isEmpty()) { processBatch(batch); } } private void processBatch(List<T> batch) { System.out.println("BATCH PROCESSED (" + batch.size() + " items): " + batch); } public void shutdown() { scheduler.shutdown(); flush(); } public static void main(String[] args) throws InterruptedException { MicroBatcher<String> batcher = new MicroBatcher<>(5, 500); for (int i = 0; i < 12; i++) { batcher.addItem("event_" + i); Thread.sleep(100); } Thread.sleep(1000); batcher.shutdown(); } }

Executors.newSingleThreadScheduledExecutor() creates a background thread to process incoming messages. It calls flush() periodically.
The flush() method is synchronised to avoid race conditions if another thread tries to flush while the scheduler is already doing so.
queue.drainTo(batch, maxBatchSize) drains items from the queue and puts them into a list. While this is happening, other threads can still add items to the queue.

The Java implementation is straightforward and easy to understand compared to Python.

Coding Example 4 : Typescript / Node.js

This implementation is quite different from Python and Java. Since TypeScript/JavaScript is single-threaded, we don’t need to worry about race conditions or multiple threads accessing the same resource at the same time.

class MicroBatcher<T> { private queue: T[] = []; private intervalMs: number; private maxBatchSize: number; private processFunc: (batch: T[]) => Promise<void>; private nextRunTime: number = 0; private isRunning: boolean = false; private timerRef: NodeJS.Timeout | null = null; constructor( processFunc: (batch: T[]) => Promise<void>, intervalMs: number, maxBatchSize: number, ) { this.processFunc = processFunc; this.intervalMs = intervalMs; this.maxBatchSize = maxBatchSize; } public start(): void { if (this.isRunning) return; this.isRunning = true; this.nextRunTime = Date.now() + this.intervalMs; this.scheduleNext(); } public async stop(): Promise<void> { if (!this.isRunning) return; this.isRunning = false; if (this.timerRef) { clearTimeout(this.timerRef); this.timerRef = null; } // Drain the remaining queue before fully stopping while (this.queue.length > 0) { await this.flush(); } } public addItem(item: T): void { this.queue.push(item); } private async heartbeat(): Promise<void> { if (!this.isRunning) return; await this.flush(); this.nextRunTime += this.intervalMs; const now = Date.now(); if (this.nextRunTime < now) { this.nextRunTime = now; } this.scheduleNext(); } private scheduleNext(): void { const delay = Math.max(0, this.nextRunTime - Date.now()); this.timerRef = setTimeout(() => this.heartbeat(), delay); } private async flush(): Promise<void> { if (this.queue.length === 0) return; // Atomic splice in the JS event loop const batch = this.queue.splice(0, this.maxBatchSize); try { await this.processFunc(batch); } catch (err) { console.error(err); } } } // --- Example Usage --- async function runExample() { const dbWriter = async (batch: string[]) => { console.log(`Processing: ${batch.join(", ")}`); await new Promise((resolve) => setTimeout(resolve, 150)); }; const batcher = new MicroBatcher(dbWriter, 1000, 5); batcher.start(); for (let i = 1; i <= 12; i++) { batcher.addItem(`data_${i}`); await new Promise((r) => setTimeout(r, 100)); } await batcher.stop(); } runExample();

Code execution output
$ node batch_processor.js
Processing: data_1, data_2, data_3, data_4, data_5
Processing: data_6, data_7, data_8, data_9, data_10
Processing: data_11, data_12

In this approach, we use setTimeout to flush periodically. queue.splice is a CPU-bound operation, and while it is executing, no other operations can run, which makes splice effectively thread-safe.

The scheduleNext() method checks whether the next scheduled runtime has already passed. If it has, the method schedules the next execution immediately; otherwise, it waits until the scheduled time.

Conclusion

We explored real-world use cases and how different programming languages use different approaches to handle micro-batches.

In my opinion, the Java code is clear, easy to understand, and has better multi-threaded support, which is good for scalability.



References:
  • https://hazelcast.com/foundations/software-architecture/micro-batch-processing/
  • https://spark.apache.org/streaming/
  • https://docs.opensearch.org/latest/tools/logstash/execution-model/
  • https://docs.python.org/3/library/sched.html
  • https://docs.python.org/3/howto/free-threading-python.html
  • https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html

Comments