From 207c2265bf58f98469c3a49ad7c7df7f47cefa78 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Wed, 23 Sep 2020 23:37:11 +0200 Subject: [PATCH 1/2] ARTEMIS-2912 Server start exception before activation can cause a zombie broker --- .../activemq/artemis/cli/commands/Run.java | 6 +-- .../artemis/core/server/ActiveMQServer.java | 11 +++- .../core/server/impl/ActiveMQServerImpl.java | 50 +++++++++++++----- .../server/impl/ActiveMQServerImplTest.java | 51 +++++++++++++++++++ .../core/server/impl/EmbeddedServerTest.java | 8 +-- .../failover/ReplicatedFailoverTest.java | 2 +- 6 files changed, 105 insertions(+), 23 deletions(-) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java index a5b1487748..366f0c0f9d 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java @@ -88,7 +88,7 @@ public class Run extends LockAbstract { AtomicBoolean serverActivationFailed = new AtomicBoolean(false); server.getServer().registerActivationFailureListener(exception -> serverActivationFailed.set(true)); server.start(); - server.getServer().addExternalComponent(managementContext); + server.getServer().addExternalComponent(managementContext, false); if (broker.web != null) { broker.components.add(broker.web); @@ -98,8 +98,8 @@ public class Run extends LockAbstract { Class clazz = this.getClass().getClassLoader().loadClass(componentDTO.componentClassName); ExternalComponent component = (ExternalComponent) clazz.newInstance(); component.configure(componentDTO, getBrokerInstance(), getBrokerHome()); - server.getServer().addExternalComponent(component); - component.start(); + server.getServer().addExternalComponent(component, true); + assert component.isStarted(); } if (serverActivationFailed.get()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index c0ee6c5ff1..77ff502f55 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -833,7 +833,16 @@ public interface ActiveMQServer extends ServiceComponent { void setMBeanServer(MBeanServer mBeanServer); - void addExternalComponent(ActiveMQComponent externalComponent); + /** + * Adding external components is allowed only if the state + * isn't {@link SERVER_STATE#STOPPED} or {@link SERVER_STATE#STOPPING}.
+ * It atomically starts the {@code externalComponent} while being added if {@code start == true}.
+ * This atomicity is necessary to prevent {@link #stop()} to stop the component right after adding it, but before + * starting it. + * + * @throw IllegalStateException if the state is {@link SERVER_STATE#STOPPED} or {@link SERVER_STATE#STOPPING} + */ + void addExternalComponent(ActiveMQComponent externalComponent, boolean start) throws Exception; List getExternalComponents(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index c6145713b8..46ba027d5c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -837,9 +837,23 @@ public class ActiveMQServerImpl implements ActiveMQServer { this.mbeanServer = mbeanServer; } + private void validateAddExternalComponent(ActiveMQComponent externalComponent) { + final SERVER_STATE state = this.state; + if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) { + throw new IllegalStateException("cannot add " + externalComponent.getClass().getSimpleName() + + " if state is " + state); + } + } + @Override - public void addExternalComponent(ActiveMQComponent externalComponent) { - externalComponents.add(externalComponent); + public void addExternalComponent(ActiveMQComponent externalComponent, boolean start) throws Exception { + synchronized (externalComponents) { + validateAddExternalComponent(externalComponent); + externalComponents.add(externalComponent); + if (start) { + externalComponent.start(); + } + } } @Override @@ -1261,17 +1275,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { connectedClientIds.clear(); - for (ActiveMQComponent externalComponent : externalComponents) { - try { - if (externalComponent instanceof ServiceComponent) { - ((ServiceComponent)externalComponent).stop(isShutdown || criticalIOError); - } else { - externalComponent.stop(); - } - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorStoppingComponent(e, externalComponent.getClass().getName()); - } - } + stopExternalComponents(isShutdown || criticalIOError); try { this.analyzer.clear(); @@ -4065,7 +4069,25 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public List getExternalComponents() { - return externalComponents; + synchronized (externalComponents) { + return new ArrayList<>(externalComponents); + } + } + + private void stopExternalComponents(boolean shutdown) { + synchronized (externalComponents) { + for (ActiveMQComponent externalComponent : externalComponents) { + try { + if (externalComponent instanceof ServiceComponent) { + ((ServiceComponent) externalComponent).stop(shutdown); + } else { + externalComponent.stop(); + } + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorStoppingComponent(e, externalComponent.getClass().getName()); + } + } + } } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java index e443478ab0..db3d07d1f3 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java @@ -26,8 +26,59 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Assert; import org.junit.Test; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.collection.IsEmptyCollection.empty; + public class ActiveMQServerImplTest extends ActiveMQTestBase { + @Test + public void testAddingAndStartingExternalComponent() throws Exception { + ActiveMQServer server = createServer(false); + server.start(); + EmbeddedServerTest.FakeExternalComponent component = new EmbeddedServerTest.FakeExternalComponent(); + server.addExternalComponent(component, true); + Assert.assertTrue(component.isStarted()); + Assert.assertThat(server.getExternalComponents(), hasItem(component)); + } + + @Test + public void testAddingWithoutStartingExternalComponent() throws Exception { + ActiveMQServer server = createServer(false); + server.start(); + EmbeddedServerTest.FakeExternalComponent component = new EmbeddedServerTest.FakeExternalComponent(); + server.addExternalComponent(component, false); + Assert.assertFalse(component.isStarted()); + Assert.assertThat(server.getExternalComponents(), hasItem(component)); + } + + @Test + public void testCannotAddExternalComponentsIfNotStarting() throws Exception { + ActiveMQServer server = createServer(false); + EmbeddedServerTest.FakeExternalComponent component = new EmbeddedServerTest.FakeExternalComponent(); + try { + server.addExternalComponent(component, false); + Assert.fail(); + } catch (IllegalStateException ex) { + Assert.assertFalse(component.isStarted()); + Assert.assertThat(server.getExternalComponents(), empty()); + } + } + + @Test + public void testCannotAddExternalComponentsIfStopped() throws Exception { + ActiveMQServer server = createServer(false); + server.start(); + server.stop(); + EmbeddedServerTest.FakeExternalComponent component = new EmbeddedServerTest.FakeExternalComponent(); + try { + server.addExternalComponent(component, false); + Assert.fail(); + } catch (IllegalStateException ex) { + Assert.assertFalse(component.isStarted()); + Assert.assertThat(server.getExternalComponents(), empty()); + } + } + @Test public void testScheduledPoolGC() throws Exception { ActiveMQServer server = createServer(false); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/EmbeddedServerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/EmbeddedServerTest.java index 31b2626ce6..cf07385eed 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/EmbeddedServerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/EmbeddedServerTest.java @@ -78,8 +78,8 @@ public class EmbeddedServerTest { FakeExternalComponent normalComponent = new FakeExternalComponent(); FakeExternalServiceComponent serviceComponent = new FakeExternalServiceComponent(); - server.addExternalComponent(normalComponent); - server.addExternalComponent(serviceComponent); + server.addExternalComponent(normalComponent, false); + server.addExternalComponent(serviceComponent, false); server.stop(false); assertTrue(normalComponent.stopCalled); @@ -98,7 +98,7 @@ public class EmbeddedServerTest { assertTrue(serviceComponent.exitCalled); } - private class FakeExternalComponent implements ActiveMQComponent { + public static class FakeExternalComponent implements ActiveMQComponent { volatile boolean startCalled; volatile boolean stopCalled; @@ -124,7 +124,7 @@ public class EmbeddedServerTest { } } - private class FakeExternalServiceComponent extends FakeExternalComponent implements ServiceComponent { + public static class FakeExternalServiceComponent extends FakeExternalComponent implements ServiceComponent { volatile boolean exitCalled; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java index e246a8adc7..ab8b440843 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java @@ -176,7 +176,7 @@ public class ReplicatedFailoverTest extends FailoverTest { backupServer.getServer().getNetworkHealthCheck().parseURIList("http://localhost:8787"); Assert.assertTrue(backupServer.getServer().getNetworkHealthCheck().isStarted()); - backupServer.getServer().addExternalComponent(webServerComponent); + backupServer.getServer().addExternalComponent(webServerComponent, false); // this is called when backup servers go from live back to backup backupServer.getServer().fail(true); Assert.assertTrue(backupServer.getServer().getNetworkHealthCheck().isStarted()); From 69fa4f3e93675dbe6458de8f4c58cce56bea64c5 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Thu, 24 Sep 2020 01:36:01 +0200 Subject: [PATCH 2/2] ARTEMIS-2912 Handle NPE due to uninitialized members --- .../core/server/impl/ActiveMQServerImpl.java | 41 ++++++++++++++----- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 46ba027d5c..1c574e16d6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1089,17 +1089,24 @@ public class ActiveMQServerImpl implements ActiveMQServer { state = SERVER_STATE.STOPPING; if (criticalIOError) { - // notifications trigger disk IO so we don't want to send any on a critical IO error - managementService.enableNotifications(false); + final ManagementService managementService = this.managementService; + if (managementService != null) { + // notifications trigger disk IO so we don't want to send any on a critical IO error + managementService.enableNotifications(false); + } } + final FileStoreMonitor fileStoreMonitor = this.fileStoreMonitor; if (fileStoreMonitor != null) { fileStoreMonitor.stop(); - fileStoreMonitor = null; + this.fileStoreMonitor = null; } if (failoverOnServerShutdown) { - activation.sendLiveIsStopping(); + final Activation activation = this.activation; + if (activation != null) { + activation.sendLiveIsStopping(); + } } stopComponent(connectorsService); @@ -1113,6 +1120,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { stopComponent(federationManager); stopComponent(clusterManager); + final RemotingService remotingService = this.remotingService; if (remotingService != null) { remotingService.pauseAcceptors(); } @@ -1134,7 +1142,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { freezeConnections(); } - activation.postConnectionFreeze(); + final Activation activation = this.activation; + if (activation != null) { + activation.postConnectionFreeze(); + } closeAllServerSessions(criticalIOError); @@ -1147,6 +1158,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { // // ************************************************************************************************************* + final StorageManager storageManager = this.storageManager; if (storageManager != null) storageManager.clearContext(); @@ -1155,10 +1167,12 @@ public class ActiveMQServerImpl implements ActiveMQServer { stopComponent(backupManager); - try { - activation.preStorageClose(); - } catch (Throwable t) { - ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, activation.getClass().getName()); + if (activation != null) { + try { + activation.preStorageClose(); + } catch (Throwable t) { + ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, activation.getClass().getName()); + } } stopComponent(pagingManager); @@ -1172,6 +1186,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { // We stop remotingService before otherwise we may lock the system in case of a critical IO // error shutdown + final RemotingService remotingService = this.remotingService; if (remotingService != null) try { remotingService.stop(criticalIOError); @@ -1180,6 +1195,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } // Stop the management service after the remoting service to ensure all acceptors are deregistered with JMX + final ManagementService managementService = this.managementService; if (managementService != null) try { managementService.unregisterServer(); @@ -1232,7 +1248,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { messagingServerControl = null; memoryManager = null; backupManager = null; - storageManager = null; + this.storageManager = null; sessions.clear(); @@ -1330,7 +1346,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { * {@link #stop(boolean, boolean, boolean)}. */ private void freezeConnections() { - activation.freezeConnections(remotingService); + Activation activation = this.activation; + if (activation != null) { + activation.freezeConnections(remotingService); + } // after disconnecting all the clients close all the server sessions so any messages in delivery will be cancelled back to the queue for (ServerSession serverSession : sessions.values()) {