mirror of https://github.com/apache/activemq.git
resolve AMQ-1977, some more choreography required in duplex case. Fix duplication of deleteAllMessageOnStartup which slows down JDBC test, add some more robustness to DuplexNetworkMBeanTest
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@704846 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6cd0ca9bd9
commit
543bc84a88
|
@ -1606,13 +1606,6 @@ public class BrokerService implements Service {
|
|||
* @throws Exception
|
||||
*/
|
||||
protected Broker createRegionBroker() throws Exception {
|
||||
// we must start the persistence adaptor before we can create the region
|
||||
// broker
|
||||
if (this.deleteAllMessagesOnStartup) {
|
||||
getPersistenceAdapter().deleteAllMessages();
|
||||
}
|
||||
// getPersistenceAdapter().start();
|
||||
|
||||
if (destinationInterceptors == null) {
|
||||
destinationInterceptors = createDefaultDestinationInterceptor();
|
||||
}
|
||||
|
|
|
@ -439,7 +439,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
if (!checkFoundStart && firstAckedMsg != null)
|
||||
throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+firstAckedMsg+" in dispatched-list (start of ack)");
|
||||
if (!checkFoundEnd && lastAckedMsg != null)
|
||||
throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+firstAckedMsg+" in dispatched-list (end of ack)");
|
||||
throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+lastAckedMsg+" in dispatched-list (end of ack)");
|
||||
if (ack.getMessageCount() != checkCount) {
|
||||
throw new JMSException("Unmatched acknowledege: Expected message count ("+ack.getMessageCount()+
|
||||
") differs from count in dispatched-list ("+checkCount+")");
|
||||
|
|
|
@ -60,6 +60,7 @@ public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
|
|||
ServiceSupport.dispose(this);
|
||||
}
|
||||
}
|
||||
LOG.debug("counting down remoteBrokerNameKnownLatch with: " + command);
|
||||
remoteBrokerNameKnownLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -214,6 +214,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
|||
|
||||
localBroker.start();
|
||||
remoteBroker.start();
|
||||
if (configuration.isDuplex() && duplexInitiatingConnection == null) {
|
||||
// initiator side of duplex network
|
||||
remoteBrokerNameKnownLatch.await();
|
||||
}
|
||||
try {
|
||||
triggerRemoteStartBridge();
|
||||
} catch (IOException e) {
|
||||
|
@ -229,10 +233,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
|||
protected void triggerLocalStartBridge() throws IOException {
|
||||
ASYNC_TASKS.execute(new Runnable() {
|
||||
public void run() {
|
||||
final String originalName = Thread.currentThread().getName();
|
||||
Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
|
||||
try {
|
||||
startLocalBridge();
|
||||
} catch (Exception e) {
|
||||
serviceLocalException(e);
|
||||
} finally {
|
||||
Thread.currentThread().setName(originalName);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -241,10 +249,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
|||
protected void triggerRemoteStartBridge() throws IOException {
|
||||
ASYNC_TASKS.execute(new Runnable() {
|
||||
public void run() {
|
||||
final String originalName = Thread.currentThread().getName();
|
||||
Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker);
|
||||
try {
|
||||
startRemoteBridge();
|
||||
} catch (Exception e) {
|
||||
serviceRemoteException(e);
|
||||
} finally {
|
||||
Thread.currentThread().setName(originalName);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -253,7 +265,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
|||
protected void startLocalBridge() throws Exception {
|
||||
if (localBridgeStarted.compareAndSet(false, true)) {
|
||||
synchronized (this) {
|
||||
|
||||
LOG.debug("starting local Bridge, localBroker=" + localBroker);
|
||||
remoteBrokerNameKnownLatch.await();
|
||||
|
||||
localConnectionInfo = new ConnectionInfo();
|
||||
|
@ -278,6 +290,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
|||
|
||||
protected void startRemoteBridge() throws Exception {
|
||||
if (remoteBridgeStarted.compareAndSet(false, true)) {
|
||||
LOG.debug("starting remote Bridge, localBroker=" + localBroker);
|
||||
synchronized (this) {
|
||||
if (!isCreatedByDuplex()) {
|
||||
BrokerInfo brokerInfo = new BrokerInfo();
|
||||
|
@ -1025,7 +1038,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
|||
static {
|
||||
ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
|
||||
public Thread newThread(Runnable runnable) {
|
||||
Thread thread = new Thread(runnable, "NetworkBridge: "+runnable);
|
||||
Thread thread = new Thread(runnable, "NetworkBridge");
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
public class DuplexNetworkMBeanTest extends TestCase {
|
||||
|
||||
protected static final Log LOG = LogFactory.getLog(DuplexNetworkMBeanTest.class);
|
||||
protected final int numRestarts = 5;
|
||||
protected final int numRestarts = 3;
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = new BrokerService();
|
||||
|
@ -93,6 +93,7 @@ public class DuplexNetworkMBeanTest extends TestCase {
|
|||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
assertEquals(0, countMbeans(broker, "stopped"));
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
//assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));
|
||||
|
|
|
@ -105,7 +105,7 @@ public class SimpleNetworkTest extends TestCase {
|
|||
assertNotNull(includedConsumer.receive(1000));
|
||||
}
|
||||
|
||||
public void xtestConduitBridge() throws Exception {
|
||||
public void testConduitBridge() throws Exception {
|
||||
MessageConsumer consumer1 = remoteSession.createConsumer(included);
|
||||
MessageConsumer consumer2 = remoteSession.createConsumer(included);
|
||||
MessageProducer producer = localSession.createProducer(included);
|
||||
|
@ -122,7 +122,7 @@ public class SimpleNetworkTest extends TestCase {
|
|||
assertNull(consumer2.receive(500));
|
||||
}
|
||||
|
||||
public void xtestDurableStoreAndForward() throws Exception {
|
||||
public void testDurableStoreAndForward() throws Exception {
|
||||
// create a remote durable consumer
|
||||
MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName);
|
||||
Thread.sleep(1000);
|
||||
|
|
Loading…
Reference in New Issue