In the previous post, Why Functional Programming Matters, we explored the design of a high-throughput timed buffered reader in Scala using cats-effect. Such a reader is helpful for event buffering and micro-batching, a common requirement in large-scale systems where you want to reduce the overhead of processing individual events.
The intent of the post was to highlight how the application of functional programming principles made the code succinct and robust.
I had to implement the same timed buffered reader in Java, and in this post I will walk through how I did it. I don't know of any library like cats-effect in Java, so I had to build the timed buffered reader from scratch, dealing with low-level concurrency primitives that you don't have to touch when using cats-effect. The code is more verbose and less elegant than the Scala version, but in practice it turned out to be not too bad.
To implement it in Java, we first need a scheduler that wakes up every flushIntervalMs and dispatches the items in the buffer to the callback. So, we create one:
final var scheduler = Executors.newSingleThreadScheduledExecutor();
The scheduler needs a Runnable task to execute periodically, so we define one that flushes the buffer:
scheduler.scheduleAtFixedRate(
    this::flushSyncSafe,
    flushIntervalMs,
    flushIntervalMs,
    TimeUnit.MILLISECONDS
);
private void flushSyncSafe() {
    try {
        flush().get(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        System.err.println("Interrupted while flushing buffer: " + e.getMessage());
        // Restore the interrupt flag only when the thread was actually interrupted
        Thread.currentThread().interrupt();
    } catch (ExecutionException | TimeoutException e) {
        System.err.println("Error flushing buffer: " + e.getMessage());
    }
}
The following is the implementation of the flush method. We use an isFlushing flag to guard against double-flushing.
private CompletableFuture<Void> flush() {
    if (!isFlushing.compareAndSet(false, true)) {
        return CompletableFuture.completedFuture(null);
    }
    try {
        final var batch = new HashSet<T>();
        while (batch.size() < maxBatchSize) {
            final var msg = buffer.poll();
            if (msg == null) break;
            batch.add(msg);
        }
        if (!batch.isEmpty()) {
            return callback.apply(batch);
        }
    } finally {
        isFlushing.set(false);
    }
    return CompletableFuture.completedFuture(null);
}
The publish and close methods are the important ones; they act in tandem via a lock so that items cannot be published after the reader is closed.
public CompletableFuture<Void> publish(T item) {
    lock.lock();
    try {
        if (shutdownInitiated) {
            System.err.println("Shutting down! Publish discarded for item: " + item);
            return CompletableFuture.failedFuture(new IllegalStateException("Shutting down!"));
        }
        buffer.add(item);
        return buffer.size() >= maxBatchSize ? flush() : CompletableFuture.completedFuture(null);
    } finally {
        lock.unlock();
    }
}
@Override
public void close() {
    lock.lock();
    try {
        shutdownInitiated = true;
        System.err.println("TimedBufferedReader shutdown initiated ...");
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(flushIntervalMs, TimeUnit.MILLISECONDS)) {
                System.err.println("Failed to perform final flush in time.");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Final flush to ensure all items are processed
        flushSyncSafe();
        while (!buffer.isEmpty()) {
            flushSyncSafe();
        }
    } finally {
        lock.unlock();
    }
}
Here is the unabridged implementation:
import java.io.Closeable;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

class TimedBufferedReader<T> implements Closeable {
    private final ConcurrentLinkedQueue<T> buffer = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Function<Set<T>, CompletableFuture<Void>> callback;
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);
    private final int maxBatchSize;
    private final long flushIntervalMs;
    private volatile boolean shutdownInitiated;
    private final ReentrantLock lock = new ReentrantLock();

    public TimedBufferedReader(
        int maxBatchSize,
        long flushIntervalMs,
        Function<Set<T>, CompletableFuture<Void>> callback
    ) {
        // validate parameters
        this.maxBatchSize = maxBatchSize;
        this.flushIntervalMs = flushIntervalMs;
        this.callback = callback;
        final var unused =
            scheduler.scheduleAtFixedRate(
                this::flushSyncSafe,
                flushIntervalMs,
                flushIntervalMs,
                TimeUnit.MILLISECONDS
            );
    }

    public CompletableFuture<Void> publish(T item) {
        lock.lock();
        try {
            if (shutdownInitiated) {
                System.err.println("Shutting down! Publish discarded for item: " + item);
                return CompletableFuture.failedFuture(new IllegalStateException("Shutting down!"));
            }
            buffer.add(item);
            return buffer.size() >= maxBatchSize ? flush() : CompletableFuture.completedFuture(null);
        } finally {
            lock.unlock();
        }
    }

    private CompletableFuture<Void> flush() {
        if (!isFlushing.compareAndSet(false, true)) {
            return CompletableFuture.completedFuture(null);
        }
        try {
            final var batch = new HashSet<T>();
            while (batch.size() < maxBatchSize) {
                final var msg = buffer.poll();
                if (msg == null) break;
                batch.add(msg);
            }
            if (!batch.isEmpty()) {
                return callback.apply(batch);
            }
        } finally {
            isFlushing.set(false);
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void close() {
        lock.lock();
        try {
            shutdownInitiated = true;
            System.err.println("TimedBufferedReader shutdown initiated ...");
            scheduler.shutdown();
            try {
                if (!scheduler.awaitTermination(flushIntervalMs, TimeUnit.MILLISECONDS)) {
                    System.err.println("Failed to perform final flush in time.");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Final flush to ensure all items are processed
            flushSyncSafe();
            while (!buffer.isEmpty()) {
                flushSyncSafe();
            }
        } finally {
            lock.unlock();
        }
    }
    private void flushSyncSafe() {
        try {
            flush().get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            System.err.println("Interrupted while flushing buffer: " + e.getMessage());
            // Restore the interrupt flag only when the thread was actually interrupted
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            System.err.println("Error flushing buffer: " + e.getMessage());
        }
    }
}
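For completeness, here is a minimal usage sketch, assuming a main method added to the class above. The callback, batch size, and flush interval are illustrative values, not part of the implementation itself; the callback simply simulates an asynchronous downstream consumer.

    public static void main(String[] args) throws Exception {
        // Illustrative callback: process each batch asynchronously and complete the returned future
        final Function<Set<Integer>, CompletableFuture<Void>> callback =
            batch -> CompletableFuture.runAsync(() -> System.out.println("Processing batch: " + batch));

        // Flush whenever 10 items accumulate, or every 500 ms, whichever comes first
        try (var reader = new TimedBufferedReader<Integer>(10, 500, callback)) {
            for (var i = 0; i < 25; i++) {
                reader.publish(i);
                Thread.sleep(50);
            }
        } // close() drains any remaining items before returning
    }

Running this, you should see batches printed either when 10 items accumulate or when the 500 ms timer fires, with any remaining items flushed by close() at the end of the try-with-resources block.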
Photo Courtesy: Geordanna Cordero