diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 1126d22e5e..555f0ec72f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -19,7 +19,11 @@ package org.apache.activemq.network; import java.io.IOException; import java.security.GeneralSecurityException; import java.security.cert.X509Certificate; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -32,7 +36,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.management.ObjectName; - import org.apache.activemq.DestinationDoesNotExistException; import org.apache.activemq.Service; import org.apache.activemq.advisory.AdvisoryBroker; @@ -96,14 +99,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected ActiveMQDestination[] durableDestinations; protected final ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap(); protected final ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap(); - protected final BrokerId localBrokerPath[] = new BrokerId[] { null }; + protected final BrokerId localBrokerPath[] = new BrokerId[]{null}; protected final CountDownLatch startedLatch = new CountDownLatch(2); protected final CountDownLatch localStartedLatch = new CountDownLatch(1); protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); protected NetworkBridgeConfiguration configuration; protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory(); - protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null }; + protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null}; protected BrokerId remoteBrokerId; final AtomicLong enqueueCounter = new AtomicLong(); @@ -251,7 +254,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br serialExecutor.shutdown(); if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) { List pendingTasks = serialExecutor.shutdownNow(); - LOG.info("pending tasks on stop {}", pendingTasks); + LOG.info("pending tasks on stop {}", pendingTasks); } localBroker.oneway(new ShutdownInfo()); remoteBroker.oneway(new ShutdownInfo()); @@ -292,7 +295,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br public void run() { final String originalName = Thread.currentThread().getName(); Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " + - "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker); + "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker); try { // First we collect the info data from both the local and remote ends @@ -335,11 +338,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{ configuration.getBrokerName(), remoteBrokerName, remoteBrokerId }); - ServiceSupport.dispose(localBroker); - ServiceSupport.dispose(remoteBroker); - // the bridge is left in a bit of limbo, but it won't get retried - // in this state. - return; + ServiceSupport.dispose(localBroker); + ServiceSupport.dispose(remoteBroker); + // the bridge is left in a bit of limbo, but it won't get retried + // in this state. + return; } // Fill in the remote broker's information now. @@ -431,7 +434,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo(); duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" - + configuration.getBrokerName()); + + configuration.getBrokerName()); duplexLocalConnectionInfo.setUserName(configuration.getUserName()); duplexLocalConnectionInfo.setPassword(configuration.getPassword()); @@ -578,7 +581,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (command.isMessage()) { final ActiveMQMessage message = (ActiveMQMessage) command; if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) - || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) { + || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) { serviceRemoteConsumerAdvisory(message.getDataStructure()); ackAdvisory(message); } else { @@ -587,27 +590,35 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } // message being forwarded - we need to // propagate the response to our local send - message.setProducerId(duplexInboundLocalProducerInfo.getProducerId()); - if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { - duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() { - final int correlationId = message.getCommandId(); + if (canDuplexDispatch(message)) { + message.setProducerId(duplexInboundLocalProducerInfo.getProducerId()); + if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { + duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() { + final int correlationId = message.getCommandId(); - @Override - public void onCompletion(FutureResponse resp) { - try { - Response reply = resp.getResult(); - reply.setCorrelationId(correlationId); - remoteBroker.oneway(reply); - } catch (IOException error) { - LOG.error("Exception: {} on duplex forward of: {}", error, message); - serviceRemoteException(error); + @Override + public void onCompletion(FutureResponse resp) { + try { + Response reply = resp.getResult(); + reply.setCorrelationId(correlationId); + remoteBroker.oneway(reply); + } catch (IOException error) { + LOG.error("Exception: {} on duplex forward of: {}", error, message); + serviceRemoteException(error); + } } - } - }); + }); + } else { + duplexInboundLocalBroker.oneway(message); + } + serviceInboundMessage(message); } else { - duplexInboundLocalBroker.oneway(message); + if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { + Response reply = new Response(); + reply.setCorrelationId(message.getCommandId()); + remoteBroker.oneway(reply); + } } - serviceInboundMessage(message); } } else { switch (command.getDataStructureType()) { @@ -817,7 +828,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return; } - LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{ localBroker, remoteBroker, error }); + LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error}); LOG.debug("The local Exception was: {}", error, error); brokerService.getTaskRunnerFactory().execute(new Runnable() { @@ -844,7 +855,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty("cause", error.getLocalizedMessage()); advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null, - advisoryMessage); + advisoryMessage); } } catch (Exception e) { @@ -871,7 +882,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected void removeSubscription(final DemandSubscription sub) throws IOException { if (sub != null) { - LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{ configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId() }); + LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()}); // ensure not available for conduit subs pending removal subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); @@ -1049,7 +1060,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) { if (brokerPath == null || brokerPath.length == 0) { - return new BrokerId[] { idToAppend }; + return new BrokerId[]{idToAppend}; } BrokerId rc[] = new BrokerId[brokerPath.length + 1]; System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); @@ -1156,7 +1167,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br boolean suppress = false; if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic() - && !configuration.isSuppressDuplicateTopicSubscriptions()) { + && !configuration.isSuppressDuplicateTopicSubscriptions()) { return suppress; } @@ -1276,7 +1287,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br priority -= info.getBrokerPath().length + 1; } result.getLocalInfo().setPriority(priority); - LOG.debug("{} using priority: {} for subscription: {}", new Object[]{ configuration.getBrokerName(), priority, info }); + LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info}); } configureDemandSubscription(info, result); return result; @@ -1288,7 +1299,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br info.setDestination(destination); // Indicate that this subscription is being made on behalf of the remote broker. - info.setBrokerPath(new BrokerId[] { remoteBrokerId }); + info.setBrokerPath(new BrokerId[]{remoteBrokerId}); // the remote info held by the DemandSubscription holds the original // consumerId, the local info get's overwritten @@ -1352,8 +1363,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br /** * Performs a timed wait on the started latch and then checks for disposed * before performing another wait each time the the started wait times out. - * - * @throws InterruptedException */ protected void safeWaitUntilStarted() throws InterruptedException { while (!disposed.get()) { @@ -1403,7 +1412,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br /** * @param dynamicallyIncludedDestinations - * The dynamicallyIncludedDestinations to set. + * The dynamicallyIncludedDestinations to set. */ public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; @@ -1417,8 +1426,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } /** - * @param excludedDestinations - * The excludedDestinations to set. + * @param excludedDestinations The excludedDestinations to set. */ public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) { this.excludedDestinations = excludedDestinations; @@ -1432,8 +1440,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } /** - * @param staticallyIncludedDestinations - * The staticallyIncludedDestinations to set. + * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set. */ public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { this.staticallyIncludedDestinations = staticallyIncludedDestinations; @@ -1447,8 +1454,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } /** - * @param durableDestinations - * The durableDestinations to set. + * @param durableDestinations The durableDestinations to set. */ public void setDurableDestinations(ActiveMQDestination[] durableDestinations) { this.durableDestinations = durableDestinations; @@ -1476,8 +1482,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } /** - * @param createdByDuplex - * the createdByDuplex to set + * @param createdByDuplex the createdByDuplex to set */ public void setCreatedByDuplex(boolean createdByDuplex) { this.createdByDuplex = createdByDuplex; @@ -1500,7 +1505,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br @Override public String getRemoteBrokerId() { - return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() ==null)? null : remoteBrokerInfo.getBrokerId().toString(); + return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString(); } @Override @@ -1543,7 +1548,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return mbeanObjectName; } - public void resetStats(){ + public void resetStats() { enqueueCounter.set(0); dequeueCounter.set(0); } @@ -1624,18 +1629,43 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } - protected void serviceOutbound(Message message){ + protected void serviceOutbound(Message message) { NetworkBridgeListener l = this.networkBridgeListener; - if (l != null){ - l.onOutboundMessage(this,message); + if (l != null) { + l.onOutboundMessage(this, message); } } - protected void serviceInboundMessage(Message message){ + protected void serviceInboundMessage(Message message) { NetworkBridgeListener l = this.networkBridgeListener; - if (l != null){ - l.onInboundMessage(this,message); + if (l != null) { + l.onInboundMessage(this, message); } } + protected boolean canDuplexDispatch(Message message) { + boolean result = true; + if (configuration.isCheckDuplicateMessagesOnDuplex()){ + final long producerSequenceId = message.getMessageId().getProducerSequenceId(); + // messages are multiplexed on this producer so we need to query the persistenceAdapter + long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId()); + if (producerSequenceId <= lastStoredForMessageProducer) { + result = false; + LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{ + (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer + }); + } + } + return result; + } + + protected long getStoredSequenceIdForMessage(MessageId messageId) { + try { + return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId()); + } catch (IOException ignored) { + LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored); + } + return -1; + } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index bfff94e350..3a59f3031c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -61,6 +61,7 @@ public class NetworkBridgeConfiguration { private boolean useBrokerNamesAsIdSeed = true; private boolean gcDestinationViews = true; private long gcSweepTime = 60 * 1000; + private boolean checkDuplicateMessagesOnDuplex = false; /** * @return the conduitSubscriptions @@ -440,4 +441,12 @@ public class NetworkBridgeConfiguration { this.gcSweepTime = gcSweepTime; } + public boolean isCheckDuplicateMessagesOnDuplex() { + return checkDuplicateMessagesOnDuplex; + } + + public void setCheckDuplicateMessagesOnDuplex(boolean checkDuplicateMessagesOnDuplex) { + this.checkDuplicateMessagesOnDuplex = checkDuplicateMessagesOnDuplex; + } + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java new file mode 100644 index 0000000000..68681c6ed2 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import java.io.File; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.net.ServerSocketFactory; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.ManagementContext; +import org.apache.activemq.command.Response; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.nio.NIOTransport; +import org.apache.activemq.transport.nio.NIOTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.wireformat.WireFormat; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static org.junit.Assert.*; + +/** + * + * @author x22koe + */ +public class CheckDuplicateMessagesOnDuplexTest { + + private static final Logger log = LoggerFactory.getLogger(CheckDuplicateMessagesOnDuplexTest.class); + private BrokerService localBroker; + private BrokerService remoteBroker; + private ActiveMQConnectionFactory localFactory; + private ActiveMQConnectionFactory remoteFactory; + private Session localSession; + private MessageConsumer consumer; + private Session remoteSession; + private MessageProducer producer; + private Connection remoteConnection; + private Connection localConnection; + private DebugTransportFilter debugTransportFilter; + private boolean useLevelDB = false; + + public CheckDuplicateMessagesOnDuplexTest() { + } + + @BeforeClass + public static void setUpClass() { + } + + @AfterClass + public static void tearDownClass() { + } + + @Before + public void setUp() { + } + + @After + public void tearDown() { + } + + @Test + public void testConnectionLossBehaviorBeforeAckIsSent() throws Exception { + createBrokers(); + localBroker.deleteAllMessages(); + remoteBroker.deleteAllMessages(); + startBrokers(); + openConnections(); + + Thread.sleep(1000); + log.info("\n\n==============================================\nsend hello1\n"); + + // simulate network failure between REMOTE and LOCAL just before the reception response is sent back to REMOTE + debugTransportFilter.closeOnResponse = true; + + producer.send(remoteSession.createTextMessage("hello1")); + Message msg = consumer.receive(30000); + + assertNotNull("expected hello1", msg); + assertEquals("hello1", ((TextMessage) msg).getText()); + + Thread.sleep(1000); + log.info("\n\n------------------------------------------\nsend hello2\n"); + + producer.send(remoteSession.createTextMessage("hello2")); + msg = consumer.receive(30000); + + assertNotNull("expected hello2", msg); + assertEquals("hello2", ((TextMessage) msg).getText()); + + closeLocalConnection(); + + Thread.sleep(1000); + log.info("\n\n------------------------------------------\nsend hello3\n"); + + openLocalConnection(); + + Thread.sleep(1000); + + producer.send(remoteSession.createTextMessage("hello3")); + msg = consumer.receive(30000); + + assertNotNull("expected hello3", msg); + assertEquals("hello3", ((TextMessage) msg).getText()); + + Thread.sleep(1000); + log.info("\n\n==============================================\n\n"); + + closeConnections(); + stopBrokers(); + + // restart the local broker, which should be empty + + Thread.sleep(1000); + log.info("\n\n##############################################\n\n"); + + createLocalBroker(); + startLocalBroker(); + openLocalConnection(); + + // this should not return the "hello1" message + msg = consumer.receive(1000); + + closeLocalConnection(); + stopLocalBroker(); + + assertNull(msg); + } + + private void createBrokers() throws Exception { + createLocalBroker(); + createRemoteBroker(); + } + + private void createLocalBroker() throws Exception { + localBroker = new BrokerService(); + localBroker.setBrokerName("LOCAL"); + localBroker.setUseJmx(true); + localBroker.setSchedulePeriodForDestinationPurge(5000); + ManagementContext managementContext = new ManagementContext(); + managementContext.setCreateConnector(false); + localBroker.setManagementContext(managementContext); + PersistenceAdapter persistenceAdapter = persistanceAdapterFactory("target/local"); + localBroker.setPersistenceAdapter(persistenceAdapter); + List transportConnectors = new ArrayList(); + DebugTransportFactory tf = new DebugTransportFactory(); + TransportServer transport = tf.doBind(URI.create("nio://127.0.0.1:23539")); + TransportConnector transportConnector = new TransportConnector(transport); + transportConnector.setName("tc"); + transportConnector.setAuditNetworkProducers(true); + transportConnectors.add(transportConnector); + localBroker.setTransportConnectors(transportConnectors); + } + + private void createRemoteBroker() throws Exception { + remoteBroker = new BrokerService(); + remoteBroker.setBrokerName("REMOTE"); + remoteBroker.setUseJmx(true); + remoteBroker.setSchedulePeriodForDestinationPurge(5000); + ManagementContext managementContext = new ManagementContext(); + managementContext.setCreateConnector(false); + remoteBroker.setManagementContext(managementContext); + PersistenceAdapter persistenceAdapter = persistanceAdapterFactory("target/remote"); + remoteBroker.setPersistenceAdapter(persistenceAdapter); + List networkConnectors = new ArrayList(); + DiscoveryNetworkConnector networkConnector = new DiscoveryNetworkConnector(); + networkConnector.setName("to local"); + // set maxInactivityDuration to 0, otherwise the broker restarts while you are in the debugger + networkConnector.setUri(URI.create("static://(tcp://127.0.0.1:23539?wireFormat.maxInactivityDuration=0)")); + networkConnector.setDuplex(true); + //networkConnector.setNetworkTTL(5); + //networkConnector.setDynamicOnly(true); + networkConnector.setAlwaysSyncSend(true); + networkConnector.setDecreaseNetworkConsumerPriority(false); + networkConnector.setPrefetchSize(1); + networkConnector.setCheckDuplicateMessagesOnDuplex(true); + networkConnectors.add(networkConnector); + remoteBroker.setNetworkConnectors(networkConnectors); + } + + private void startBrokers() throws Exception { + startLocalBroker(); + startRemoteBroker(); + } + + private void startLocalBroker() throws Exception { + localBroker.start(); + localBroker.waitUntilStarted(); + } + + private void startRemoteBroker() throws Exception { + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + } + + private void openConnections() throws JMSException { + openLocalConnection(); + openRemoteConnection(); + } + + private void openLocalConnection() throws JMSException { + localFactory = new ActiveMQConnectionFactory(localBroker.getVmConnectorURI()); + //localFactory.setSendAcksAsync(false); + localConnection = localFactory.createConnection(); + localConnection.start(); + localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = localSession.createConsumer(localSession.createQueue("testqueue")); + } + + private void openRemoteConnection() throws JMSException { + remoteFactory = new ActiveMQConnectionFactory(remoteBroker.getVmConnectorURI()); + //remoteFactory.setSendAcksAsync(false); + remoteConnection = remoteFactory.createConnection(); + remoteConnection.start(); + remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = remoteSession.createProducer(remoteSession.createQueue("testqueue")); + } + + private void closeConnections() throws JMSException { + closeLocalConnection(); + closeRemoteConnection(); + } + + private void closeLocalConnection() throws JMSException { + localConnection.close(); + } + + private void closeRemoteConnection() throws JMSException { + remoteConnection.close(); + } + + private void stopBrokers() throws Exception { + stopRemoteBroker(); + stopLocalBroker(); + } + + private void stopLocalBroker() throws Exception { + localBroker.stop(); + localBroker.waitUntilStopped(); + } + + private void stopRemoteBroker() throws Exception { + remoteBroker.stop(); + remoteBroker.waitUntilStopped(); + } + + private PersistenceAdapter persistanceAdapterFactory(String path) { + if (useLevelDB) { + return persistanceAdapterFactory_LevelDB(path); + } else { + return persistanceAdapterFactory_KahaDB(path); + } + } + + private PersistenceAdapter persistanceAdapterFactory_KahaDB(String path) { + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter(); + kahaDBPersistenceAdapter.setDirectory(new File(path)); + kahaDBPersistenceAdapter.setIgnoreMissingJournalfiles(true); + kahaDBPersistenceAdapter.setCheckForCorruptJournalFiles(true); + kahaDBPersistenceAdapter.setChecksumJournalFiles(true); + return kahaDBPersistenceAdapter; + } + + private PersistenceAdapter persistanceAdapterFactory_LevelDB(String path) { + LevelDBPersistenceAdapter levelDBPersistenceAdapter = new LevelDBPersistenceAdapter(); + levelDBPersistenceAdapter.setDirectory(new File(path)); + return levelDBPersistenceAdapter; + } + + private class DebugTransportFactory extends NIOTransportFactory { + + @Override + protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) + throws IOException, URISyntaxException { + return new DebugTransportServer(this, location, serverSocketFactory); + } + } + + private class DebugTransportServer extends TcpTransportServer { + + public DebugTransportServer(TcpTransportFactory transportFactory, URI location, + ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + super(transportFactory, location, serverSocketFactory); + } + + @Override + protected Transport createTransport(Socket socket, WireFormat format) throws IOException { + Transport transport; + transport = new NIOTransport(format, socket); + debugTransportFilter = new DebugTransportFilter(transport); + return debugTransportFilter; + } + } + + private class DebugTransportFilter extends TransportFilter { + + boolean closeOnResponse = false; + + public DebugTransportFilter(Transport next) { + super(next); + } + + @Override + public void oneway(Object command) throws IOException { + if (closeOnResponse && command instanceof Response) { + closeOnResponse = false; + log.warn("\n\nclosing connection before response is sent\n\n"); + try { + ((NIOTransport) next).stop(); + } catch (Exception ex) { + log.error("couldn't stop niotransport", ex); + } + // don't send response + return; + } + super.oneway(command); + } + } +} \ No newline at end of file