diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 68aeff24a3..33112d3e2d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -116,6 +116,7 @@ public class BrokerService implements Service { private boolean shutdownOnMasterFailure; private boolean shutdownOnSlaveFailure; private boolean waitForSlave; + private long waitForSlaveTimeout = 600000L; private boolean passiveSlave; private String brokerName = DEFAULT_BROKER_NAME; private File dataDirectoryFile; @@ -1908,7 +1909,9 @@ public class BrokerService implements Service { protected void waitForSlave() { try { - slaveStartSignal.await(); + if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) { + throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds."); + } } catch (InterruptedException e) { LOG.error("Exception waiting for slave:" + e); } @@ -2105,7 +2108,15 @@ public class BrokerService implements Service { public void setWaitForSlave(boolean waitForSlave) { this.waitForSlave = waitForSlave; } - + + public long getWaitForSlaveTimeout() { + return this.waitForSlaveTimeout; + } + + public void setWaitForSlaveTimeout(long waitForSlaveTimeout) { + this.waitForSlaveTimeout = waitForSlaveTimeout; + } + public CountDownLatch getSlaveStartSignal() { return slaveStartSignal; } @@ -2132,4 +2143,4 @@ public class BrokerService implements Service { } -} \ No newline at end of file +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 1eeabffef1..b376528c82 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -700,7 +700,6 @@ public class RegionBroker extends EmptyBroker { try{ if(node!=null){ Message message=node.getMessage(); - stampAsExpired(message); if(message!=null && node.getRegionDestination()!=null){ DeadLetterStrategy deadLetterStrategy=node .getRegionDestination().getDeadLetterStrategy(); @@ -708,6 +707,7 @@ public class RegionBroker extends EmptyBroker { if(deadLetterStrategy.isSendToDeadLetterQueue(message)){ // message may be inflight to other subscriptions so do not modify message = message.copy(); + stampAsExpired(message); message.setExpiration(0); if(!message.isPersistent()){ message.setPersistent(true); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveDieTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveDieTest.java index f7ee18820e..246d901119 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveDieTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveDieTest.java @@ -53,7 +53,10 @@ public class MasterSlaveSlaveDieTest extends TestCase { final BrokerService master = new BrokerService(); master.setBrokerName("master"); master.setPersistent(false); - master.addConnector("tcp://localhost:0"); + // The wireformat negotiation timeout (defaults to same as + // MaxInactivityDurationInitalDelay) needs to be a bit longer + // on slow running machines - set it to 90 seconds. + master.addConnector("tcp://localhost:0?wireFormat.maxInactivityDurationInitalDelay=90000"); master.setWaitForSlave(true); master.setPlugins(new BrokerPlugin[] { new Plugin() }); @@ -72,6 +75,7 @@ public class MasterSlaveSlaveDieTest extends TestCase { try { master.start(); } catch (Exception e) { + LOG.warn("Exception starting master: " + e); e.printStackTrace(); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java index 08b6b9a2fb..1777474b2b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.network; +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeNotNull; + import java.net.MalformedURLException; import java.util.Set; @@ -26,13 +29,13 @@ import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; -import junit.framework.TestCase; - import org.apache.activemq.broker.BrokerService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -public class DuplexNetworkMBeanTest extends TestCase { +import org.junit.Test; + +public class DuplexNetworkMBeanTest { protected static final Log LOG = LogFactory.getLog(DuplexNetworkMBeanTest.class); protected final int numRestarts = 3; @@ -54,6 +57,7 @@ public class DuplexNetworkMBeanTest extends TestCase { return broker; } + @Test public void testMbeanPresenceOnNetworkBrokerRestart() throws Exception { BrokerService broker = createBroker(); broker.start(); @@ -78,6 +82,7 @@ public class DuplexNetworkMBeanTest extends TestCase { broker.waitUntilStopped(); } + @Test public void testMbeanPresenceOnBrokerRestart() throws Exception { BrokerService networkedBroker = createNetworkedBroker(); @@ -129,6 +134,14 @@ public class DuplexNetworkMBeanTest extends TestCase { } } } while ((mbeans == null || mbeans.isEmpty()) && expiryTime > System.currentTimeMillis()); + + // If port 1099 is in use when the Broker starts, starting the jmx + // connector will fail. So, if we have no mbsc to query, skip the + // test. + if (timeout > 0) { + assumeNotNull(mbeans); + } + return count; } diff --git a/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java b/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java index 1ac5f8fb6b..a15b8fdd05 100644 --- a/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java @@ -16,11 +16,13 @@ */ package org.apache.activemq.network; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeNotNull; + import java.net.MalformedURLException; import javax.jms.Connection; import javax.jms.ConnectionFactory; -import javax.jms.MessageConsumer; import javax.jms.Session; import javax.management.InstanceNotFoundException; import javax.management.MBeanServerConnection; @@ -29,8 +31,6 @@ import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; -import junit.framework.TestCase; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; @@ -38,8 +38,9 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.junit.Test; -public class NetworkBrokerDetachTest extends TestCase { +public class NetworkBrokerDetachTest { private final static String BROKER_NAME = "broker"; private final static String REM_BROKER_NAME = "networkedBroker"; @@ -65,6 +66,7 @@ public class NetworkBrokerDetachTest extends TestCase { return broker; } + @Test public void testNetworkedBrokerDetach() throws Exception { BrokerService broker = createBroker(); broker.start(); @@ -162,6 +164,10 @@ public class NetworkBrokerDetachTest extends TestCase { } catch (Exception ignored) { LOG.warn("getMBeanServer ex: " + ignored); } + // If port 1099 is in use when the Broker starts, starting the jmx + // connector will fail. So, if we have no mbsc to query, skip the + // test. + assumeNotNull(mbsc); return mbsc; } diff --git a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java index 2d9882758e..14751ed3a3 100755 --- a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java @@ -81,7 +81,7 @@ public class SimpleNetworkTest extends TestCase { TopicRequestor requestor = new TopicRequestor((TopicSession)localSession, included); // allow for consumer infos to perculate arround - Thread.sleep(2000); + Thread.sleep(5000); for (int i = 0; i < MESSAGE_COUNT; i++) { TextMessage msg = localSession.createTextMessage("test msg: " + i); TextMessage result = (TextMessage)requestor.request(msg); @@ -110,16 +110,16 @@ public class SimpleNetworkTest extends TestCase { MessageConsumer consumer2 = remoteSession.createConsumer(included); MessageProducer producer = localSession.createProducer(included); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - Thread.sleep(1000); + Thread.sleep(2000); for (int i = 0; i < MESSAGE_COUNT; i++) { Message test = localSession.createTextMessage("test-" + i); producer.send(test); - assertNotNull(consumer1.receive(500)); - assertNotNull(consumer2.receive(500)); + assertNotNull(consumer1.receive(1000)); + assertNotNull(consumer2.receive(1000)); } // ensure no more messages received - assertNull(consumer1.receive(500)); - assertNull(consumer2.receive(500)); + assertNull(consumer1.receive(1000)); + assertNull(consumer2.receive(1000)); } public void testDurableStoreAndForward() throws Exception { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java index 6472bc5906..505e65c99c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java @@ -57,7 +57,7 @@ public class VMTransportWaitForTest extends TestCase { } catch (Exception e) { e.printStackTrace(); - fail("unexpected exception:" + e); + fail("unexpected exception: " + e); } } }; @@ -70,7 +70,7 @@ public class VMTransportWaitForTest extends TestCase { broker.setPersistent(false); broker.addConnector("tcp://localhost:61616"); broker.start(); - assertTrue("has got connection", gotConnection.await(200, TimeUnit.MILLISECONDS)); + assertTrue("has got connection", gotConnection.await(400, TimeUnit.MILLISECONDS)); broker.stop(); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java index 44dff0bbc6..74b7b3177a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java @@ -66,7 +66,7 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport // wait for consumers to get propagated for (int i = 1; i <= BROKER_COUNT; i++) { // all consumers on the remote brokers look like 1 consumer to the local broker. - assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 30000); + assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 65000); } // Send messages @@ -117,7 +117,7 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport // wait for consumers to get propagated for (int i = 1; i <= BROKER_COUNT; i++) { // all consumers on the remote brokers look like 1 consumer to the local broker. - assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 30000); + assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 65000); } // Send messages