mirror of https://github.com/apache/activemq.git
Fix test failure due to fixed ports used in the test.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1442619 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
88d85ae552
commit
5d9a6b67eb
|
@ -54,42 +54,46 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
public class FailoverManagedClusterTest extends TestCase {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(FailoverManagedClusterTest.class);
|
||||
|
||||
|
||||
long txGenerator = System.currentTimeMillis();
|
||||
|
||||
|
||||
private static final String MASTER_BIND_ADDRESS = "tcp://0.0.0.0:61616";
|
||||
private static final String SLAVE_BIND_ADDRESS = "tcp://0.0.0.0:61617";
|
||||
|
||||
private static final String BROKER_URL = "failover://(" + MASTER_BIND_ADDRESS + "," + SLAVE_BIND_ADDRESS + ")?randomize=false";
|
||||
|
||||
private String masterConnectionUri;
|
||||
private String slaveConnectionUri;
|
||||
|
||||
private String brokerUri;
|
||||
|
||||
private BrokerService master;
|
||||
private BrokerService slave;
|
||||
private CountDownLatch slaveThreadStarted = new CountDownLatch(1);
|
||||
private final CountDownLatch slaveThreadStarted = new CountDownLatch(1);
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
createAndStartMaster();
|
||||
createAndStartSlave();
|
||||
createAndStartSlave();
|
||||
|
||||
brokerUri = "failover://(" + masterConnectionUri + "," + slaveConnectionUri + ")?randomize=false";
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
if (slave != null) {
|
||||
slave.stop();
|
||||
}
|
||||
|
||||
if (master != null) {
|
||||
master.stop();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private void createAndStartMaster() throws Exception {
|
||||
master = new BrokerService();
|
||||
master.setDeleteAllMessagesOnStartup(true);
|
||||
master.setUseJmx(false);
|
||||
master.setBrokerName("BROKER");
|
||||
master.addConnector(MASTER_BIND_ADDRESS);
|
||||
masterConnectionUri = master.addConnector(MASTER_BIND_ADDRESS).getPublishableConnectString();
|
||||
master.start();
|
||||
master.waitUntilStarted();
|
||||
}
|
||||
|
@ -98,10 +102,11 @@ public class FailoverManagedClusterTest extends TestCase {
|
|||
slave = new BrokerService();
|
||||
slave.setUseJmx(false);
|
||||
slave.setBrokerName("BROKER");
|
||||
slave.addConnector(SLAVE_BIND_ADDRESS);
|
||||
slaveConnectionUri = slave.addConnector(SLAVE_BIND_ADDRESS).getPublishableConnectString();
|
||||
|
||||
// Start the slave asynchronously, since this will block
|
||||
new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
slaveThreadStarted.countDown();
|
||||
|
@ -116,18 +121,19 @@ public class FailoverManagedClusterTest extends TestCase {
|
|||
}
|
||||
|
||||
public void testFailover() throws Exception {
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
|
||||
adapter.setServerUrl(BROKER_URL);
|
||||
adapter.setServerUrl(brokerUri);
|
||||
adapter.start(new StubBootstrapContext());
|
||||
|
||||
final CountDownLatch messageDelivered = new CountDownLatch(1);
|
||||
|
||||
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
LOG.info("Received message " + message);
|
||||
super.onMessage(message);
|
||||
|
@ -142,11 +148,13 @@ public class FailoverManagedClusterTest extends TestCase {
|
|||
activationSpec.validate();
|
||||
|
||||
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
|
||||
@Override
|
||||
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
|
||||
endpoint.xaresource = resource;
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
|
||||
return true;
|
||||
}
|
||||
|
@ -163,7 +171,7 @@ public class FailoverManagedClusterTest extends TestCase {
|
|||
|
||||
MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
|
||||
slaveThreadStarted.await(10, TimeUnit.SECONDS);
|
||||
|
||||
|
||||
// force a failover before send
|
||||
LOG.info("Stopping master to force failover..");
|
||||
master.stop();
|
||||
|
@ -173,45 +181,54 @@ public class FailoverManagedClusterTest extends TestCase {
|
|||
producer.send(session.createTextMessage("Hello, again!"));
|
||||
|
||||
// Wait for the message to be delivered.
|
||||
assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
|
||||
assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
|
||||
private static final class StubBootstrapContext implements BootstrapContext {
|
||||
@Override
|
||||
public WorkManager getWorkManager() {
|
||||
return new WorkManager() {
|
||||
@Override
|
||||
public void doWork(Work work) throws WorkException {
|
||||
new Thread(work).start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
|
||||
new Thread(work).start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long startWork(Work work) throws WorkException {
|
||||
new Thread(work).start();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long startWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
|
||||
new Thread(work).start();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void scheduleWork(Work work) throws WorkException {
|
||||
new Thread(work).start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void scheduleWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
|
||||
new Thread(work).start();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public XATerminator getXATerminator() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Timer createTimer() throws UnavailableException {
|
||||
return null;
|
||||
}
|
||||
|
@ -222,6 +239,7 @@ public class FailoverManagedClusterTest extends TestCase {
|
|||
public XAResource xaresource;
|
||||
public Xid xid;
|
||||
|
||||
@Override
|
||||
public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
|
||||
try {
|
||||
if (xid == null) {
|
||||
|
@ -233,6 +251,7 @@ public class FailoverManagedClusterTest extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterDelivery() throws ResourceException {
|
||||
try {
|
||||
xaresource.end(xid, XAResource.TMSUCCESS);
|
||||
|
@ -244,15 +263,17 @@ public class FailoverManagedClusterTest extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void release() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
messageCount++;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
public Xid createXid() throws IOException {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
DataOutputStream os = new DataOutputStream(baos);
|
||||
|
@ -261,14 +282,17 @@ public class FailoverManagedClusterTest extends TestCase {
|
|||
final byte[] bs = baos.toByteArray();
|
||||
|
||||
return new Xid() {
|
||||
@Override
|
||||
public int getFormatId() {
|
||||
return 86;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getGlobalTransactionId() {
|
||||
return bs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getBranchQualifier() {
|
||||
return bs;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue