diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java index 44d85f5ffc0..7e29223461f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java @@ -18,7 +18,11 @@ package org.apache.hadoop.hdds.server.events; import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; + +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +46,8 @@ public class EventQueue implements EventPublisher, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(EventQueue.class); + private static final String EXECUTOR_NAME_SEPARATOR = "For"; + private final Map>> executors = new HashMap<>(); @@ -51,38 +57,74 @@ public class EventQueue implements EventPublisher, AutoCloseable { public > void addHandler( EVENT_TYPE event, EventHandler handler) { - - this.addHandler(event, new SingleThreadExecutor<>( - event.getName()), handler); - } - - public > void addHandler( - EVENT_TYPE event, - EventExecutor executor, - EventHandler handler) { - - executors.putIfAbsent(event, new HashMap<>()); - executors.get(event).putIfAbsent(executor, new ArrayList<>()); - - executors.get(event) - .get(executor) - .add(handler); + this.addHandler(event, handler, generateHandlerName(handler)); } /** - * Creates one executor with multiple event handlers. + * Add new handler to the event queue. + *

+ * By default a separated single thread executor will be dedicated to + * deliver the events to the registered event handler. + * + * @param event Triggering event. + * @param handler Handler of event (will be called from a separated + * thread) + * @param handlerName The name of handler (should be unique together with + * the event name) + * @param The type of the event payload. + * @param The type of the event identifier. */ - public void addHandlerGroup(String name, HandlerForEvent... - eventsAndHandlers) { - SingleThreadExecutor sharedExecutor = - new SingleThreadExecutor(name); - for (HandlerForEvent handlerForEvent : eventsAndHandlers) { - addHandler(handlerForEvent.event, sharedExecutor, - handlerForEvent.handler); - } + public > void addHandler( + EVENT_TYPE event, EventHandler handler, String handlerName) { + validateEvent(event); + Preconditions.checkNotNull(handler, "Handler name should not be null."); + String executorName = + StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR + + handlerName; + this.addHandler(event, new SingleThreadExecutor<>(executorName), handler); + } + + private > void validateEvent(EVENT_TYPE event) { + Preconditions + .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR), + "Event name should not contain " + EXECUTOR_NAME_SEPARATOR + + " string."); } + private String generateHandlerName(EventHandler handler) { + if (!"".equals(handler.getClass().getSimpleName())) { + return handler.getClass().getSimpleName(); + } else { + return handler.getClass().getName(); + } + } + + /** + * Add event handler with custom executor. + * + * @param event Triggering event. + * @param executor The executor imlementation to deliver events from a + * separated threads. Please keep in your mind that + * registering metrics is the responsibility of the + * caller. + * @param handler Handler of event (will be called from a separated + * thread) + * @param The type of the event payload. + * @param The type of the event identifier. + */ + public > void addHandler( + EVENT_TYPE event, EventExecutor executor, + EventHandler handler) { + validateEvent(event); + executors.putIfAbsent(event, new HashMap<>()); + executors.get(event).putIfAbsent(executor, new ArrayList<>()); + + executors.get(event).get(executor).add(handler); + } + + + /** * Route an event with payload to the right listener(s). * @@ -183,31 +225,5 @@ public class EventQueue implements EventPublisher, AutoCloseable { }); } - /** - * Event identifier together with the handler. - * - * @param - */ - public static class HandlerForEvent { - - private final Event event; - - private final EventHandler handler; - - public HandlerForEvent( - Event event, - EventHandler handler) { - this.event = event; - this.handler = handler; - } - - public Event getEvent() { - return event; - } - - public EventHandler getHandler() { - return handler; - } - } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java index a64e3d761dd..3253f2d5db2 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java @@ -23,13 +23,18 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; /** * Simple EventExecutor to call all the event handler one-by-one. * * @param */ +@Metrics(context = "EventQueue") public class SingleThreadExecutor implements EventExecutor { public static final String THREAD_NAME_PREFIX = "EventQueue"; @@ -41,14 +46,24 @@ public class SingleThreadExecutor implements EventExecutor { private final ThreadPoolExecutor executor; - private final AtomicLong queuedCount = new AtomicLong(0); + @Metric + private MutableCounterLong queued; - private final AtomicLong successfulCount = new AtomicLong(0); + @Metric + private MutableCounterLong done; - private final AtomicLong failedCount = new AtomicLong(0); + @Metric + private MutableCounterLong failed; + /** + * Create SingleThreadExecutor. + * + * @param name Unique name used in monitoring and metrics. + */ public SingleThreadExecutor(String name) { this.name = name; + DefaultMetricsSystem.instance() + .register("EventQueue" + name, "Event Executor metrics ", this); LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>(); executor = @@ -64,31 +79,31 @@ public class SingleThreadExecutor implements EventExecutor { @Override public void onMessage(EventHandler handler, T message, EventPublisher publisher) { - queuedCount.incrementAndGet(); + queued.incr(); executor.execute(() -> { try { handler.onMessage(message, publisher); - successfulCount.incrementAndGet(); + done.incr(); } catch (Exception ex) { LOG.error("Error on execution message {}", message, ex); - failedCount.incrementAndGet(); + failed.incr(); } }); } @Override public long failedEvents() { - return failedCount.get(); + return failed.value(); } @Override public long successfulEvents() { - return successfulCount.get(); + return done.value(); } @Override public long queuedEvents() { - return queuedCount.get(); + return queued.value(); } @Override diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java index 39444097fed..2bdf705cfee 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java @@ -25,6 +25,8 @@ import org.junit.Test; import java.util.Set; import java.util.stream.Collectors; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; + /** * Testing the basic functionality of the event queue. */ @@ -44,11 +46,13 @@ public class TestEventQueue { @Before public void startEventQueue() { + DefaultMetricsSystem.initialize(getClass().getSimpleName()); queue = new EventQueue(); } @After public void stopEventQueue() { + DefaultMetricsSystem.shutdown(); queue.close(); } @@ -79,35 +83,4 @@ public class TestEventQueue { } - @Test - public void handlerGroup() { - final long[] result = new long[2]; - queue.addHandlerGroup( - "group", - new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) -> - result[0] = payload), - new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) -> - result[1] = payload) - ); - - queue.fireEvent(EVENT3, 23L); - queue.fireEvent(EVENT4, 42L); - - queue.processAll(1000); - - Assert.assertEquals(23, result[0]); - Assert.assertEquals(42, result[1]); - - Set eventQueueThreadNames = - Thread.getAllStackTraces().keySet() - .stream() - .filter(t -> t.getName().startsWith(SingleThreadExecutor - .THREAD_NAME_PREFIX)) - .map(Thread::getName) - .collect(Collectors.toSet()); - System.out.println(eventQueueThreadNames); - Assert.assertEquals(1, eventQueueThreadNames.size()); - - } - } \ No newline at end of file