diff --git a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/BackupReplicationStartFailedMessage.java b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/BackupReplicationStartFailedMessage.java index 0fdfc31cfd..27b2a4604b 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/BackupReplicationStartFailedMessage.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/BackupReplicationStartFailedMessage.java @@ -111,4 +111,10 @@ public final class BackupReplicationStartFailedMessage extends PacketImpl result = 31 * result + (problem != null ? problem.hashCode() : 0); return result; } + + @Override + public String toString() + { + return getParentString() + ", problem=" + problem.name() + "]"; + } } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/RemotingService.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/RemotingService.java index 0eaf6e2976..81d29bded8 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/RemotingService.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/RemotingService.java @@ -58,6 +58,8 @@ public interface RemotingService void start() throws Exception; + void startAcceptors() throws Exception; + boolean isStarted(); /** diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java index 6baf3de67e..bdbebdf1c9 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java @@ -318,10 +318,10 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle } } - for (Acceptor a : acceptors.values()) - { - a.start(); - } + /** + * Don't start the acceptors here. Only start the acceptors at the every end of the start-up process to avoid + * race conditions. See {@link #startAcceptors()}. + */ // This thread checks connections that need to be closed, and also flushes confirmations failureCheckAndFlushThread = new FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL); @@ -331,6 +331,17 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle started = true; } + public synchronized void startAcceptors() throws Exception + { + if (isStarted()) + { + for (Acceptor a : acceptors.values()) + { + a.start(); + } + } + } + public synchronized void allowInvmSecurityOverride(ActiveMQPrincipal principal) { defaultInvmSecurityPrincipal = principal; diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java index 469ece2742..4d3f847c75 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java @@ -431,7 +431,6 @@ public class ActiveMQServerImpl implements ActiveMQServer } else { - state = SERVER_STATE.STARTED; ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), nodeManager.getNodeId(), identity != null ? identity : ""); } @@ -1789,16 +1788,17 @@ public class ActiveMQServerImpl implements ActiveMQServer { throw ActiveMQMessageBundle.BUNDLE.nodeIdNull(); } - activationLatch.countDown(); // We can only do this after everything is started otherwise we may get nasty races with expired messages postOffice.startExpiryScanner(); } - else - { - activationLatch.countDown(); - } + } + public void completeActivation() throws Exception + { + setState(ActiveMQServerImpl.SERVER_STATE.STARTED); + getRemotingService().startAcceptors(); + activationLatch.countDown(); callActivationCompleteCallbacks(); } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/LiveOnlyActivation.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/LiveOnlyActivation.java index 2265b47fed..9540a09a4e 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/LiveOnlyActivation.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/LiveOnlyActivation.java @@ -62,6 +62,8 @@ public class LiveOnlyActivation extends Activation activeMQServer.initialisePart2(false); + activeMQServer.completeActivation(); + if (activeMQServer.getIdentity() != null) { ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity()); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedNothingBackupActivation.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedNothingBackupActivation.java index 1f77e8900b..a8e9aef78a 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedNothingBackupActivation.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedNothingBackupActivation.java @@ -289,6 +289,8 @@ public final class SharedNothingBackupActivation extends Activation } } + + activeMQServer.completeActivation(); } } catch (Exception e) diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedNothingLiveActivation.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedNothingLiveActivation.java index 68f6396eb4..abfffd6b2e 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedNothingLiveActivation.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedNothingLiveActivation.java @@ -103,6 +103,8 @@ public class SharedNothingLiveActivation extends LiveActivation activeMQServer.initialisePart2(false); + activeMQServer.completeActivation(); + if (activeMQServer.getIdentity() != null) { ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity()); @@ -141,6 +143,7 @@ public class SharedNothingLiveActivation extends LiveActivation } catch (ActiveMQException e) { + ActiveMQServerLogger.LOGGER.debug("Failed to process backup registration packet", e); channel.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.EXCEPTION)); } } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedStoreBackupActivation.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedStoreBackupActivation.java index 726cd06d5e..459e725801 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedStoreBackupActivation.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedStoreBackupActivation.java @@ -87,6 +87,8 @@ public final class SharedStoreBackupActivation extends Activation activeMQServer.initialisePart2(scalingDown); + activeMQServer.completeActivation(); + if (scalingDown) { ActiveMQServerLogger.LOGGER.backupServerScaledDown(); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedStoreLiveActivation.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedStoreLiveActivation.java index e2b27ad8ec..f1ffc7e2b1 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedStoreLiveActivation.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/SharedStoreLiveActivation.java @@ -76,6 +76,8 @@ public final class SharedStoreLiveActivation extends LiveActivation activeMQServer.initialisePart2(false); + activeMQServer.completeActivation(); + ActiveMQServerLogger.LOGGER.serverIsLive(); } catch (Exception e) diff --git a/activemq-server/src/test/java/org/apache/activemq/tests/util/ServiceTestBase.java b/activemq-server/src/test/java/org/apache/activemq/tests/util/ServiceTestBase.java index f6ffbc70cf..4e42b848c0 100644 --- a/activemq-server/src/test/java/org/apache/activemq/tests/util/ServiceTestBase.java +++ b/activemq-server/src/test/java/org/apache/activemq/tests/util/ServiceTestBase.java @@ -90,7 +90,7 @@ public abstract class ServiceTestBase extends UnitTestCase * debugging. */ private static final String SEND_CALL_NUMBER = "sendCallNumber"; - protected static final long WAIT_TIMEOUT = 10000; + protected static final long WAIT_TIMEOUT = 20000; private int sendMsgCount = 0; @Override diff --git a/tests/byteman-tests/src/test/java/org/apache/activemq/byteman/tests/ReplicationBackupTest.java b/tests/byteman-tests/src/test/java/org/apache/activemq/byteman/tests/ReplicationBackupTest.java new file mode 100644 index 0000000000..2a153743a5 --- /dev/null +++ b/tests/byteman-tests/src/test/java/org/apache/activemq/byteman/tests/ReplicationBackupTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.byteman.tests; + +import java.util.concurrent.CountDownLatch; + +import org.apache.activemq.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.api.core.TransportConfiguration; +import org.apache.activemq.core.config.Configuration; +import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.tests.util.ReplicatedBackupUtils; +import org.apache.activemq.tests.util.ServiceTestBase; +import org.apache.activemq.tests.util.TransportConfigurationUtils; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class ReplicationBackupTest extends ServiceTestBase +{ + private static final CountDownLatch ruleFired = new CountDownLatch(1); + private ActiveMQServer backupServer; + private ActiveMQServer liveServer; + + /* + * simple test to induce a potential race condition where the server's acceptors are active, but the server's + * state != STARTED + */ + @Test + @BMRules + ( + rules = + { + @BMRule + ( + name = "prevent backup annoucement", + targetClass = "org.apache.activemq.core.server.impl.SharedNothingLiveActivation", + targetMethod = "run", + targetLocation = "AT EXIT", + action = "org.apache.activemq.byteman.tests.ReplicationBackupTest.breakIt();" + ) + } + ) + public void testReplicatedBackupAnnouncement() throws Exception + { + TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0); + TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0); + TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0); + TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0); + + final String suffix = "_backup"; + + Configuration backupConfig = createDefaultConfig() + .setBindingsDirectory(ActiveMQDefaultConfiguration.getDefaultBindingsDirectory() + suffix) + .setJournalDirectory(ActiveMQDefaultConfiguration.getDefaultJournalDir() + suffix) + .setPagingDirectory(ActiveMQDefaultConfiguration.getDefaultPagingDir() + suffix) + .setLargeMessagesDirectory(ActiveMQDefaultConfiguration.getDefaultLargeMessagesDir() + suffix); + + Configuration liveConfig = createDefaultConfig(); + + ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor); + + liveServer = createServer(liveConfig); + + // start the live server in a new thread so we can start the backup simultaneously to induce a potential race + Thread startThread = new Thread(new Runnable() + { + @Override + public void run() + { + try + { + liveServer.start(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + }); + startThread.start(); + + ruleFired.await(); + + backupServer = createServer(backupConfig); + backupServer.start(); + ServiceTestBase.waitForRemoteBackup(null, 3, true, backupServer); + } + + public static void breakIt() + { + ruleFired.countDown(); + try + { + /* before the fix this sleep would put the "live" server into a state where the acceptors were started + * but the server's state != STARTED which would cause the backup to fail to announce + */ + Thread.sleep(2000); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/topology/TopologyClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/topology/TopologyClusterTestBase.java index b18be9487a..69810f707d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/topology/TopologyClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/topology/TopologyClusterTestBase.java @@ -413,7 +413,7 @@ public abstract class TopologyClusterTestBase extends ClusterTestBase } Assert.assertFalse("cluster manager should stop", servers[4].getClusterManager().isStarted()); final String address = "foo1235"; - ServerLocator locator = createHAServerLocator(); + ServerLocator locator = createNonHALocator(isNetty()); ClientSessionFactory sf = createSessionFactory(locator); ClientSession session = sf.createSession(config.getClusterUser(), CLUSTER_PASSWORD, false, true, true, false, 1); session.createQueue(address, address, true); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/cluster/LargeMessageOverBridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/cluster/LargeMessageOverBridgeTest.java index f926e9d47e..50ab085407 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/cluster/LargeMessageOverBridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/cluster/LargeMessageOverBridgeTest.java @@ -161,7 +161,7 @@ public class LargeMessageOverBridgeTest extends JMSClusteredTestBase for (int i = 0; i < 5; i++) { - BytesMessage msg2 = (BytesMessage) cons2.receive(5000); + BytesMessage msg2 = (BytesMessage) cons2.receive(10000); assertNotNull(msg2); msg2.acknowledge();