This commit is contained in:
rajdavies 2013-12-11 11:13:23 +00:00
parent ff0dd5a917
commit 2bbfbcfb29
3 changed files with 451 additions and 56 deletions

View File

@ -19,7 +19,11 @@ package org.apache.activemq.network;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.util.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -32,7 +36,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
import org.apache.activemq.DestinationDoesNotExistException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
@ -96,14 +99,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected ActiveMQDestination[] durableDestinations;
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
protected final CountDownLatch startedLatch = new CountDownLatch(2);
protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
protected BrokerId remoteBrokerId;
final AtomicLong enqueueCounter = new AtomicLong();
@ -587,6 +590,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
// message being forwarded - we need to
// propagate the response to our local send
if (canDuplexDispatch(message)) {
message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
@ -608,6 +612,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
duplexInboundLocalBroker.oneway(message);
}
serviceInboundMessage(message);
} else {
if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
Response reply = new Response();
reply.setCorrelationId(message.getCommandId());
remoteBroker.oneway(reply);
}
}
}
} else {
switch (command.getDataStructureType()) {
@ -817,7 +828,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return;
}
LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{ localBroker, remoteBroker, error });
LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
LOG.debug("The local Exception was: {}", error, error);
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@ -871,7 +882,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void removeSubscription(final DemandSubscription sub) throws IOException {
if (sub != null) {
LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{ configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId() });
LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
// ensure not available for conduit subs pending removal
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
@ -1049,7 +1060,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
if (brokerPath == null || brokerPath.length == 0) {
return new BrokerId[] { idToAppend };
return new BrokerId[]{idToAppend};
}
BrokerId rc[] = new BrokerId[brokerPath.length + 1];
System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
@ -1276,7 +1287,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
priority -= info.getBrokerPath().length + 1;
}
result.getLocalInfo().setPriority(priority);
LOG.debug("{} using priority: {} for subscription: {}", new Object[]{ configuration.getBrokerName(), priority, info });
LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
}
configureDemandSubscription(info, result);
return result;
@ -1288,7 +1299,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
info.setDestination(destination);
// Indicate that this subscription is being made on behalf of the remote broker.
info.setBrokerPath(new BrokerId[] { remoteBrokerId });
info.setBrokerPath(new BrokerId[]{remoteBrokerId});
// the remote info held by the DemandSubscription holds the original
// consumerId, the local info get's overwritten
@ -1352,8 +1363,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
/**
* Performs a timed wait on the started latch and then checks for disposed
* before performing another wait each time the the started wait times out.
*
* @throws InterruptedException
*/
protected void safeWaitUntilStarted() throws InterruptedException {
while (!disposed.get()) {
@ -1417,8 +1426,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
/**
* @param excludedDestinations
* The excludedDestinations to set.
* @param excludedDestinations The excludedDestinations to set.
*/
public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
this.excludedDestinations = excludedDestinations;
@ -1432,8 +1440,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
/**
* @param staticallyIncludedDestinations
* The staticallyIncludedDestinations to set.
* @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
*/
public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
this.staticallyIncludedDestinations = staticallyIncludedDestinations;
@ -1447,8 +1454,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
/**
* @param durableDestinations
* The durableDestinations to set.
* @param durableDestinations The durableDestinations to set.
*/
public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
this.durableDestinations = durableDestinations;
@ -1476,8 +1482,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
/**
* @param createdByDuplex
* the createdByDuplex to set
* @param createdByDuplex the createdByDuplex to set
*/
public void setCreatedByDuplex(boolean createdByDuplex) {
this.createdByDuplex = createdByDuplex;
@ -1500,7 +1505,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
@Override
public String getRemoteBrokerId() {
return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() ==null)? null : remoteBrokerInfo.getBrokerId().toString();
return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
}
@Override
@ -1543,7 +1548,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return mbeanObjectName;
}
public void resetStats(){
public void resetStats() {
enqueueCounter.set(0);
dequeueCounter.set(0);
}
@ -1624,18 +1629,43 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
protected void serviceOutbound(Message message){
protected void serviceOutbound(Message message) {
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null){
l.onOutboundMessage(this,message);
if (l != null) {
l.onOutboundMessage(this, message);
}
}
protected void serviceInboundMessage(Message message){
protected void serviceInboundMessage(Message message) {
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null){
l.onInboundMessage(this,message);
if (l != null) {
l.onInboundMessage(this, message);
}
}
protected boolean canDuplexDispatch(Message message) {
boolean result = true;
if (configuration.isCheckDuplicateMessagesOnDuplex()){
final long producerSequenceId = message.getMessageId().getProducerSequenceId();
// messages are multiplexed on this producer so we need to query the persistenceAdapter
long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
if (producerSequenceId <= lastStoredForMessageProducer) {
result = false;
LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
(LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
});
}
}
return result;
}
protected long getStoredSequenceIdForMessage(MessageId messageId) {
try {
return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
} catch (IOException ignored) {
LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
}
return -1;
}
}

View File

@ -61,6 +61,7 @@ public class NetworkBridgeConfiguration {
private boolean useBrokerNamesAsIdSeed = true;
private boolean gcDestinationViews = true;
private long gcSweepTime = 60 * 1000;
private boolean checkDuplicateMessagesOnDuplex = false;
/**
* @return the conduitSubscriptions
@ -440,4 +441,12 @@ public class NetworkBridgeConfiguration {
this.gcSweepTime = gcSweepTime;
}
public boolean isCheckDuplicateMessagesOnDuplex() {
return checkDuplicateMessagesOnDuplex;
}
public void setCheckDuplicateMessagesOnDuplex(boolean checkDuplicateMessagesOnDuplex) {
this.checkDuplicateMessagesOnDuplex = checkDuplicateMessagesOnDuplex;
}
}

View File

@ -0,0 +1,356 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.net.ServerSocketFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.command.Response;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.nio.NIOTransport;
import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
/**
*
* @author x22koe
*/
public class CheckDuplicateMessagesOnDuplexTest {
private static final Logger log = LoggerFactory.getLogger(CheckDuplicateMessagesOnDuplexTest.class);
private BrokerService localBroker;
private BrokerService remoteBroker;
private ActiveMQConnectionFactory localFactory;
private ActiveMQConnectionFactory remoteFactory;
private Session localSession;
private MessageConsumer consumer;
private Session remoteSession;
private MessageProducer producer;
private Connection remoteConnection;
private Connection localConnection;
private DebugTransportFilter debugTransportFilter;
private boolean useLevelDB = false;
public CheckDuplicateMessagesOnDuplexTest() {
}
@BeforeClass
public static void setUpClass() {
}
@AfterClass
public static void tearDownClass() {
}
@Before
public void setUp() {
}
@After
public void tearDown() {
}
@Test
public void testConnectionLossBehaviorBeforeAckIsSent() throws Exception {
createBrokers();
localBroker.deleteAllMessages();
remoteBroker.deleteAllMessages();
startBrokers();
openConnections();
Thread.sleep(1000);
log.info("\n\n==============================================\nsend hello1\n");
// simulate network failure between REMOTE and LOCAL just before the reception response is sent back to REMOTE
debugTransportFilter.closeOnResponse = true;
producer.send(remoteSession.createTextMessage("hello1"));
Message msg = consumer.receive(30000);
assertNotNull("expected hello1", msg);
assertEquals("hello1", ((TextMessage) msg).getText());
Thread.sleep(1000);
log.info("\n\n------------------------------------------\nsend hello2\n");
producer.send(remoteSession.createTextMessage("hello2"));
msg = consumer.receive(30000);
assertNotNull("expected hello2", msg);
assertEquals("hello2", ((TextMessage) msg).getText());
closeLocalConnection();
Thread.sleep(1000);
log.info("\n\n------------------------------------------\nsend hello3\n");
openLocalConnection();
Thread.sleep(1000);
producer.send(remoteSession.createTextMessage("hello3"));
msg = consumer.receive(30000);
assertNotNull("expected hello3", msg);
assertEquals("hello3", ((TextMessage) msg).getText());
Thread.sleep(1000);
log.info("\n\n==============================================\n\n");
closeConnections();
stopBrokers();
// restart the local broker, which should be empty
Thread.sleep(1000);
log.info("\n\n##############################################\n\n");
createLocalBroker();
startLocalBroker();
openLocalConnection();
// this should not return the "hello1" message
msg = consumer.receive(1000);
closeLocalConnection();
stopLocalBroker();
assertNull(msg);
}
private void createBrokers() throws Exception {
createLocalBroker();
createRemoteBroker();
}
private void createLocalBroker() throws Exception {
localBroker = new BrokerService();
localBroker.setBrokerName("LOCAL");
localBroker.setUseJmx(true);
localBroker.setSchedulePeriodForDestinationPurge(5000);
ManagementContext managementContext = new ManagementContext();
managementContext.setCreateConnector(false);
localBroker.setManagementContext(managementContext);
PersistenceAdapter persistenceAdapter = persistanceAdapterFactory("target/local");
localBroker.setPersistenceAdapter(persistenceAdapter);
List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>();
DebugTransportFactory tf = new DebugTransportFactory();
TransportServer transport = tf.doBind(URI.create("nio://127.0.0.1:23539"));
TransportConnector transportConnector = new TransportConnector(transport);
transportConnector.setName("tc");
transportConnector.setAuditNetworkProducers(true);
transportConnectors.add(transportConnector);
localBroker.setTransportConnectors(transportConnectors);
}
private void createRemoteBroker() throws Exception {
remoteBroker = new BrokerService();
remoteBroker.setBrokerName("REMOTE");
remoteBroker.setUseJmx(true);
remoteBroker.setSchedulePeriodForDestinationPurge(5000);
ManagementContext managementContext = new ManagementContext();
managementContext.setCreateConnector(false);
remoteBroker.setManagementContext(managementContext);
PersistenceAdapter persistenceAdapter = persistanceAdapterFactory("target/remote");
remoteBroker.setPersistenceAdapter(persistenceAdapter);
List<NetworkConnector> networkConnectors = new ArrayList<NetworkConnector>();
DiscoveryNetworkConnector networkConnector = new DiscoveryNetworkConnector();
networkConnector.setName("to local");
// set maxInactivityDuration to 0, otherwise the broker restarts while you are in the debugger
networkConnector.setUri(URI.create("static://(tcp://127.0.0.1:23539?wireFormat.maxInactivityDuration=0)"));
networkConnector.setDuplex(true);
//networkConnector.setNetworkTTL(5);
//networkConnector.setDynamicOnly(true);
networkConnector.setAlwaysSyncSend(true);
networkConnector.setDecreaseNetworkConsumerPriority(false);
networkConnector.setPrefetchSize(1);
networkConnector.setCheckDuplicateMessagesOnDuplex(true);
networkConnectors.add(networkConnector);
remoteBroker.setNetworkConnectors(networkConnectors);
}
private void startBrokers() throws Exception {
startLocalBroker();
startRemoteBroker();
}
private void startLocalBroker() throws Exception {
localBroker.start();
localBroker.waitUntilStarted();
}
private void startRemoteBroker() throws Exception {
remoteBroker.start();
remoteBroker.waitUntilStarted();
}
private void openConnections() throws JMSException {
openLocalConnection();
openRemoteConnection();
}
private void openLocalConnection() throws JMSException {
localFactory = new ActiveMQConnectionFactory(localBroker.getVmConnectorURI());
//localFactory.setSendAcksAsync(false);
localConnection = localFactory.createConnection();
localConnection.start();
localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = localSession.createConsumer(localSession.createQueue("testqueue"));
}
private void openRemoteConnection() throws JMSException {
remoteFactory = new ActiveMQConnectionFactory(remoteBroker.getVmConnectorURI());
//remoteFactory.setSendAcksAsync(false);
remoteConnection = remoteFactory.createConnection();
remoteConnection.start();
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = remoteSession.createProducer(remoteSession.createQueue("testqueue"));
}
private void closeConnections() throws JMSException {
closeLocalConnection();
closeRemoteConnection();
}
private void closeLocalConnection() throws JMSException {
localConnection.close();
}
private void closeRemoteConnection() throws JMSException {
remoteConnection.close();
}
private void stopBrokers() throws Exception {
stopRemoteBroker();
stopLocalBroker();
}
private void stopLocalBroker() throws Exception {
localBroker.stop();
localBroker.waitUntilStopped();
}
private void stopRemoteBroker() throws Exception {
remoteBroker.stop();
remoteBroker.waitUntilStopped();
}
private PersistenceAdapter persistanceAdapterFactory(String path) {
if (useLevelDB) {
return persistanceAdapterFactory_LevelDB(path);
} else {
return persistanceAdapterFactory_KahaDB(path);
}
}
private PersistenceAdapter persistanceAdapterFactory_KahaDB(String path) {
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
kahaDBPersistenceAdapter.setDirectory(new File(path));
kahaDBPersistenceAdapter.setIgnoreMissingJournalfiles(true);
kahaDBPersistenceAdapter.setCheckForCorruptJournalFiles(true);
kahaDBPersistenceAdapter.setChecksumJournalFiles(true);
return kahaDBPersistenceAdapter;
}
private PersistenceAdapter persistanceAdapterFactory_LevelDB(String path) {
LevelDBPersistenceAdapter levelDBPersistenceAdapter = new LevelDBPersistenceAdapter();
levelDBPersistenceAdapter.setDirectory(new File(path));
return levelDBPersistenceAdapter;
}
private class DebugTransportFactory extends NIOTransportFactory {
@Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory)
throws IOException, URISyntaxException {
return new DebugTransportServer(this, location, serverSocketFactory);
}
}
private class DebugTransportServer extends TcpTransportServer {
public DebugTransportServer(TcpTransportFactory transportFactory, URI location,
ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
super(transportFactory, location, serverSocketFactory);
}
@Override
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
Transport transport;
transport = new NIOTransport(format, socket);
debugTransportFilter = new DebugTransportFilter(transport);
return debugTransportFilter;
}
}
private class DebugTransportFilter extends TransportFilter {
boolean closeOnResponse = false;
public DebugTransportFilter(Transport next) {
super(next);
}
@Override
public void oneway(Object command) throws IOException {
if (closeOnResponse && command instanceof Response) {
closeOnResponse = false;
log.warn("\n\nclosing connection before response is sent\n\n");
try {
((NIOTransport) next).stop();
} catch (Exception ex) {
log.error("couldn't stop niotransport", ex);
}
// don't send response
return;
}
super.oneway(command);
}
}
}