make test more deterministic, ensure bridge starts first time b/c remote broker is started first, may need to use a queue if there are further inconsistency problems

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1378540 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-08-29 13:29:20 +00:00
parent 6175dc4639
commit 935265529f
1 changed files with 14 additions and 11 deletions

View File

@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@ -42,6 +43,7 @@ import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
@ -59,6 +61,7 @@ import org.springframework.core.io.Resource;
public class CompressionOverNetworkTest {
protected static final int RECEIVE_TIMEOUT_MILLS = 10000;
protected static final int MESSAGE_COUNT = 10;
private static final Logger LOG = LoggerFactory.getLogger(CompressionOverNetworkTest.class);
@ -69,7 +72,7 @@ public class CompressionOverNetworkTest {
protected BrokerService remoteBroker;
protected Session localSession;
protected Session remoteSession;
protected ActiveMQTopic included;
protected ActiveMQDestination included;
@Test
public void testCompressedOverCompressedNetwork() throws Exception {
@ -90,7 +93,7 @@ public class CompressionOverNetworkTest {
Message test = localSession.createTextMessage(payload.toString());
producer.send(test);
Message msg = consumer1.receive(1000);
Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS);
assertNotNull(msg);
ActiveMQTextMessage message = (ActiveMQTextMessage) msg;
assertTrue(message.isCompressed());
@ -113,7 +116,7 @@ public class CompressionOverNetworkTest {
Message test = localSession.createTextMessage(payload.toString());
producer.send(test);
Message msg = consumer1.receive(1000);
Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS);
assertNotNull(msg);
ActiveMQTextMessage message = (ActiveMQTextMessage) msg;
assertTrue(message.isCompressed());
@ -139,7 +142,7 @@ public class CompressionOverNetworkTest {
BytesMessage test = localSession.createBytesMessage();
test.writeBytes(bytes);
producer.send(test);
Message msg = consumer1.receive(1000*6000);
Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS);
assertNotNull(msg);
ActiveMQBytesMessage message = (ActiveMQBytesMessage) msg;
assertTrue(message.isCompressed());
@ -170,7 +173,7 @@ public class CompressionOverNetworkTest {
}
producer.send(test);
Message msg = consumer1.receive(1000);
Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS);
assertNotNull(msg);
ActiveMQStreamMessage message = (ActiveMQStreamMessage) msg;
assertTrue(message.isCompressed());
@ -196,7 +199,7 @@ public class CompressionOverNetworkTest {
}
producer.send(test);
Message msg = consumer1.receive(1000);
Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS);
assertNotNull(msg);
ActiveMQMapMessage message = (ActiveMQMapMessage) msg;
assertTrue(message.isCompressed());
@ -222,7 +225,7 @@ public class CompressionOverNetworkTest {
Message test = localSession.createObjectMessage(payload.toString());
producer.send(test);
Message msg = consumer1.receive(1000);
Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS);
assertNotNull(msg);
ActiveMQObjectMessage message = (ActiveMQObjectMessage) msg;
assertTrue(message.isCompressed());
@ -271,14 +274,14 @@ public class CompressionOverNetworkTest {
}
protected void doSetUp(boolean deleteAllMessages) throws Exception {
remoteBroker = createRemoteBroker();
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.start();
remoteBroker.waitUntilStarted();
localBroker = createLocalBroker();
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.start();
localBroker.waitUntilStarted();
remoteBroker = createRemoteBroker();
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.start();
remoteBroker.waitUntilStarted();
URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
fac.setAlwaysSyncSend(true);