HDDS-224. Create metrics for Event Watcher.

Contributed b Elek, Marton.
This commit is contained in:
Anu Engineer 2018-07-09 12:52:39 -07:00
parent 4a08ddfa68
commit cb5e225868
3 changed files with 96 additions and 92 deletions

View File

@ -18,7 +18,11 @@
package org.apache.hadoop.hdds.server.events; package org.apache.hadoop.hdds.server.events;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import com.google.common.base.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -42,6 +46,8 @@ public class EventQueue implements EventPublisher, AutoCloseable {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(EventQueue.class); LoggerFactory.getLogger(EventQueue.class);
private static final String EXECUTOR_NAME_SEPARATOR = "For";
private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors = private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors =
new HashMap<>(); new HashMap<>();
@ -51,38 +57,74 @@ public class EventQueue implements EventPublisher, AutoCloseable {
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler( public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
EVENT_TYPE event, EventHandler<PAYLOAD> handler) { EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
this.addHandler(event, handler, generateHandlerName(handler));
this.addHandler(event, new SingleThreadExecutor<>(
event.getName()), handler);
}
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
EVENT_TYPE event,
EventExecutor<PAYLOAD> executor,
EventHandler<PAYLOAD> handler) {
executors.putIfAbsent(event, new HashMap<>());
executors.get(event).putIfAbsent(executor, new ArrayList<>());
executors.get(event)
.get(executor)
.add(handler);
} }
/** /**
* Creates one executor with multiple event handlers. * Add new handler to the event queue.
* <p>
* By default a separated single thread executor will be dedicated to
* deliver the events to the registered event handler.
*
* @param event Triggering event.
* @param handler Handler of event (will be called from a separated
* thread)
* @param handlerName The name of handler (should be unique together with
* the event name)
* @param <PAYLOAD> The type of the event payload.
* @param <EVENT_TYPE> The type of the event identifier.
*/ */
public void addHandlerGroup(String name, HandlerForEvent<?>... public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
eventsAndHandlers) { EVENT_TYPE event, EventHandler<PAYLOAD> handler, String handlerName) {
SingleThreadExecutor sharedExecutor = validateEvent(event);
new SingleThreadExecutor(name); Preconditions.checkNotNull(handler, "Handler name should not be null.");
for (HandlerForEvent handlerForEvent : eventsAndHandlers) { String executorName =
addHandler(handlerForEvent.event, sharedExecutor, StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR
handlerForEvent.handler); + handlerName;
this.addHandler(event, new SingleThreadExecutor<>(executorName), handler);
} }
private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) {
Preconditions
.checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR),
"Event name should not contain " + EXECUTOR_NAME_SEPARATOR
+ " string.");
} }
private <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> handler) {
if (!"".equals(handler.getClass().getSimpleName())) {
return handler.getClass().getSimpleName();
} else {
return handler.getClass().getName();
}
}
/**
* Add event handler with custom executor.
*
* @param event Triggering event.
* @param executor The executor imlementation to deliver events from a
* separated threads. Please keep in your mind that
* registering metrics is the responsibility of the
* caller.
* @param handler Handler of event (will be called from a separated
* thread)
* @param <PAYLOAD> The type of the event payload.
* @param <EVENT_TYPE> The type of the event identifier.
*/
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
EVENT_TYPE event, EventExecutor<PAYLOAD> executor,
EventHandler<PAYLOAD> handler) {
validateEvent(event);
executors.putIfAbsent(event, new HashMap<>());
executors.get(event).putIfAbsent(executor, new ArrayList<>());
executors.get(event).get(executor).add(handler);
}
/** /**
* Route an event with payload to the right listener(s). * Route an event with payload to the right listener(s).
* *
@ -183,31 +225,5 @@ public class EventQueue implements EventPublisher, AutoCloseable {
}); });
} }
/**
* Event identifier together with the handler.
*
* @param <PAYLOAD>
*/
public static class HandlerForEvent<PAYLOAD> {
private final Event<PAYLOAD> event;
private final EventHandler<PAYLOAD> handler;
public HandlerForEvent(
Event<PAYLOAD> event,
EventHandler<PAYLOAD> handler) {
this.event = event;
this.handler = handler;
}
public Event<PAYLOAD> getEvent() {
return event;
}
public EventHandler<PAYLOAD> getHandler() {
return handler;
}
}
} }

