resolve https://issues.apache.org/activemq/browse/AMQ-2527 - add timeout to waitForSlave and make the following more reseliant to slow machines, VMTransportWaitForTest,MasterSlaveSlaveDieTest,SimpleNetworkTest,NetworkBrokerDetachTest,DuplexNetworkMBeanTest,MultiBrokersMultiClientsTest,AMQ2102Test - related to changes for https://issues.apache.org/activemq/browse/AMQ-1112

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@888974 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-12-09 21:09:21 +00:00
parent f1f4e2c9d9
commit e80bf00b5b
8 changed files with 56 additions and 22 deletions

View File

@ -116,6 +116,7 @@ public class BrokerService implements Service {
private boolean shutdownOnMasterFailure;
private boolean shutdownOnSlaveFailure;
private boolean waitForSlave;
private long waitForSlaveTimeout = 600000L;
private boolean passiveSlave;
private String brokerName = DEFAULT_BROKER_NAME;
private File dataDirectoryFile;
@ -1908,7 +1909,9 @@ public class BrokerService implements Service {
protected void waitForSlave() {
try {
slaveStartSignal.await();
if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds.");
}
} catch (InterruptedException e) {
LOG.error("Exception waiting for slave:" + e);
}
@ -2105,7 +2108,15 @@ public class BrokerService implements Service {
public void setWaitForSlave(boolean waitForSlave) {
this.waitForSlave = waitForSlave;
}
public long getWaitForSlaveTimeout() {
return this.waitForSlaveTimeout;
}
public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
this.waitForSlaveTimeout = waitForSlaveTimeout;
}
public CountDownLatch getSlaveStartSignal() {
return slaveStartSignal;
}
@ -2132,4 +2143,4 @@ public class BrokerService implements Service {
}
}
}

View File

