Micro-batch processing is the practice of collecting data in small groups (“batches”) for the purposes of taking action on (processing) that data.
[Figure: Micro-batch processor]
Imagine an application that produces many logs within a few seconds. If the Logs API is called that frequently, once per log, the logging service is flooded with requests, which can eventually cause the application to crash.
Instead, group logs into batches and make a single call to the Logs API per second. This approach is more sustainable and robust.
In short, whenever data needs to be processed as soon as possible while still managing resources efficiently, micro-batching is the way to go.
Next, let's look at some real-world examples.
Examples:
Example 1: OpenSearch/LogStash
Logstash processes events in batches. Within a pipeline worker, some code needs to be executed regardless of how many events are being processed at a time. Instead of executing that code 100 times for 100 individual events, it is more efficient to run it once for a batch of 100 events.
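To see why this matters, here is a toy Python sketch (the names and cost numbers are made up, not Logstash code) comparing the per-event and per-batch strategies when every processing call pays a fixed setup cost:

```python
# Hypothetical cost model: every processing call pays a fixed setup
# cost (e.g. opening a connection) plus a small per-event cost.

def process_individually(events, setup_cost=1, per_event_cost=1):
    """Pays the setup cost once per event."""
    return len(events) * (setup_cost + per_event_cost)

def process_as_batch(events, setup_cost=1, per_event_cost=1):
    """Pays the setup cost once for the whole batch."""
    return setup_cost + len(events) * per_event_cost

events = list(range(100))
print(process_individually(events))  # 200 cost units
print(process_as_batch(events))      # 101 cost units
```

With 100 events, batching pays the setup cost once instead of 100 times, and the saving grows with the batch size.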
Example 2: OpenTelemetry
The OpenTelemetry JavaScript SDK's batch span processor is a good code example. Two methods are worth a look:
The _addToBuffer(span: ReadableSpan) method adds the span to a buffer and then calls _maybeStartTimer.
If the batch size limit has been reached, _maybeStartTimer flushes immediately; if not, it schedules the next flush.
Example 3: Apache Spark
This code reads data as a stream from a given path and writes it somewhere else. Think of it as reading from a log file continuously as it gets updated.
(spark.readStream
    .format("delta")
    .load("<delta_table_path>")
    .writeStream
    .format("delta")
    .trigger(processingTime='5 seconds')  # defines the processing interval
    .outputMode("append")
    .option("checkpointLocation", "<checkpoint_path>")
    .options(**writeConfig)
    .start())
Apache Spark uses micro-batching to process streaming data. trigger(processingTime='5 seconds') makes Spark process data at a configurable interval. Instead of reading data every time the content gets updated, it does so only periodically.
Coding Examples
Let’s look at how different programming languages can be used to implement a micro-batch processor.
Coding Example 1: Python
Python: using threading and time.sleep
import threading
import time
from queue import Queue, Empty

class MicroBatcher:
    def __init__(self, process_func, interval=0.5, max_batch_size=100):
        self.process_func = process_func      # Function to handle the batch
        self.interval = interval              # Time-based trigger (seconds)
        self.max_batch_size = max_batch_size  # Size-based trigger
        self.queue = Queue()
        self.running = True
        # Start the background worker thread
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def add_item(self, item):
        """Producer calls this to add data to the buffer."""
        self.queue.put(item)

    def _worker(self):
        """Background loop that monitors the heartbeat."""
        while self.running:
            time.sleep(self.interval)
            self.flush()

    def flush(self):
        """Drains the queue and triggers processing."""
        batch = []
        try:
            while len(batch) < self.max_batch_size:
                batch.append(self.queue.get_nowait())
        except Empty:
            pass
        if batch:
            self.process_func(batch)

    def stop(self):
        self.running = False
        self.flush()


# --- Usage Example ---
def my_database_writer(batch):
    print(f"Writing batch of {len(batch)} to DB: {batch}")

if __name__ == "__main__":
    batcher = MicroBatcher(my_database_writer, interval=0.5, max_batch_size=5)
    for i in range(10):
        batcher.add_item(f"event_{i}")
        time.sleep(0.1)
    time.sleep(1)
    batcher.stop()
Running the code: use python3.14t to use free threading.

~/codes/batch-processor/python $ python3.14t micro_batch.py
Writing batch of 5 to DB: ['event_0', 'event_1', 'event_2', 'event_3', 'event_4']
Writing batch of 5 to DB: ['event_5', 'event_6', 'event_7', 'event_8', 'event_9']
Since Python 3.13, free-threaded execution allows full utilisation of the available processing power by running threads in parallel on the available CPU cores.
The code shows how a thread-safe queue (self.queue = Queue()) and a daemon thread (threading.Thread(target=self._worker, daemon=True)) can be used for micro-batch processing.
The daemon thread runs _worker(self), which flushes the queue periodically. flush(self) drains items from the queue using get_nowait(), which is thread-safe.
What if processing a batch takes longer than the interval?
If self.flush() takes 2 seconds, the while loop is blocked for those 2 seconds, so the next run happens only 2.5 seconds after the previous one.
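A toy sketch (separate from the class above) makes this drift visible: with a 0.1-second interval and a flush that takes 0.2 seconds, each cycle actually takes about 0.3 seconds.

```python
import time

interval = 0.1
start = time.monotonic()
for _ in range(3):
    time.sleep(interval)   # the worker's heartbeat sleep
    time.sleep(0.2)        # stand-in for a slow flush
elapsed = time.monotonic() - start
print(f"3 cycles took {elapsed:.2f}s instead of {3 * interval:.2f}s")
```

Because the sleep and the flush run back to back on the same thread, a slow flush stretches every cycle, and the intervals drift further apart the slower processing gets.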
Coding Example 2: Python sched
We can also use sched to implement the micro-batch processor in Python.
import threading
import time
import sched
from queue import Queue, Empty

class MicroBatcher:
    def __init__(self, process_func, interval=0.5, max_batch_size=100):
        self.process_func = process_func
        self.interval = interval
        self.max_batch_size = max_batch_size
        self.queue = Queue()
        self.running = True
        # Initialize the scheduler
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.next_run_time = 0
        # Start the background worker thread
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def add_item(self, item):
        """Producer: Adds data to the queue. Safe across multiple cores."""
        self.queue.put(item)

    def _worker(self):
        """Consumer: Manages the absolute timing grid."""
        self.next_run_time = time.time() + self.interval
        self.scheduler.enterabs(self.next_run_time, 1, self._scheduled_flush)
        self.scheduler.run()

    def _scheduled_flush(self):
        """The Heartbeat: Ensures fixed-rate execution."""
        if not self.running:
            return
        self.flush()
        # Calculate the NEXT target time based on the grid
        self.next_run_time += self.interval
        # Death-spiral safety
        if self.next_run_time < time.time():
            print("Warning: Batcher lagging. Resetting schedule.")
            self.next_run_time = time.time() + self.interval
        self.scheduler.enterabs(self.next_run_time, 1, self._scheduled_flush)

    def flush(self):
        batch = []
        try:
            while len(batch) < self.max_batch_size:
                batch.append(self.queue.get_nowait())
        except Empty:
            pass
        if batch:
            try:
                self.process_func(batch)
            except Exception as e:
                print(f"Error processing batch: {e}")

    def stop(self):
        self.running = False
        for event in self.scheduler.queue:
            try:
                self.scheduler.cancel(event)
            except ValueError:
                pass
        self.flush()


# --- Usage Example ---
def my_database_writer(batch):
    print(f"[{time.strftime('%H:%M:%S')}] Writing {len(batch)} items: {batch}")

if __name__ == "__main__":
    batcher = MicroBatcher(my_database_writer, interval=0.5, max_batch_size=5)
    for i in range(12):
        batcher.add_item(f"event_{i}")
        time.sleep(0.1)
    time.sleep(3)
    batcher.stop()
Code execution output:

$ python3.14t micro_batch_sched.py
[11:08:45] Writing 5 items: ['event_0', 'event_1', 'event_2', 'event_3', 'event_4']
[11:08:46] Writing 5 items: ['event_5', 'event_6', 'event_7', 'event_8', 'event_9']
[11:08:46] Writing 2 items: ['event_10', 'event_11']
This code uses a background thread and a scheduler. The moment worker_thread starts, everything inside _worker runs on that background thread, so the scheduling is handled there and never blocks the main program.
After each flush, self.next_run_time += self.interval calculates the time of the next run. If self.next_run_time < time.time(), the next run has already passed; in that case, self.next_run_time is reset to time.time() + self.interval.
Why reset it to time.time() + self.interval? This logic ensures self.next_run_time is always a future timestamp. If a past timestamp were passed to self.scheduler.enterabs, _scheduled_flush would be executed multiple times within a short timeframe, which could cause processing spikes.
Check how enterabs works to understand this further.
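A quick sketch of enterabs behaviour: an event whose absolute timestamp is already in the past is due immediately, so run() fires it without waiting. This is why the batcher always resets next_run_time into the future.

```python
import sched
import time

fired = []
s = sched.scheduler(time.time, time.sleep)
# Timestamp 10 seconds in the past: due immediately.
s.enterabs(time.time() - 10, 1, lambda: fired.append("past"))
# Timestamp 0.1 seconds in the future: run() waits for it.
s.enterabs(time.time() + 0.1, 1, lambda: fired.append("future"))

start = time.monotonic()
s.run()
elapsed = time.monotonic() - start
print(fired, f"run() took {elapsed:.2f}s")
```

If _scheduled_flush kept re-scheduling itself with past timestamps, every one of those events would fire back to back with no delay, producing exactly the burst of flushes the reset guards against.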
Using sched to implement this is not a good approach: the code is considerably more complex, while Coding Example 1 achieves the same result much more simply.
Coding Example 3: Java
Using LinkedBlockingQueue and ScheduledExecutorService
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class MicroBatcher<T> {
    private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final int maxBatchSize;

    public MicroBatcher(int maxBatchSize, long intervalMs) {
        this.maxBatchSize = maxBatchSize;
        // Background thread triggers every intervalMs
        scheduler.scheduleAtFixedRate(this::flush, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    public void addItem(T item) {
        System.out.println("Adding: " + item);
        queue.add(item);
    }

    private synchronized void flush() {
        if (queue.isEmpty()) return;
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, maxBatchSize);
        if (!batch.isEmpty()) {
            processBatch(batch);
        }
    }

    private void processBatch(List<T> batch) {
        System.out.println("BATCH PROCESSED (" + batch.size() + " items): " + batch);
    }

    public void shutdown() {
        scheduler.shutdown();
        flush();
    }

    public static void main(String[] args) throws InterruptedException {
        MicroBatcher<String> batcher = new MicroBatcher<>(5, 500);
        for (int i = 0; i < 12; i++) {
            batcher.addItem("event_" + i);
            Thread.sleep(100);
        }
        Thread.sleep(1000);
        batcher.shutdown();
    }
}
Executors.newSingleThreadScheduledExecutor() creates a background thread to process incoming messages. It calls flush() periodically.
The flush() method is synchronised to avoid race conditions if another thread tries to flush while the scheduler is already doing so.
queue.drainTo(batch, maxBatchSize) drains items from the queue and puts them into a list. While this is happening, other threads can still add items to the queue.
The Java implementation is straightforward and easy to understand compared to the Python versions.
Coding Example 4: TypeScript / Node.js
This implementation is quite different from the Python and Java ones. Since JavaScript runs user code on a single thread, we don't need to worry about race conditions or multiple threads accessing the same resource at the same time.
class MicroBatcher<T> {
    private queue: T[] = [];
    private intervalMs: number;
    private maxBatchSize: number;
    private processFunc: (batch: T[]) => Promise<void>;
    private nextRunTime: number = 0;
    private isRunning: boolean = false;
    private timerRef: NodeJS.Timeout | null = null;

    constructor(
        processFunc: (batch: T[]) => Promise<void>,
        intervalMs: number,
        maxBatchSize: number,
    ) {
        this.processFunc = processFunc;
        this.intervalMs = intervalMs;
        this.maxBatchSize = maxBatchSize;
    }

    public start(): void {
        if (this.isRunning) return;
        this.isRunning = true;
        this.nextRunTime = Date.now() + this.intervalMs;
        this.scheduleNext();
    }

    public async stop(): Promise<void> {
        if (!this.isRunning) return;
        this.isRunning = false;
        if (this.timerRef) {
            clearTimeout(this.timerRef);
            this.timerRef = null;
        }
        // Drain the remaining queue before fully stopping
        while (this.queue.length > 0) {
            await this.flush();
        }
    }

    public addItem(item: T): void {
        this.queue.push(item);
    }

    private async heartbeat(): Promise<void> {
        if (!this.isRunning) return;
        await this.flush();
        this.nextRunTime += this.intervalMs;
        const now = Date.now();
        if (this.nextRunTime < now) {
            this.nextRunTime = now;
        }
        this.scheduleNext();
    }

    private scheduleNext(): void {
        const delay = Math.max(0, this.nextRunTime - Date.now());
        this.timerRef = setTimeout(() => this.heartbeat(), delay);
    }

    private async flush(): Promise<void> {
        if (this.queue.length === 0) return;
        // Atomic splice in the JS event loop
        const batch = this.queue.splice(0, this.maxBatchSize);
        try {
            await this.processFunc(batch);
        } catch (err) {
            console.error(err);
        }
    }
}

// --- Example Usage ---
async function runExample() {
    const dbWriter = async (batch: string[]) => {
        console.log(`Processing: ${batch.join(", ")}`);
        await new Promise((resolve) => setTimeout(resolve, 150));
    };

    const batcher = new MicroBatcher(dbWriter, 1000, 5);
    batcher.start();

    for (let i = 1; i <= 12; i++) {
        batcher.addItem(`data_${i}`);
        await new Promise((r) => setTimeout(r, 100));
    }

    await batcher.stop();
}

runExample();
Code execution output:
$ node batch_processor.js
Processing: data_1, data_2, data_3, data_4, data_5
Processing: data_6, data_7, data_8, data_9, data_10
Processing: data_11, data_12
In this approach, we use setTimeout to flush periodically. queue.splice runs synchronously on the single-threaded event loop; no other callback can execute while it runs, which makes the splice effectively atomic without any locking.
The heartbeat() method checks whether the next scheduled run time has already passed and, if it has, resets it to the current time; scheduleNext() then clamps the delay to zero, so the next execution is scheduled immediately. Otherwise, it waits until the scheduled time.
Conclusion
We explored real-world use cases and saw how different programming languages take different approaches to handling micro-batches.
In my opinion, the Java code is clear, easy to understand, and has better multi-threading support, which is good for scalability.
References:
- https://hazelcast.com/foundations/software-architecture/micro-batch-processing/
- https://spark.apache.org/streaming/
- https://docs.opensearch.org/latest/tools/logstash/execution-model/
- https://docs.python.org/3/library/sched.html
- https://docs.python.org/3/howto/free-threading-python.html
- https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html