diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java index 7e29223461f..44d85f5ffc0 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java @@ -18,11 +18,7 @@ package org.apache.hadoop.hdds.server.events; import com.google.common.annotations.VisibleForTesting; - -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; - -import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +42,6 @@ public class EventQueue implements EventPublisher, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(EventQueue.class); - private static final String EXECUTOR_NAME_SEPARATOR = "For"; - private final Map>> executors = new HashMap<>(); @@ -57,73 +51,37 @@ public class EventQueue implements EventPublisher, AutoCloseable { public > void addHandler( EVENT_TYPE event, EventHandler handler) { - this.addHandler(event, handler, generateHandlerName(handler)); + + this.addHandler(event, new SingleThreadExecutor<>( + event.getName()), handler); } - /** - * Add new handler to the event queue. - *

- * By default a separated single thread executor will be dedicated to - * deliver the events to the registered event handler. - * - * @param event Triggering event. - * @param handler Handler of event (will be called from a separated - * thread) - * @param handlerName The name of handler (should be unique together with - * the event name) - * @param The type of the event payload. - * @param The type of the event identifier. - */ public > void addHandler( - EVENT_TYPE event, EventHandler handler, String handlerName) { - validateEvent(event); - Preconditions.checkNotNull(handler, "Handler name should not be null."); - String executorName = - StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR - + handlerName; - this.addHandler(event, new SingleThreadExecutor<>(executorName), handler); - } - - private > void validateEvent(EVENT_TYPE event) { - Preconditions - .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR), - "Event name should not contain " + EXECUTOR_NAME_SEPARATOR - + " string."); - - } - - private String generateHandlerName(EventHandler handler) { - if (!"".equals(handler.getClass().getSimpleName())) { - return handler.getClass().getSimpleName(); - } else { - return handler.getClass().getName(); - } - } - - /** - * Add event handler with custom executor. - * - * @param event Triggering event. - * @param executor The executor imlementation to deliver events from a - * separated threads. Please keep in your mind that - * registering metrics is the responsibility of the - * caller. - * @param handler Handler of event (will be called from a separated - * thread) - * @param The type of the event payload. - * @param The type of the event identifier. - */ - public > void addHandler( - EVENT_TYPE event, EventExecutor executor, + EVENT_TYPE event, + EventExecutor executor, EventHandler handler) { - validateEvent(event); + executors.putIfAbsent(event, new HashMap<>()); executors.get(event).putIfAbsent(executor, new ArrayList<>()); - executors.get(event).get(executor).add(handler); + executors.get(event) + .get(executor) + .add(handler); } + /** + * Creates one executor with multiple event handlers. + */ + public void addHandlerGroup(String name, HandlerForEvent... + eventsAndHandlers) { + SingleThreadExecutor sharedExecutor = + new SingleThreadExecutor(name); + for (HandlerForEvent handlerForEvent : eventsAndHandlers) { + addHandler(handlerForEvent.event, sharedExecutor, + handlerForEvent.handler); + } + } /** * Route an event with payload to the right listener(s). @@ -225,5 +183,31 @@ public class EventQueue implements EventPublisher, AutoCloseable { }); } + /** + * Event identifier together with the handler. + * + * @param + */ + public static class HandlerForEvent { + + private final Event event; + + private final EventHandler handler; + + public HandlerForEvent( + Event event, + EventHandler handler) { + this.event = event; + this.handler = handler; + } + + public Event getEvent() { + return event; + } + + public EventHandler getHandler() { + return handler; + } + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java index 3253f2d5db2..a64e3d761dd 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java @@ -23,18 +23,13 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import java.util.concurrent.atomic.AtomicLong; /** * Simple EventExecutor to call all the event handler one-by-one. * * @param */ -@Metrics(context = "EventQueue") public class SingleThreadExecutor implements EventExecutor { public static final String THREAD_NAME_PREFIX = "EventQueue"; @@ -46,24 +41,14 @@ public class SingleThreadExecutor implements EventExecutor { private final ThreadPoolExecutor executor; - @Metric - private MutableCounterLong queued; + private final AtomicLong queuedCount = new AtomicLong(0); - @Metric - private MutableCounterLong done; + private final AtomicLong successfulCount = new AtomicLong(0); - @Metric - private MutableCounterLong failed; + private final AtomicLong failedCount = new AtomicLong(0); - /** - * Create SingleThreadExecutor. - * - * @param name Unique name used in monitoring and metrics. - */ public SingleThreadExecutor(String name) { this.name = name; - DefaultMetricsSystem.instance() - .register("EventQueue" + name, "Event Executor metrics ", this); LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>(); executor = @@ -79,31 +64,31 @@ public class SingleThreadExecutor implements EventExecutor { @Override public void onMessage(EventHandler handler, T message, EventPublisher publisher) { - queued.incr(); + queuedCount.incrementAndGet(); executor.execute(() -> { try { handler.onMessage(message, publisher); - done.incr(); + successfulCount.incrementAndGet(); } catch (Exception ex) { LOG.error("Error on execution message {}", message, ex); - failed.incr(); + failedCount.incrementAndGet(); } }); } @Override public long failedEvents() { - return failed.value(); + return failedCount.get(); } @Override public long successfulEvents() { - return done.value(); + return successfulCount.get(); } @Override public long queuedEvents() { - return queued.value(); + return queuedCount.get(); } @Override diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java index 2bdf705cfee..39444097fed 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java @@ -25,8 +25,6 @@ import org.junit.Test; import java.util.Set; import java.util.stream.Collectors; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; - /** * Testing the basic functionality of the event queue. */ @@ -46,13 +44,11 @@ public class TestEventQueue { @Before public void startEventQueue() { - DefaultMetricsSystem.initialize(getClass().getSimpleName()); queue = new EventQueue(); } @After public void stopEventQueue() { - DefaultMetricsSystem.shutdown(); queue.close(); } @@ -83,4 +79,35 @@ public class TestEventQueue { } + @Test + public void handlerGroup() { + final long[] result = new long[2]; + queue.addHandlerGroup( + "group", + new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) -> + result[0] = payload), + new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) -> + result[1] = payload) + ); + + queue.fireEvent(EVENT3, 23L); + queue.fireEvent(EVENT4, 42L); + + queue.processAll(1000); + + Assert.assertEquals(23, result[0]); + Assert.assertEquals(42, result[1]); + + Set eventQueueThreadNames = + Thread.getAllStackTraces().keySet() + .stream() + .filter(t -> t.getName().startsWith(SingleThreadExecutor + .THREAD_NAME_PREFIX)) + .map(Thread::getName) + .collect(Collectors.toSet()); + System.out.println(eventQueueThreadNames); + Assert.assertEquals(1, eventQueueThreadNames.size()); + + } + } \ No newline at end of file