diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java new file mode 100644 index 00000000000..810c8b3437a --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server.events; + +/** + * Identifier of an async event. + * + * @param THe message payload type of this event. + */ +public interface Event { + + /** + * The type of the event payload. Payload contains all the required data + * to process the event. + * + */ + Class getPayloadType(); + + /** + * The human readable name of the event. + * + * Used for display in thread names + * and monitoring. + * + */ + String getName(); +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java new file mode 100644 index 00000000000..42578394162 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server.events; + +/** + * Executors defined the way how an EventHandler should be called. + *

+ * Executors are used only by the EventQueue and they do the thread separation + * between the caller and the EventHandler. + *

+ * Executors should guarantee that only one thread is executing one + * EventHandler at the same time. + * + * @param the payload type of the event. + */ +public interface EventExecutor extends AutoCloseable { + + /** + * Process an event payload. + * + * @param handler the handler to process the payload + * @param eventPayload to be processed. + * @param publisher to send response/other message forward to the chain. + */ + void onMessage(EventHandler handler, + PAYLOAD eventPayload, + EventPublisher + publisher); + + /** + * Return the number of the failed events. + */ + long failedEvents(); + + + /** + * Return the number of the processed events. + */ + long successfulEvents(); + + /** + * Return the number of the not-yet processed events. + */ + long queuedEvents(); + + /** + * The human readable name for the event executor. + *

+ * Used in monitoring and logging. + * + */ + String getName(); +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java new file mode 100644 index 00000000000..f40fc9e4f15 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server.events; + +/** + * Processor to react on an event. + * + * EventExecutors should guarantee that the implementations are called only + * from one thread. + * + * @param + */ +@FunctionalInterface +public interface EventHandler { + + void onMessage(PAYLOAD payload, EventPublisher publisher); + +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java new file mode 100644 index 00000000000..a47fb5721e2 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server.events; + +/** + * Client interface to send a new event. + */ +public interface EventPublisher { + + > void + fireEvent(EVENT_TYPE event, PAYLOAD payload); + +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java new file mode 100644 index 00000000000..44d85f5ffc0 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server.events; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Simple async event processing utility. + *

+ * Event queue handles a collection of event handlers and routes the incoming + * events to one (or more) event handler. + */ +public class EventQueue implements EventPublisher, AutoCloseable { + + private static final Logger LOG = + LoggerFactory.getLogger(EventQueue.class); + + private final Map>> executors = + new HashMap<>(); + + private final AtomicLong queuedCount = new AtomicLong(0); + + private final AtomicLong eventCount = new AtomicLong(0); + + public > void addHandler( + EVENT_TYPE event, EventHandler handler) { + + this.addHandler(event, new SingleThreadExecutor<>( + event.getName()), handler); + } + + public > void addHandler( + EVENT_TYPE event, + EventExecutor executor, + EventHandler handler) { + + executors.putIfAbsent(event, new HashMap<>()); + executors.get(event).putIfAbsent(executor, new ArrayList<>()); + + executors.get(event) + .get(executor) + .add(handler); + } + + /** + * Creates one executor with multiple event handlers. + */ + public void addHandlerGroup(String name, HandlerForEvent... + eventsAndHandlers) { + SingleThreadExecutor sharedExecutor = + new SingleThreadExecutor(name); + for (HandlerForEvent handlerForEvent : eventsAndHandlers) { + addHandler(handlerForEvent.event, sharedExecutor, + handlerForEvent.handler); + } + + } + + /** + * Route an event with payload to the right listener(s). + * + * @param event The event identifier + * @param payload The payload of the event. + * @throws IllegalArgumentException If there is no EventHandler for + * the specific event. + */ + public > void fireEvent( + EVENT_TYPE event, PAYLOAD payload) { + + Map> eventExecutorListMap = + this.executors.get(event); + + eventCount.incrementAndGet(); + if (eventExecutorListMap != null) { + + for (Map.Entry> executorAndHandlers : + eventExecutorListMap.entrySet()) { + + for (EventHandler handler : executorAndHandlers.getValue()) { + queuedCount.incrementAndGet(); + + executorAndHandlers.getKey() + .onMessage(handler, payload, this); + + } + } + + } else { + throw new IllegalArgumentException( + "No event handler registered for event " + event); + } + + } + + /** + * This is just for unit testing, don't use it for production code. + *

+ * It waits for all messages to be processed. If one event handler invokes an + * other one, the later one also should be finished. + *

+ * Long counter overflow is not handled, therefore it's safe only for unit + * testing. + *

+ * This method is just eventually consistent. In some cases it could return + * even if there are new messages in some of the handler. But in a simple + * case (one message) it will return only if the message is processed and + * all the dependent messages (messages which are sent by current handlers) + * are processed. + * + * @param timeout Timeout in seconds to wait for the processing. + */ + @VisibleForTesting + public void processAll(long timeout) { + long currentTime = Time.now(); + while (true) { + + long processed = 0; + + Stream allExecutor = this.executors.values().stream() + .flatMap(handlerMap -> handlerMap.keySet().stream()); + + boolean allIdle = + allExecutor.allMatch(executor -> executor.queuedEvents() == executor + .successfulEvents() + executor.failedEvents()); + + if (allIdle) { + return; + } + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + if (Time.now() > currentTime + timeout) { + throw new AssertionError( + "Messages are not processed in the given timeframe. Queued: " + + queuedCount.get() + " Processed: " + processed); + } + } + } + + public void close() { + + Set allExecutors = this.executors.values().stream() + .flatMap(handlerMap -> handlerMap.keySet().stream()) + .collect(Collectors.toSet()); + + allExecutors.forEach(executor -> { + try { + executor.close(); + } catch (Exception ex) { + LOG.error("Can't close the executor " + executor.getName(), ex); + } + }); + } + + /** + * Event identifier together with the handler. + * + * @param + */ + public static class HandlerForEvent { + + private final Event event; + + private final EventHandler handler; + + public HandlerForEvent( + Event event, + EventHandler handler) { + this.event = event; + this.handler = handler; + } + + public Event getEvent() { + return event; + } + + public EventHandler getHandler() { + return handler; + } + } + +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java new file mode 100644 index 00000000000..a64e3d761dd --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server.events; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Simple EventExecutor to call all the event handler one-by-one. + * + * @param + */ +public class SingleThreadExecutor implements EventExecutor { + + public static final String THREAD_NAME_PREFIX = "EventQueue"; + + private static final Logger LOG = + LoggerFactory.getLogger(SingleThreadExecutor.class); + + private final String name; + + private final ThreadPoolExecutor executor; + + private final AtomicLong queuedCount = new AtomicLong(0); + + private final AtomicLong successfulCount = new AtomicLong(0); + + private final AtomicLong failedCount = new AtomicLong(0); + + public SingleThreadExecutor(String name) { + this.name = name; + + LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>(); + executor = + new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue, + runnable -> { + Thread thread = new Thread(runnable); + thread.setName(THREAD_NAME_PREFIX + "-" + name); + return thread; + }); + + } + + @Override + public void onMessage(EventHandler handler, T message, EventPublisher + publisher) { + queuedCount.incrementAndGet(); + executor.execute(() -> { + try { + handler.onMessage(message, publisher); + successfulCount.incrementAndGet(); + } catch (Exception ex) { + LOG.error("Error on execution message {}", message, ex); + failedCount.incrementAndGet(); + } + }); + } + + @Override + public long failedEvents() { + return failedCount.get(); + } + + @Override + public long successfulEvents() { + return successfulCount.get(); + } + + @Override + public long queuedEvents() { + return queuedCount.get(); + } + + @Override + public void close() { + executor.shutdown(); + } + + @Override + public String getName() { + return name; + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java new file mode 100644 index 00000000000..c2159ad1557 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server.events; + +/** + * Basic event implementation to implement custom events. + * + * @param + */ +public class TypedEvent implements Event { + + private final Class payloadType; + + private final String name; + + public TypedEvent(Class payloadType, String name) { + this.payloadType = payloadType; + this.name = name; + } + + public TypedEvent(Class payloadType) { + this.payloadType = payloadType; + this.name = payloadType.getSimpleName(); + } + + @Override + public Class getPayloadType() { + return payloadType; + } + + @Override + public String getName() { + return name; + } + +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/package-info.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/package-info.java new file mode 100644 index 00000000000..89999ee6d8a --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.server.events; + +/** + * Simple event queue implementation for hdds/ozone components. + */ \ No newline at end of file diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java new file mode 100644 index 00000000000..39444097fed --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server.events; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Testing the basic functionality of the event queue. + */ +public class TestEventQueue { + + private static final Event EVENT1 = + new TypedEvent<>(Long.class, "SCM_EVENT1"); + private static final Event EVENT2 = + new TypedEvent<>(Long.class, "SCM_EVENT2"); + + private static final Event EVENT3 = + new TypedEvent<>(Long.class, "SCM_EVENT3"); + private static final Event EVENT4 = + new TypedEvent<>(Long.class, "SCM_EVENT4"); + + private EventQueue queue; + + @Before + public void startEventQueue() { + queue = new EventQueue(); + } + + @After + public void stopEventQueue() { + queue.close(); + } + + @Test + public void simpleEvent() { + + final long[] result = new long[2]; + + queue.addHandler(EVENT1, (payload, publisher) -> result[0] = payload); + + queue.fireEvent(EVENT1, 11L); + queue.processAll(1000); + Assert.assertEquals(11, result[0]); + + } + + @Test + public void multipleSubscriber() { + final long[] result = new long[2]; + queue.addHandler(EVENT2, (payload, publisher) -> result[0] = payload); + + queue.addHandler(EVENT2, (payload, publisher) -> result[1] = payload); + + queue.fireEvent(EVENT2, 23L); + queue.processAll(1000); + Assert.assertEquals(23, result[0]); + Assert.assertEquals(23, result[1]); + + } + + @Test + public void handlerGroup() { + final long[] result = new long[2]; + queue.addHandlerGroup( + "group", + new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) -> + result[0] = payload), + new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) -> + result[1] = payload) + ); + + queue.fireEvent(EVENT3, 23L); + queue.fireEvent(EVENT4, 42L); + + queue.processAll(1000); + + Assert.assertEquals(23, result[0]); + Assert.assertEquals(42, result[1]); + + Set eventQueueThreadNames = + Thread.getAllStackTraces().keySet() + .stream() + .filter(t -> t.getName().startsWith(SingleThreadExecutor + .THREAD_NAME_PREFIX)) + .map(Thread::getName) + .collect(Collectors.toSet()); + System.out.println(eventQueueThreadNames); + Assert.assertEquals(1, eventQueueThreadNames.size()); + + } + +} \ No newline at end of file diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueueChain.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueueChain.java new file mode 100644 index 00000000000..bb05ef453e6 --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueueChain.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server.events; + +import org.junit.Test; + +/** + * More realistic event test with sending event from one listener. + */ +public class TestEventQueueChain { + + private static final Event DECOMMISSION = + new TypedEvent<>(FailedNode.class); + + private static final Event DECOMMISSION_START = + new TypedEvent<>(FailedNode.class); + + @Test + public void simpleEvent() { + EventQueue queue = new EventQueue(); + + queue.addHandler(DECOMMISSION, new PipelineManager()); + queue.addHandler(DECOMMISSION_START, new NodeWatcher()); + + queue.fireEvent(DECOMMISSION, new FailedNode("node1")); + + queue.processAll(5000); + } + + + static class FailedNode { + private final String nodeId; + + FailedNode(String nodeId) { + this.nodeId = nodeId; + } + + String getNodeId() { + return nodeId; + } + } + + private static class PipelineManager implements EventHandler { + + @Override + public void onMessage(FailedNode message, EventPublisher publisher) { + + System.out.println( + "Closing pipelines for all pipelines including node: " + message + .getNodeId()); + + publisher.fireEvent(DECOMMISSION_START, message); + } + + } + + private static class NodeWatcher implements EventHandler { + + @Override + public void onMessage(FailedNode message, EventPublisher publisher) { + System.out.println("Clear timer"); + } + } +} \ No newline at end of file