@ -700,7 +700,6 @@ public class RegionBroker extends EmptyBroker {
try{
if(node!=null){
Message message=node.getMessage();
stampAsExpired(message);
if(message!=null && node.getRegionDestination()!=null){
DeadLetterStrategy deadLetterStrategy=node
.getRegionDestination().getDeadLetterStrategy();
@ -708,6 +707,7 @@ public class RegionBroker extends EmptyBroker {
if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
// message may be inflight to other subscriptions so do not modify
message = message.copy();
stampAsExpired(message);
message.setExpiration(0);
if(!message.isPersistent()){
message.setPersistent(true);

View File

@ -53,7 +53,10 @@ public class MasterSlaveSlaveDieTest extends TestCase {
final BrokerService master = new BrokerService();
master.setBrokerName("master");
master.setPersistent(false);
master.addConnector("tcp://localhost:0");
// The wireformat negotiation timeout (defaults to same as
// MaxInactivityDurationInitalDelay) needs to be a bit longer
// on slow running machines - set it to 90 seconds.
master.addConnector("tcp://localhost:0?wireFormat.maxInactivityDurationInitalDelay=90000");
master.setWaitForSlave(true);
master.setPlugins(new BrokerPlugin[] { new Plugin() });
@ -72,6 +75,7 @@ public class MasterSlaveSlaveDieTest extends TestCase {
try {
master.start();
} catch (Exception e) {
LOG.warn("Exception starting master: " + e);
e.printStackTrace();
}
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.network;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeNotNull;
import java.net.MalformedURLException;
import java.util.Set;
@ -26,13 +29,13 @@ import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class DuplexNetworkMBeanTest extends TestCase {
import org.junit.Test;
public class DuplexNetworkMBeanTest {
protected static final Log LOG = LogFactory.getLog(DuplexNetworkMBeanTest.class);
protected final int numRestarts = 3;
@ -54,6 +57,7 @@ public class DuplexNetworkMBeanTest extends TestCase {
return broker;
}
@Test
public void testMbeanPresenceOnNetworkBrokerRestart() throws Exception {
BrokerService broker = createBroker();
broker.start();
@ -78,6 +82,7 @@ public class DuplexNetworkMBeanTest extends TestCase {
broker.waitUntilStopped();
}
@Test
public void testMbeanPresenceOnBrokerRestart() throws Exception {
BrokerService networkedBroker = createNetworkedBroker();
@ -129,6 +134,14 @@ public class DuplexNetworkMBeanTest extends TestCase {
}
}
} while ((mbeans == null || mbeans.isEmpty()) && expiryTime > System.currentTimeMillis());
// If port 1099 is in use when the Broker starts, starting the jmx
// connector will fail. So, if we have no mbsc to query, skip the
// test.
if (timeout > 0) {
assumeNotNull(mbeans);
}
return count;
}

View File

@ -16,11 +16,13 @@
*/
package org.apache.activemq.network;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeNotNull;
import java.net.MalformedURLException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
@ -29,8 +31,6 @@ import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
@ -38,8 +38,9 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
public class NetworkBrokerDetachTest extends TestCase {
public class NetworkBrokerDetachTest {
private final static String BROKER_NAME = "broker";
private final static String REM_BROKER_NAME = "networkedBroker";
@ -65,6 +66,7 @@ public class NetworkBrokerDetachTest extends TestCase {
return broker;
}
@Test
public void testNetworkedBrokerDetach() throws Exception {
BrokerService broker = createBroker();
broker.start();
@ -162,6 +164,10 @@ public class NetworkBrokerDetachTest extends TestCase {
} catch (Exception ignored) {
LOG.warn("getMBeanServer ex: " + ignored);
}
// If port 1099 is in use when the Broker starts, starting the jmx
// connector will fail. So, if we have no mbsc to query, skip the
// test.
assumeNotNull(mbsc);
return mbsc;
}

View File

@ -81,7 +81,7 @@ public class SimpleNetworkTest extends TestCase {
TopicRequestor requestor = new TopicRequestor((TopicSession)localSession, included);
// allow for consumer infos to perculate arround
Thread.sleep(2000);
Thread.sleep(5000);
for (int i = 0; i < MESSAGE_COUNT; i++) {
TextMessage msg = localSession.createTextMessage("test msg: " + i);
TextMessage result = (TextMessage)requestor.request(msg);
@ -110,16 +110,16 @@ public class SimpleNetworkTest extends TestCase {
MessageConsumer consumer2 = remoteSession.createConsumer(included);
MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Thread.sleep(1000);
Thread.sleep(2000);
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
assertNotNull(consumer1.receive(500));
assertNotNull(consumer2.receive(500));
assertNotNull(consumer1.receive(1000));
assertNotNull(consumer2.receive(1000));
}
// ensure no more messages received
assertNull(consumer1.receive(500));
assertNull(consumer2.receive(500));
assertNull(consumer1.receive(1000));
assertNull(consumer2.receive(1000));
}
public void testDurableStoreAndForward() throws Exception {

View File

@ -57,7 +57,7 @@ public class VMTransportWaitForTest extends TestCase {
} catch (Exception e) {
e.printStackTrace();
fail("unexpected exception:" + e);
fail("unexpected exception: " + e);
}
}
};
@ -70,7 +70,7 @@ public class VMTransportWaitForTest extends TestCase {
broker.setPersistent(false);
broker.addConnector("tcp://localhost:61616");
broker.start();
assertTrue("has got connection", gotConnection.await(200, TimeUnit.MILLISECONDS));
assertTrue("has got connection", gotConnection.await(400, TimeUnit.MILLISECONDS));
broker.stop();
}
}

View File

@ -66,7 +66,7 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
// wait for consumers to get propagated
for (int i = 1; i <= BROKER_COUNT; i++) {
// all consumers on the remote brokers look like 1 consumer to the local broker.
assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 30000);
assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 65000);
}
// Send messages
@ -117,7 +117,7 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
// wait for consumers to get propagated
for (int i = 1; i <= BROKER_COUNT; i++) {
// all consumers on the remote brokers look like 1 consumer to the local broker.
assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 30000);
assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 65000);
}
// Send messages