View File

@ -23,13 +23,18 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
/** /**
* Simple EventExecutor to call all the event handler one-by-one. * Simple EventExecutor to call all the event handler one-by-one.
* *
* @param <T> * @param <T>
*/ */
@Metrics(context = "EventQueue")
public class SingleThreadExecutor<T> implements EventExecutor<T> { public class SingleThreadExecutor<T> implements EventExecutor<T> {
public static final String THREAD_NAME_PREFIX = "EventQueue"; public static final String THREAD_NAME_PREFIX = "EventQueue";
@ -41,14 +46,24 @@ public class SingleThreadExecutor<T> implements EventExecutor<T> {
private final ThreadPoolExecutor executor; private final ThreadPoolExecutor executor;
private final AtomicLong queuedCount = new AtomicLong(0); @Metric
private MutableCounterLong queued;
private final AtomicLong successfulCount = new AtomicLong(0); @Metric
private MutableCounterLong done;
private final AtomicLong failedCount = new AtomicLong(0); @Metric
private MutableCounterLong failed;
/**
* Create SingleThreadExecutor.
*
* @param name Unique name used in monitoring and metrics.
*/
public SingleThreadExecutor(String name) { public SingleThreadExecutor(String name) {
this.name = name; this.name = name;
DefaultMetricsSystem.instance()
.register("EventQueue" + name, "Event Executor metrics ", this);
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
executor = executor =
@ -64,31 +79,31 @@ public class SingleThreadExecutor<T> implements EventExecutor<T> {
@Override @Override
public void onMessage(EventHandler<T> handler, T message, EventPublisher public void onMessage(EventHandler<T> handler, T message, EventPublisher
publisher) { publisher) {
queuedCount.incrementAndGet(); queued.incr();
executor.execute(() -> { executor.execute(() -> {
try { try {
handler.onMessage(message, publisher); handler.onMessage(message, publisher);
successfulCount.incrementAndGet(); done.incr();
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Error on execution message {}", message, ex); LOG.error("Error on execution message {}", message, ex);
failedCount.incrementAndGet(); failed.incr();
} }
}); });
} }
@Override @Override
public long failedEvents() { public long failedEvents() {
return failedCount.get(); return failed.value();
} }
@Override @Override
public long successfulEvents() { public long successfulEvents() {
return successfulCount.get(); return done.value();
} }
@Override @Override
public long queuedEvents() { public long queuedEvents() {
return queuedCount.get(); return queued.value();
} }
@Override @Override

View File

@ -25,6 +25,8 @@ import org.junit.Test;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
/** /**
* Testing the basic functionality of the event queue. * Testing the basic functionality of the event queue.
*/ */
@ -44,11 +46,13 @@ public class TestEventQueue {
@Before @Before
public void startEventQueue() { public void startEventQueue() {
DefaultMetricsSystem.initialize(getClass().getSimpleName());
queue = new EventQueue(); queue = new EventQueue();
} }
@After @After
public void stopEventQueue() { public void stopEventQueue() {
DefaultMetricsSystem.shutdown();
queue.close(); queue.close();
} }
@ -79,35 +83,4 @@ public class TestEventQueue {
} }
@Test
public void handlerGroup() {
final long[] result = new long[2];
queue.addHandlerGroup(
"group",
new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) ->
result[0] = payload),
new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) ->
result[1] = payload)
);
queue.fireEvent(EVENT3, 23L);
queue.fireEvent(EVENT4, 42L);
queue.processAll(1000);
Assert.assertEquals(23, result[0]);
Assert.assertEquals(42, result[1]);
Set<String> eventQueueThreadNames =
Thread.getAllStackTraces().keySet()
.stream()
.filter(t -> t.getName().startsWith(SingleThreadExecutor
.THREAD_NAME_PREFIX))
.map(Thread::getName)
.collect(Collectors.toSet());
System.out.println(eventQueueThreadNames);
Assert.assertEquals(1, eventQueueThreadNames.size());
}
} }