1. Background and Motivation
Although Google Guava’s EventBus is already quite convenient, I wanted to build something more extensible: a component that not only functions as an EventBus but also supports delayed events out of the box.
While exploring the Apache DolphinScheduler project, I found its built-in eventbus component to be a well-written and efficient solution. It inspired me to extract and adapt it for use in our own business systems.
Thanks to DolphinScheduler’s open-source nature, this component can be reused with minimal overhead. Let’s walk through how to extract it and run a working demo.
2. Implementation Details
Step 1: Define the Event Interface
First, we define a base interface for all events:
public interface IEvent {
}
Step 2: Abstract Class for Delayed Events
To support delayed execution, we introduce an abstract class AbstractDelayEvent that implements both IEvent and java.util.concurrent.Delayed. It stores the delay time (in milliseconds) and the absolute expiration time computed from it.
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public abstract class AbstractDelayEvent implements IEvent, Delayed {
    // Requested delay in milliseconds
    private final long delayTime;
    // Absolute expiration timestamp in epoch milliseconds
    private final long expireTime;

    public AbstractDelayEvent(long delayTime) {
        this.delayTime = delayTime;
        this.expireTime = System.currentTimeMillis() + delayTime;
    }

    public long getDelayTime() {
        return delayTime;
    }

    public long getExpireTime() {
        return expireTime;
    }

    // Remaining delay in the requested unit; zero or negative means the event is due
    @Override
    public long getDelay(TimeUnit unit) {
        long diff = expireTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    // Earlier expiration sorts first; assumes the queue only holds AbstractDelayEvent instances
    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.expireTime, ((AbstractDelayEvent) o).expireTime);
    }
}
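The cast in compareTo is safe as long as only AbstractDelayEvent instances are ever compared with each other, which holds for the typed queue used later in this article. If you would rather not rely on that, one possible hardening is sketched below. SafeDelayEvent and SafeCompareDemo are hypothetical names used only for this illustration; they are not part of the DolphinScheduler component.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of AbstractDelayEvent, shown only to illustrate a cast-free compareTo.
class SafeDelayEvent implements Delayed {
    private final long expireTime;

    SafeDelayEvent(long delayMillis) {
        this.expireTime = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other instanceof SafeDelayEvent) {
            // Same type: compare absolute expiration times directly.
            return Long.compare(expireTime, ((SafeDelayEvent) other).expireTime);
        }
        // Any other Delayed: fall back to comparing remaining delays.
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

class SafeCompareDemo {
    public static void main(String[] args) {
        SafeDelayEvent soon = new SafeDelayEvent(100);
        SafeDelayEvent later = new SafeDelayEvent(5_000);
        // The event that expires earlier sorts first, so this prints true.
        System.out.println(soon.compareTo(later) < 0);
    }
}
```

The fallback branch costs two clock reads, so the direct comparison of expiration times remains the common path.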
Step 3: Define the EventBus Interface
Next, we define the interface for the EventBus, with core operations like publish, poll, peek, remove, and state checks:
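import java.util.Optional;

public interface IEventBus<T extends IEvent> {
    // Add an event to the bus
    void publish(T event);
    // Retrieve and remove the next event; may block, hence the InterruptedException
    Optional<T> poll() throws InterruptedException;
    // Look at the next event without removing it
    Optional<T> peek();
    // Retrieve and remove the next event without waiting
    Optional<T> remove();
    boolean isEmpty();
    int size();
}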
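Because the interface says nothing about delays, it can also back a plain, non-delayed bus. The following is a minimal sketch of that idea, assuming a FIFO queue is acceptable. SimpleEventBus, TextEvent, and SimpleEventBusDemo are hypothetical names that do not exist in the extracted component, and the two interfaces are repeated only so that the sketch compiles on its own.

```java
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Copies of the article's interfaces so the sketch is self-contained.
interface IEvent {
}

interface IEventBus<T extends IEvent> {
    void publish(T event);
    Optional<T> poll() throws InterruptedException;
    Optional<T> peek();
    Optional<T> remove();
    boolean isEmpty();
    int size();
}

// Hypothetical event type carrying a single string.
class TextEvent implements IEvent {
    final String text;

    TextEvent(String text) {
        this.text = text;
    }
}

// Hypothetical FIFO implementation with no delay support.
class SimpleEventBus<T extends IEvent> implements IEventBus<T> {
    private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

    @Override
    public void publish(T event) {
        queue.offer(event); // the queue is unbounded, so offer always succeeds
    }

    @Override
    public Optional<T> poll() throws InterruptedException {
        // Wait up to one second; the timeout is an arbitrary choice for this sketch.
        return Optional.ofNullable(queue.poll(1000, TimeUnit.MILLISECONDS));
    }

    @Override
    public Optional<T> peek() {
        return Optional.ofNullable(queue.peek());
    }

    @Override
    public Optional<T> remove() {
        return Optional.ofNullable(queue.poll());
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }
}

class SimpleEventBusDemo {
    public static void main(String[] args) throws InterruptedException {
        IEventBus<TextEvent> bus = new SimpleEventBus<>();
        bus.publish(new TextEvent("first"));
        bus.publish(new TextEvent("second"));
        // Events come back in publish order.
        System.out.println(bus.poll().get().text);
        System.out.println(bus.remove().get().text);
    }
}
```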
Step 4: Abstract Delay EventBus Implementation
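We then provide an abstract base class, AbstractDelayEventBus, which implements the core logic on top of Java’s DelayQueue. Here poll() waits up to one second for an event to expire, remove() returns immediately, and peek() returns the head of the queue even if it has not expired yet.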
import java.util.Optional;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

public abstract class AbstractDelayEventBus<T extends AbstractDelayEvent> implements IEventBus<T> {
    protected final DelayQueue<T> delayEventQueue = new DelayQueue<>();

    @Override
    public void publish(T event) {
        delayEventQueue.put(event);
    }

    // Waits up to one second for the head event to expire
    @Override
    public Optional<T> poll() throws InterruptedException {
        return Optional.ofNullable(delayEventQueue.poll(1000, TimeUnit.MILLISECONDS));
    }

    // Returns the head event even if it has not expired yet
    @Override
    public Optional<T> peek() {
        return Optional.ofNullable(delayEventQueue.peek());
    }

    // Non-blocking: returns an event only if one has already expired
    @Override
    public Optional<T> remove() {
        return Optional.ofNullable(delayEventQueue.poll());
    }

    @Override
    public boolean isEmpty() {
        return delayEventQueue.isEmpty();
    }

    // Counts both expired and unexpired events
    @Override
    public int size() {
        return delayEventQueue.size();
    }
}
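The behavior of this class follows directly from DelayQueue, so it can help to see the queue on its own. The sketch below uses DelayQueue without the bus wrapper; Tick and DelayQueueSemantics are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Minimal Delayed element for illustration; the name is hypothetical.
class Tick implements Delayed {
    final String name;
    private final long expireTime;

    Tick(String name, long delayMillis) {
        this.name = name;
        this.expireTime = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // This sketch only ever queues Tick instances.
        return Long.compare(expireTime, ((Tick) other).expireTime);
    }
}

class DelayQueueSemantics {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Tick> queue = new DelayQueue<>();
        queue.put(new Tick("slow", 60_000));

        System.out.println(queue.size());         // 1: unexpired elements are counted
        System.out.println(queue.peek() != null); // true: peek ignores expiry
        System.out.println(queue.poll() == null); // true: nothing has expired yet

        queue.put(new Tick("fast", 50));
        Tick first = queue.take();                // blocks until "fast" expires
        System.out.println(first.name);           // fast: earliest expiry wins, not insertion order
    }
}
```

This is why isEmpty() and size() on the bus can report pending events while remove() still returns an empty Optional.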
3. Demo: Testing the Component
Let’s build a small example to see it in action.
Define a Custom Delayed Event
In a real system, this would hold your domain-specific business data.
public class MyDelayEvent extends AbstractDelayEvent {
    private final String message;

    public MyDelayEvent(long delayTime, String message) {
        super(delayTime);
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}
Implement Your Own EventBus (Optional)
You can customize it further if needed. For simple use cases, this works as-is.
public class MyDelayEventBus extends AbstractDelayEventBus<MyDelayEvent> {
    // No additional customization needed
}
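As one example of what a customization could look like, the sketch below adds a method that returns every event that is already due in a single call. BatchDelayEventBus, drainExpired, and BatchDemo are hypothetical names, not part of the original component, and the base classes are trimmed copies kept only so the sketch compiles on its own. It relies on the OpenJDK behavior that DelayQueue.drainTo transfers only expired elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Trimmed copies of the article's classes, just enough for this sketch.
abstract class AbstractDelayEvent implements Delayed {
    private final long expireTime;

    protected AbstractDelayEvent(long delayTime) {
        this.expireTime = System.currentTimeMillis() + delayTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(expireTime, ((AbstractDelayEvent) o).expireTime);
    }
}

abstract class AbstractDelayEventBus<T extends AbstractDelayEvent> {
    protected final DelayQueue<T> delayEventQueue = new DelayQueue<>();

    public void publish(T event) {
        delayEventQueue.put(event);
    }

    public int size() {
        return delayEventQueue.size();
    }
}

class MyDelayEvent extends AbstractDelayEvent {
    private final String message;

    MyDelayEvent(long delayTime, String message) {
        super(delayTime);
        this.message = message;
    }

    String getMessage() {
        return message;
    }
}

// Hypothetical customization: hand back every event that is already due.
class BatchDelayEventBus extends AbstractDelayEventBus<MyDelayEvent> {
    public List<MyDelayEvent> drainExpired() {
        List<MyDelayEvent> due = new ArrayList<>();
        // Only expired events are moved; unexpired ones stay queued.
        delayEventQueue.drainTo(due);
        return due;
    }
}

class BatchDemo {
    public static void main(String[] args) {
        BatchDelayEventBus bus = new BatchDelayEventBus();
        bus.publish(new MyDelayEvent(0, "due now"));
        bus.publish(new MyDelayEvent(60_000, "due later"));
        for (MyDelayEvent event : bus.drainExpired()) {
            System.out.println(event.getMessage());
        }
        System.out.println("Still queued: " + bus.size());
    }
}
```

A batch method like this may suit consumers that wake up on a timer and want to process everything that has come due since the last run.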
Example: Publishing and Consuming Events
Now, let’s run a simple main method that publishes and consumes a delayed event:
import java.util.Optional;

public class EventBusExample {
    public static void main(String[] args) throws InterruptedException {
        // Create event bus
        IEventBus<MyDelayEvent> eventBus = new MyDelayEventBus();

        // Publish a single event with 100ms delay
        eventBus.publish(new MyDelayEvent(100, "Single Event"));
        System.out.println("After publish, event bus size: " + eventBus.size());

        // Keep consuming until a poll times out without an event
        while (true) {
            Optional<MyDelayEvent> event = eventBus.poll();
            if (event.isPresent()) {
                System.out.println("Received event: " + event.get().getMessage());
            } else {
                System.out.println("No event received within the timeout.");
                break;
            }
        }

        // Final bus size
        System.out.println("Event bus size: " + eventBus.size());
    }
}
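Output
Running the example should print the following:
After publish, event bus size: 1
Received event: Single Event
No event received within the timeout.
Event bus size: 0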
As you can see, this approach makes it easy to create and manage your own delay-capable EventBus system, enabling flexible event-driven programming for your business applications.
4. Source Code & References
- 🔗 Source Code: Gitee Repository
- 📚 Reference: Apache DolphinScheduler GitHub
If you're building delay-sensitive or event-driven systems, this lightweight and extendable component might be just what you need — and it’s inspired by one of the best open-source workflow schedulers out there.