From 00592d6dd8305cbd2d6ee4fdfb0870495e46f9c9 Mon Sep 17 00:00:00 2001 From: Somdatta Date: Fri, 22 Oct 2021 03:07:25 +0530 Subject: [PATCH] ARTEMIS-3085 support custom IOCriticalErrorListener --- .../artemis/core/server/ActiveMQServer.java | 10 ++++- .../core/server/impl/ActiveMQServerImpl.java | 11 ++++- .../server/IOCriticalErrorListenerTest.java | 45 +++++++++++++++++++ 3 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/IOCriticalErrorListenerTest.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 57a02aa305..2f690afa8d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.FederationConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.persistence.OperationContext; @@ -68,8 +69,8 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugi import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin; -import org.apache.activemq.artemis.core.server.routing.ConnectionRouterManager; import org.apache.activemq.artemis.core.server.reload.ReloadManager; +import org.apache.activemq.artemis.core.server.routing.ConnectionRouterManager; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.ResourceManager; @@ -198,6 +199,13 @@ public interface ActiveMQServer extends ServiceComponent { */ void registerActivationFailureListener(ActivationFailureListener listener); + /** + * Register a listener to detect I/O Critical errors + * + * @param listener @see org.apache.activemq.artemis.core.io.IOCriticalErrorListener + */ + void registerIOCriticalErrorListener(IOCriticalErrorListener listener); + void replay(Date start, Date end, String address, String target, String filter) throws Exception; /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index d9a1f7bc21..8c366e3d0a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import java.lang.invoke.MethodHandles; import java.lang.management.ManagementFactory; import java.net.URL; import java.security.AccessController; @@ -172,10 +173,10 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin; import org.apache.activemq.artemis.core.server.reload.ReloadCallback; -import org.apache.activemq.artemis.core.server.routing.ConnectionRouterManager; import org.apache.activemq.artemis.core.server.reload.ReloadManager; import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl; import org.apache.activemq.artemis.core.server.replay.ReplayManager; +import org.apache.activemq.artemis.core.server.routing.ConnectionRouterManager; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; @@ -211,7 +212,6 @@ import org.apache.activemq.artemis.utils.critical.CriticalComponent; import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf; @@ -343,6 +343,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { private final Set activationFailureListeners = new ConcurrentHashSet<>(); + private final Set ioCriticalErrorListeners = new ConcurrentHashSet<>(); + private final Set postQueueCreationCallbacks = new ConcurrentHashSet<>(); private final Set postQueueDeletionCallbacks = new ConcurrentHashSet<>(); @@ -2517,6 +2519,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } + @Override + public void registerIOCriticalErrorListener(final IOCriticalErrorListener listener) { + ioCriticalErrorListeners.add(listener); + } + @Override public void registerPostQueueCreationCallback(final PostQueueCreationCallback callback) { postQueueCreationCallbacks.add(callback); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/IOCriticalErrorListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/IOCriticalErrorListenerTest.java new file mode 100644 index 0000000000..94f74e4339 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/IOCriticalErrorListenerTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.server; + +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Test; + +public class IOCriticalErrorListenerTest extends ActiveMQTestBase { + protected ActiveMQServer server; + + @Test + public void simpleTest() throws Exception { + ServerSocket s = new ServerSocket(); + try { + s.bind(new InetSocketAddress("127.0.0.1", 61616)); + server = createServer(false, createDefaultNettyConfig()); + final CountDownLatch latch = new CountDownLatch(1); + server.registerIOCriticalErrorListener((code, message, file) -> latch.countDown()); + server.start(); + assertTrue(latch.await(3000, TimeUnit.MILLISECONDS)); + } finally { + s.close(); + } + } +}