https://issues.apache.org/jira/browse/AMQ-4276 - deal with destnotexist for temps. support alwaysSyncSend for duplex. improve reporting of failures. additional test.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1438734 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-01-25 21:47:17 +00:00
parent 43e2083f41
commit 8012f28511
17 changed files with 808 additions and 110 deletions

View File

@ -518,7 +518,7 @@ public class AdvisoryBroker extends BrokerFilter {
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
} }
protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception { public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
if (getBrokerService().isStarted()) { if (getBrokerService().isStarted()) {
//set properties //set properties
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());

View File

@ -291,7 +291,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// Record the error that caused the transport to stop // Record the error that caused the transport to stop
this.stopError = e; this.stopError = e;
// Wait a little bit to try to get the output buffer to flush // Wait a little bit to try to get the output buffer to flush
// the exption notification to the client. // the exception notification to the client.
try { try {
Thread.sleep(500); Thread.sleep(500);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
@ -1326,12 +1326,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
setDuplexNetworkConnectorId(duplexNetworkConnectorId); setDuplexNetworkConnectorId(duplexNetworkConnectorId);
} }
URI uri = broker.getVmConnectorURI(); Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker);
HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
map.put("network", "true");
map.put("async", "false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
Transport localTransport = TransportFactory.connect(uri);
Transport remoteBridgeTransport = new ResponseCorrelator(transport); Transport remoteBridgeTransport = new ResponseCorrelator(transport);
String duplexName = localTransport.toString(); String duplexName = localTransport.toString();
if (duplexName.contains("#")) { if (duplexName.contains("#")) {

View File

@ -28,15 +28,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.DestinationAlreadyExistsException; import org.apache.activemq.DestinationDoesNotExistException;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
@ -147,7 +145,7 @@ public abstract class AbstractRegion implements Region {
addSubscriptionsForDestination(context, dest); addSubscriptionsForDestination(context, dest);
} }
if (dest == null) { if (dest == null) {
throw new JMSException("The destination " + destination + " does not exist."); throw new DestinationDoesNotExistException(destination.getQualifiedName());
} }
} }
return dest; return dest;
@ -451,13 +449,8 @@ public abstract class AbstractRegion implements Region {
// Try to auto create the destination... re-invoke broker // Try to auto create the destination... re-invoke broker
// from the // from the
// top so that the proper security checks are performed. // top so that the proper security checks are performed.
try { context.getBroker().addDestination(context, destination, createTemporary);
context.getBroker().addDestination(context, destination, createTemporary); dest = addDestination(context, destination, false);
dest = addDestination(context, destination, false);
} catch (DestinationAlreadyExistsException e) {
// if the destination already exists then lets ignore
// this error
}
// We should now have the dest created. // We should now have the dest created.
destinationsLock.readLock().lock(); destinationsLock.readLock().lock();
try { try {

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.filter.MessageEvaluationContext;
/**
* Priority dispatch policy that sends a message to every subscription that
* matches the message in consumer priority order.
*
* @org.apache.xbean.XBean
*
*/
public class PriorityDispatchPolicy extends SimpleDispatchPolicy {
private final Comparator<? super Subscription> orderedCompare = new Comparator<Subscription>() {
@Override
public int compare(Subscription o1, Subscription o2) {
// We want the list sorted in descending order
return o2.getConsumerInfo().getPriority() - o1.getConsumerInfo().getPriority();
}
};
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers)
throws Exception {
ArrayList<Subscription> ordered = new ArrayList<Subscription>(consumers);
Collections.sort(ordered, orderedCompare);
StringBuffer stringBuffer = new StringBuffer();
for (Subscription sub: ordered) {
stringBuffer.append(sub.getConsumerInfo().getPriority());
stringBuffer.append(',');
}
//System.err.println("Priority:" + stringBuffer.toString() + ", msg: " + node.getMessage());
return super.dispatch(node, msgContext, ordered);
}
}

View File

@ -28,6 +28,6 @@ import org.apache.activemq.command.NetworkBridgeFilter;
*/ */
public class DefaultNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory { public class DefaultNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) { public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) {
return new NetworkBridgeFilter(remoteBrokerPath[0], networkTimeToLive); return new NetworkBridgeFilter(info, remoteBrokerPath[0], networkTimeToLive);
} }
} }

View File

@ -34,9 +34,12 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.DestinationDoesNotExistException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnection; import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.AbstractRegion; import org.apache.activemq.broker.region.AbstractRegion;
import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.DurableTopicSubscription;
@ -72,11 +75,13 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.tcp.SslTransport; import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IdGenerator;
@ -140,6 +145,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
private BrokerService brokerService = null; private BrokerService brokerService = null;
private ObjectName mbeanObjectName; private ObjectName mbeanObjectName;
private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor(); private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
private Transport duplexInboundLocalBroker = null;
private ProducerInfo duplexInboundLocalProducerInfo;
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
this.configuration = configuration; this.configuration = configuration;
@ -163,6 +170,24 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
throw new IllegalArgumentException("BrokerService is null on " + this); throw new IllegalArgumentException("BrokerService is null on " + this);
} }
if (isDuplex()) {
duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object o) {
Command command = (Command) o;
serviceLocalCommand(command);
}
@Override
public void onException(IOException error) {
serviceLocalException(error);
}
});
duplexInboundLocalBroker.start();
}
localBroker.setTransportListener(new DefaultTransportListener() { localBroker.setTransportListener(new DefaultTransportListener() {
@Override @Override
@ -269,6 +294,28 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
localSessionInfo = new SessionInfo(localConnectionInfo, 1); localSessionInfo = new SessionInfo(localConnectionInfo, 1);
localBroker.oneway(localSessionInfo); localBroker.oneway(localSessionInfo);
if (configuration.isDuplex()) {
// separate inbound chanel for forwards so we don't contend with outbound dispatch on same connection
ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" + configuration.getBrokerName());
duplexLocalConnectionInfo.setUserName(configuration.getUserName());
duplexLocalConnectionInfo.setPassword(configuration.getPassword());
if (originalTransport instanceof SslTransport) {
X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
duplexLocalConnectionInfo.setTransportContext(peerCerts);
}
// sync requests that may fail
resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
if (resp instanceof ExceptionResponse) {
throw ((ExceptionResponse)resp).getException();
}
SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
duplexInboundLocalBroker.oneway(duplexInboundSession);
duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
}
brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString()); brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
NetworkBridgeListener l = this.networkBridgeListener; NetworkBridgeListener l = this.networkBridgeListener;
if (l != null) { if (l != null) {
@ -388,6 +435,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ServiceStopper ss = new ServiceStopper(); ServiceStopper ss = new ServiceStopper();
ss.stop(remoteBroker); ss.stop(remoteBroker);
ss.stop(localBroker); ss.stop(localBroker);
ss.stop(duplexInboundLocalBroker);
// Release the started Latch since another thread could be // Release the started Latch since another thread could be
// stuck waiting for it to start up. // stuck waiting for it to start up.
startedLatch.countDown(); startedLatch.countDown();
@ -466,8 +514,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
serviceRemoteException(ce.getException()); serviceRemoteException(ce.getException());
} else { } else {
if (isDuplex()) { if (isDuplex()) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " duplex command type: "+ command.getCommandId());
}
if (command.isMessage()) { if (command.isMessage()) {
ActiveMQMessage message = (ActiveMQMessage) command; final ActiveMQMessage message = (ActiveMQMessage) command;
if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
|| AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) { || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
serviceRemoteConsumerAdvisory(message.getDataStructure()); serviceRemoteConsumerAdvisory(message.getDataStructure());
@ -476,69 +527,83 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (!isPermissableDestination(message.getDestination(), true)) { if (!isPermissableDestination(message.getDestination(), true)) {
return; return;
} }
if (message.isResponseRequired()) { // message being forwarded - we need to propagate the response to our local send
Response reply = new Response(); message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
reply.setCorrelationId(message.getCommandId()); if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
localBroker.oneway(message); duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
remoteBroker.oneway(reply); final int correlationId = message.getCommandId();
@Override
public void onCompletion(FutureResponse resp) {
try {
Response reply = resp.getResult();
reply.setCorrelationId(correlationId);
remoteBroker.oneway(reply);
} catch (IOException error) {
LOG.error("Exception: " + error + " on duplex forward of: " + message);
serviceRemoteException(error);
}
}
});
} else { } else {
localBroker.oneway(message); duplexInboundLocalBroker.oneway(message);
} }
} }
} else { } else {
switch (command.getDataStructureType()) { switch (command.getDataStructureType()) {
case ConnectionInfo.DATA_STRUCTURE_TYPE: case ConnectionInfo.DATA_STRUCTURE_TYPE:
case SessionInfo.DATA_STRUCTURE_TYPE: case SessionInfo.DATA_STRUCTURE_TYPE:
case ProducerInfo.DATA_STRUCTURE_TYPE: localBroker.oneway(command);
localBroker.oneway(command); break;
break; case ProducerInfo.DATA_STRUCTURE_TYPE:
case MessageAck.DATA_STRUCTURE_TYPE: // using duplexInboundLocalProducerInfo
MessageAck ack = (MessageAck) command; break;
DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId()); case MessageAck.DATA_STRUCTURE_TYPE:
if (localSub != null) { MessageAck ack = (MessageAck) command;
ack.setConsumerId(localSub.getLocalInfo().getConsumerId()); DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
localBroker.oneway(ack); if (localSub != null) {
} else { ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
LOG.warn("Matching local subscription not found for ack: " + ack); localBroker.oneway(ack);
} } else {
break; LOG.warn("Matching local subscription not found for ack: " + ack);
case ConsumerInfo.DATA_STRUCTURE_TYPE: }
localStartedLatch.await(); break;
if (started.get()) { case ConsumerInfo.DATA_STRUCTURE_TYPE:
if (!addConsumerInfo((ConsumerInfo) command)) { localStartedLatch.await();
if (LOG.isDebugEnabled()) { if (started.get()) {
LOG.debug("Ignoring ConsumerInfo: " + command); if (!addConsumerInfo((ConsumerInfo) command)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring ConsumerInfo: " + command);
}
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Adding ConsumerInfo: " + command);
}
} }
} else { } else {
if (LOG.isTraceEnabled()) { // received a subscription whilst stopping
LOG.trace("Adding ConsumerInfo: " + command); LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
} }
break;
case ShutdownInfo.DATA_STRUCTURE_TYPE:
// initiator is shutting down, controlled case
// abortive close dealt with by inactivity monitor
LOG.info("Stopping network bridge on shutdown of remote broker");
serviceRemoteException(new IOException(command.toString()));
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring remote command: " + command);
} }
} else {
// received a subscription whilst stopping
LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
}
break;
case ShutdownInfo.DATA_STRUCTURE_TYPE:
// initiator is shutting down, controlled case
// abortive close dealt with by inactivity monitor
LOG.info("Stopping network bridge on shutdown of remote broker");
serviceRemoteException(new IOException(command.toString()));
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring remote command: " + command);
}
} }
} }
} else { } else {
switch (command.getDataStructureType()) { switch (command.getDataStructureType()) {
case KeepAliveInfo.DATA_STRUCTURE_TYPE: case KeepAliveInfo.DATA_STRUCTURE_TYPE:
case WireFormatInfo.DATA_STRUCTURE_TYPE: case WireFormatInfo.DATA_STRUCTURE_TYPE:
case ShutdownInfo.DATA_STRUCTURE_TYPE: case ShutdownInfo.DATA_STRUCTURE_TYPE:
break; break;
default: default:
LOG.warn("Unexpected remote command: " + command); LOG.warn("Unexpected remote command: " + command);
} }
} }
} }
@ -659,7 +724,29 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
@Override @Override
public void serviceLocalException(Throwable error) { public void serviceLocalException(Throwable error) {
serviceLocalException(null, error);
}
public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
if (!disposed.get()) { if (!disposed.get()) {
if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException)error).isTemporary() ) {
// not a reason to terminate the bridge - temps can disappear with pending sends as the demand sub may outlive the remote dest
if (messageDispatch != null) {
LOG.warn("PoisonAck of " + messageDispatch.getMessage().getMessageId() + " on forwarding error: " + error);
try {
MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
poisonAck.setPoisonCause(error);
localBroker.oneway(poisonAck);
} catch (IOException ioe) {
LOG.error("Failed to posion ack message following forward failure: " + ioe, ioe);
}
fireFailedForwardAdvisory(messageDispatch, error);
} else {
LOG.warn("Ignoring exception on forwarding to non existent temp dest: " + error, error);
}
return;
}
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error); LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
LOG.debug("The local Exception was:" + error, error); LOG.debug("The local Exception was:" + error, error);
brokerService.getTaskRunnerFactory().execute(new Runnable() { brokerService.getTaskRunnerFactory().execute(new Runnable() {
@ -672,6 +759,33 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
} }
private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
if (configuration.isAdvisoryForFailedForward()) {
AdvisoryBroker advisoryBroker = null;
try {
advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
if (advisoryBroker != null) {
ConnectionContext context = new ConnectionContext();
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.setBroker(brokerService.getBroker());
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
advisoryBroker.fireAdvisory(context,
AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(),
messageDispatch.getMessage(), null, advisoryMessage);
}
} catch (Exception e) {
LOG.warn("failed to fire forward failure advisory, cause: " + e);
if (LOG.isDebugEnabled()) {
LOG.debug("detail", e);
}
}
}
}
protected Service getControllingService() { protected Service getControllingService() {
return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this; return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
} }
@ -684,11 +798,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void removeSubscription(final DemandSubscription sub) throws IOException { protected void removeSubscription(final DemandSubscription sub) throws IOException {
if (sub != null) { if (sub != null) {
if (LOG.isDebugEnabled()) { if (LOG.isTraceEnabled()) {
LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId()); LOG.trace(configuration.getBrokerName() + " remove local subscription:"
+ sub.getLocalInfo().getConsumerId()
+ " for remote " + sub.getRemoteInfo().getConsumerId());
} }
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
// continue removal in separate thread to free up this thread for outstanding responses // continue removal in separate thread to free up this thread for outstanding responses
// serialise with removeDestination operations so that removeSubs are serialised with removeDestinations // serialise with removeDestination operations so that removeSubs are serialised with removeDestinations
@ -701,6 +815,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
} catch (IOException e) { } catch (IOException e) {
LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e); LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
} finally {
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
} }
} }
}); });
@ -778,13 +895,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
Response response = future.getResult(); Response response = future.getResult();
if (response.isException()) { if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response; ExceptionResponse er = (ExceptionResponse) response;
serviceLocalException(er.getException()); serviceLocalException(md, er.getException());
} else { } else {
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
dequeueCounter.incrementAndGet(); dequeueCounter.incrementAndGet();
} }
} catch (IOException e) { } catch (IOException e) {
serviceLocalException(e); serviceLocalException(md, e);
} finally { } finally {
sub.decrementOutstandingResponses(); sub.decrementOutstandingResponses();
} }
@ -1195,6 +1312,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) { final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
ConsumerInfo info = new ConsumerInfo(); ConsumerInfo info = new ConsumerInfo();
info.setNetworkSubscription(true);
info.setDestination(destination); info.setDestination(destination);
// Indicate that this subscription is being made on behalf of the remote broker. // Indicate that this subscription is being made on behalf of the remote broker.

View File

@ -57,6 +57,7 @@ public class NetworkBridgeConfiguration {
private boolean alwaysSyncSend = false; private boolean alwaysSyncSend = false;
private boolean staticBridge = false; private boolean staticBridge = false;
private boolean useCompression = false; private boolean useCompression = false;
private boolean advisoryForFailedForward = false;
/** /**
* @return the conduitSubscriptions * @return the conduitSubscriptions
@ -385,4 +386,12 @@ public class NetworkBridgeConfiguration {
public boolean isUseCompression() { public boolean isUseCompression() {
return useCompression; return useCompression;
} }
public boolean isAdvisoryForFailedForward() {
return advisoryForFailedForward;
}
public void setAdvisoryForFailedForward(boolean advisoryForFailedForward) {
this.advisoryForFailedForward = advisoryForFailedForward;
}
} }

View File

@ -16,7 +16,12 @@
*/ */
package org.apache.activemq.network; package org.apache.activemq.network;
import java.net.URI;
import java.util.HashMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.URISupport;
/** /**
* Factory for network bridges * Factory for network bridges
@ -65,4 +70,13 @@ public final class NetworkBridgeFactory {
} }
return result; return result;
} }
public static Transport createLocalTransport(Broker broker) throws Exception {
URI uri = broker.getVmConnectorURI();
HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
map.put("network", "true");
map.put("async", "false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
return TransportFactory.connect(uri);
}
} }

View File

@ -14,26 +14,27 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.broker; package org.apache.activemq;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQDestination;
/** /**
* An exception thrown if a destination is attempted to be created when it already exists. * An exception thrown on a send if a destination does not exist.
* * Allows a network bridge to easily cherry-pick and ignore
* *
*/ */
public class DestinationAlreadyExistsException extends JMSException { public class DestinationDoesNotExistException extends JMSException {
private final ActiveMQDestination destination;
public DestinationAlreadyExistsException(ActiveMQDestination destination) { public DestinationDoesNotExistException(String destination) {
super("Destination already exists: " + destination); super(destination);
this.destination = destination;
} }
public ActiveMQDestination getDestination() { public boolean isTemporary() {
return destination; return getMessage().startsWith("temp-");
}
@Override
public String getLocalizedMessage() {
return "The destination " + getMessage() + " does not exist.";
} }
} }

View File

@ -51,6 +51,7 @@ public final class AdvisorySupport {
public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd."; public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker"; public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge"; public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge";
public static final String NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX = NETWORK_BRIDGE_TOPIC_PREFIX + ".ForwardFailure";
public static final String AGENT_TOPIC = "ActiveMQ.Agent"; public static final String AGENT_TOPIC = "ActiveMQ.Agent";
public static final String ADIVSORY_MESSAGE_TYPE = "Advisory"; public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId"; public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId";
@ -528,4 +529,8 @@ public final class AdvisorySupport {
public static Destination getAgentDestination() { public static Destination getAgentDestination() {
return AGENT_TOPIC_DESTINATION; return AGENT_TOPIC_DESTINATION;
} }
public static ActiveMQTopic getNetworkBridgeForwardFailureAdvisoryTopic() {
return new ActiveMQTopic(NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX);
}
} }

View File

@ -37,13 +37,15 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
protected BrokerId networkBrokerId; protected BrokerId networkBrokerId;
protected int networkTTL; protected int networkTTL;
transient ConsumerInfo consumerInfo;
public NetworkBridgeFilter() { public NetworkBridgeFilter() {
} }
public NetworkBridgeFilter(BrokerId networkBrokerId, int networkTTL) { public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int networkTTL) {
this.networkBrokerId = networkBrokerId; this.networkBrokerId = networkBrokerId;
this.networkTTL = networkTTL; this.networkTTL = networkTTL;
this.consumerInfo = consumerInfo;
} }
public byte getDataStructureType() { public byte getDataStructureType() {
@ -91,21 +93,29 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
return false; return false;
} }
if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) { if (message.isAdvisory()) {
ConsumerInfo info = (ConsumerInfo)message.getDataStructure(); if (consumerInfo != null && consumerInfo.isNetworkSubscription()) {
hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length; // they will be interpreted by the bridge leading to dup commands
if (hops >= networkTTL) { //if (LOG.isTraceEnabled()) {
if (LOG.isTraceEnabled()) { LOG.error("not propagating advisory to network sub: " + consumerInfo.getConsumerId() + ", message: "+ message);
LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message); //}
return false;
} else if ( message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
if (hops >= networkTTL) {
if (LOG.isTraceEnabled()) {
LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
}
return false;
} }
return false;
}
if (contains(info.getBrokerPath(), networkBrokerId)) { if (contains(info.getBrokerPath(), networkBrokerId)) {
LOG.trace("ConsumerInfo advisory all ready routed once through target broker (" LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
+ networkBrokerId + "), path: " + networkBrokerId + "), path: "
+ Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message); + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message);
return false; return false;
}
} }
} }
return true; return true;

View File

@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
/** /**
* Adds the incrementing sequence number to commands along with performing the * Adds the incrementing sequence number to commands along with performing the
* corelation of responses to requests to create a blocking request-response * correlation of responses to requests to create a blocking request-response
* semantics. * semantics.
* *
* *

View File

@ -19,10 +19,12 @@ package org.apache.activemq.network;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.TemporaryQueue; import javax.jms.TemporaryQueue;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.junit.Test; import org.junit.Test;
import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
public class DuplexNetworkTest extends SimpleNetworkTest { public class DuplexNetworkTest extends SimpleNetworkTest {
@ -47,7 +49,12 @@ public class DuplexNetworkTest extends SimpleNetworkTest {
Thread.sleep(100); Thread.sleep(100);
assertEquals("Destination not created", 1, remoteBroker.getAdminView().getTemporaryQueues().length); assertEquals("Destination not created", 1, remoteBroker.getAdminView().getTemporaryQueues().length);
temp.delete(); temp.delete();
Thread.sleep(100);
assertEquals("Destination not deleted", 0, remoteBroker.getAdminView().getTemporaryQueues().length); assertTrue("Destination not deleted", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == remoteBroker.getAdminView().getTemporaryQueues().length;
}
}));
} }
} }

View File

@ -72,6 +72,8 @@ public class SimpleNetworkTest {
protected ActiveMQTopic excluded; protected ActiveMQTopic excluded;
protected String consumerName = "durableSubs"; protected String consumerName = "durableSubs";
// works b/c of non marshaling vm transport, the connection
// ref from the client is used during the forward
@Test @Test
public void testMessageCompression() throws Exception { public void testMessageCompression() throws Exception {

View File

@ -323,7 +323,7 @@ public abstract class DataFileGeneratorTestSupport extends TestSupport {
} }
protected BooleanExpression createBooleanExpression(String string) { protected BooleanExpression createBooleanExpression(String string) {
return new NetworkBridgeFilter(new BrokerId(string), 10); return new NetworkBridgeFilter(null, new BrokerId(string), 10);
} }
} }

View File

@ -137,12 +137,12 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
long numVmConnections = VMTransportFactory.SERVERS.get(HUB).getConnectionCount(); long numVmConnections = VMTransportFactory.SERVERS.get(HUB).getConnectionCount();
LOG.info("Num VM connetions:" + numVmConnections); LOG.info("Num VM connetions:" + numVmConnections);
return numVmConnections == 1; return numVmConnections == 2;
}}); }});
if (!allGood) { if (!allGood) {
dumpAllThreads("ExtraHubVMConnection"); dumpAllThreads("ExtraHubVMConnection");
} }
assertTrue("should be only one vm connection for the single network duplex network connector", allGood); assertTrue("should be only 2 vm connections for the single network duplex network connector", allGood);
} }
public void testTwoDuplexNCsAreAllowed() throws Exception { public void testTwoDuplexNCsAreAllowed() throws Exception {

View File

@ -0,0 +1,486 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.PriorityDispatchPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.network.DemandForwardingBridgeSupport;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
*/
public class RequestReplyTempDestRemovalAdvisoryRaceTest extends JmsMultipleBrokersTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(RequestReplyTempDestRemovalAdvisoryRaceTest.class);
private static final String BROKER_A = "BrokerA";
private static final String BROKER_B = "BrokerB";
private static final String BROKER_C = "BrokerC";
private static final int NUM_RESPONDENTS = 1;
private static final int NUM_SENDS = 1;
private static final int RANDOM_SLEEP_FOR_RESPONDENT_MS = 0;
private static final int RANDOM_SLEEP_FOR_SENDER_MS = 1;
private static final String QUEUE_NAME = "foo.queue";
private static String[] TEST_ITERATIONS = new String[]{QUEUE_NAME+"0", QUEUE_NAME+"1", QUEUE_NAME+"2", QUEUE_NAME+"3"};
final AtomicLong messageCount = new AtomicLong(0);
final AtomicLong respondentSendError = new AtomicLong(0);
final AtomicLong responseReceived = new AtomicLong(0);
final AtomicLong sendsWithNoConsumers = new AtomicLong(0);
final AtomicLong forwardFailures = new AtomicLong(0);
protected final AtomicBoolean shutdown = new AtomicBoolean(false);
HashSet<NetworkConnector> networkConnectors = new HashSet<NetworkConnector>();
HashSet<Connection> advisoryConsumerConnections = new HashSet<Connection>();
Appender slowDownAppender;
CountDownLatch consumerDemandExists;
protected boolean useDuplex = false;
public static Test suite() {
return suite(RequestReplyTempDestRemovalAdvisoryRaceTest.class);
}
/**
* Notes: to reliably reproduce use debugger... set a "thread" breakpoint at line 679 in DemandForwardingBridgeSupport,
* and only break on the "2nd" pass (broker C's bridge). Allow debugging to continue shortly after hitting
* the breakpoint, for this test we use a logging appender to implement the pause,
* it fails most of the time, hence the combos
*/
public void initCombos() {
addCombinationValues("QUEUE_NAME", TEST_ITERATIONS);
}
public void testTempDestRaceDuplex() throws Exception {
// duplex
useDuplex = true;
bridgeBrokers(BROKER_A, BROKER_B, false, 3);
bridgeBrokers(BROKER_B, BROKER_C, false, 3);
startAllBrokers();
waitForBridgeFormation(1);
HashSet<NetworkBridge> bridgesStart = new HashSet<NetworkBridge>();
for (NetworkConnector networkConnector : networkConnectors) {
bridgesStart.addAll(networkConnector.activeBridges());
}
LOG.info("Bridges start:" + bridgesStart);
slowDownAdvisoryDispatch();
noConsumerAdvisory();
forwardFailureAdvisory();
// set up respondents
ExecutorService respondentThreadPool = Executors.newFixedThreadPool(50);
BrokerItem brokerA = brokers.get(BROKER_A);
ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(brokerA.broker.getTransportConnectorByScheme("tcp").getName()
+ "?jms.watchTopicAdvisories=false");
brokerAFactory.setAlwaysSyncSend(true);
for (int i = 0; i < NUM_RESPONDENTS; i++) {
respondentThreadPool.execute(new EchoRespondent(brokerAFactory));
}
// fire off sends
ExecutorService senderThreadPool = Executors.newCachedThreadPool();
BrokerItem brokerC = brokers.get(BROKER_C);
ActiveMQConnectionFactory brokerCFactory = new ActiveMQConnectionFactory(brokerC.broker.getTransportConnectorByScheme("tcp").getName()
+ "?jms.watchTopicAdvisories=false");
for (int i = 0; i < NUM_SENDS; i++) {
senderThreadPool.execute(new MessageSender(brokerCFactory));
}
senderThreadPool.shutdown();
senderThreadPool.awaitTermination(30, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(15);
LOG.info("shutting down");
shutdown.compareAndSet(false, true);
HashSet<NetworkBridge> bridgesEnd = new HashSet<NetworkBridge>();
for (NetworkConnector networkConnector : networkConnectors) {
bridgesEnd.addAll(networkConnector.activeBridges());
}
LOG.info("Bridges end:" + bridgesEnd);
assertEquals("no new bridges created", bridgesStart, bridgesEnd);
// validate success or error of dlq
LOG.info("received: " + responseReceived.get() + ", respondent error: " + respondentSendError.get()
+ ", noConsumerCount: " + sendsWithNoConsumers.get()
+ ", forwardFailures: " + forwardFailures.get());
assertEquals("success or error", NUM_SENDS, respondentSendError.get() + forwardFailures.get()
+ responseReceived.get() + sendsWithNoConsumers.get());
}
private void forwardFailureAdvisory() throws JMSException {
for (BrokerItem item : brokers.values()) {
ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(item.broker.getTransportConnectorByScheme("tcp").getName()
+ "?jms.watchTopicAdvisories=false");
Connection connection = brokerAFactory.createConnection();
connection.start();
connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(
AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic()).setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
forwardFailures.incrementAndGet();
}
});
}
}
private void noConsumerAdvisory() throws JMSException {
for (BrokerItem item : brokers.values()) {
ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(item.broker.getTransportConnectorByScheme("tcp").getName()
+ "?jms.watchTopicAdvisories=false");
Connection connection = brokerAFactory.createConnection();
connection.start();
connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(
AdvisorySupport.getNoTopicConsumersAdvisoryTopic(new ActiveMQTempTopic(">"))).setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
sendsWithNoConsumers.incrementAndGet();
}
});
}
}
public void testTempDestRace() throws Exception {
// non duplex
bridgeBrokers(BROKER_A, BROKER_B, false, 3);
bridgeBrokers(BROKER_B, BROKER_A, false, 3);
bridgeBrokers(BROKER_B, BROKER_C, false, 3);
bridgeBrokers(BROKER_C, BROKER_B, false, 3);
startAllBrokers();
waitForBridgeFormation(1);
HashSet<NetworkBridge> bridgesStart = new HashSet<NetworkBridge>();
for (NetworkConnector networkConnector : networkConnectors) {
bridgesStart.addAll(networkConnector.activeBridges());
}
slowDownAdvisoryDispatch();
noConsumerAdvisory();
forwardFailureAdvisory();
// set up respondents
ExecutorService respondentThreadPool = Executors.newFixedThreadPool(50);
BrokerItem brokerA = brokers.get(BROKER_A);
ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(brokerA.broker.getTransportConnectorByScheme("tcp").getName()
+ "?jms.watchTopicAdvisories=false");
brokerAFactory.setAlwaysSyncSend(true);
for (int i = 0; i < NUM_RESPONDENTS; i++) {
respondentThreadPool.execute(new EchoRespondent(brokerAFactory));
}
// fire off sends
ExecutorService senderThreadPool = Executors.newCachedThreadPool();
BrokerItem brokerC = brokers.get(BROKER_C);
ActiveMQConnectionFactory brokerCFactory = new ActiveMQConnectionFactory(brokerC.broker.getTransportConnectorByScheme("tcp").getName()
+ "?jms.watchTopicAdvisories=false");
for (int i = 0; i < NUM_SENDS; i++) {
senderThreadPool.execute(new MessageSender(brokerCFactory));
}
senderThreadPool.shutdown();
senderThreadPool.awaitTermination(30, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(10);
LOG.info("shutting down");
shutdown.compareAndSet(false, true);
HashSet<NetworkBridge> bridgesEnd = new HashSet<NetworkBridge>();
for (NetworkConnector networkConnector : networkConnectors) {
bridgesEnd.addAll(networkConnector.activeBridges());
}
assertEquals("no new bridges created", bridgesStart, bridgesEnd);
// validate success or error or dlq
LOG.info("received: " + responseReceived.get() + ", respondent error: " + respondentSendError.get()
+ ", noConsumerCount: " + sendsWithNoConsumers.get()
+ ", forwardFailures: " + forwardFailures.get());
assertEquals("success or error", NUM_SENDS, respondentSendError.get() + forwardFailures.get()
+ responseReceived.get() + sendsWithNoConsumers.get());
}
private void slowDownAdvisoryDispatch() throws Exception {
org.apache.log4j.Logger.getLogger(DemandForwardingBridgeSupport.class).setLevel(Level.DEBUG);
// instrument a logger to block the processing of a remove sub advisory
// simulate a slow thread
slowDownAppender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent loggingEvent) {
if (Level.DEBUG.equals(loggingEvent.getLevel())) {
String message = loggingEvent.getMessage().toString();
if (message.startsWith("BrokerB") && message.contains("remove local subscription")) {
// sleep for a bit
try {
consumerDemandExists.countDown();
System.err.println("Sleeping on receipt of remove info debug message: " + message);
TimeUnit.SECONDS.sleep(2);
} catch (Exception ignored) {
}
}
}
}
};
org.apache.log4j.Logger.getRootLogger().addAppender(slowDownAppender);
}
@Override
protected void setUp() throws Exception {
super.setUp();
responseReceived.set(0);
respondentSendError.set(0);
forwardFailures.set(0);
sendsWithNoConsumers.set(0);
networkConnectors.clear();
advisoryConsumerConnections.clear();
consumerDemandExists = new CountDownLatch(1);
createBroker(new URI("broker:(tcp://localhost:0)/" + BROKER_A + "?persistent=false&useJmx=false")).setDedicatedTaskRunner(false);
createBroker(new URI("broker:(tcp://localhost:0)/" + BROKER_B + "?persistent=false&useJmx=false")).setDedicatedTaskRunner(false);
createBroker(new URI("broker:(tcp://localhost:0)/" + BROKER_C + "?persistent=false&useJmx=false")).setDedicatedTaskRunner(false);
PolicyMap map = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setSendAdvisoryIfNoConsumers(true);
DeadLetterStrategy deadletterStrategy = new SharedDeadLetterStrategy();
deadletterStrategy.setProcessNonPersistent(true);
defaultEntry.setDeadLetterStrategy(deadletterStrategy);
defaultEntry.setDispatchPolicy(new PriorityDispatchPolicy());
map.put(new ActiveMQTempTopic(">"), defaultEntry);
for (BrokerItem item : brokers.values()) {
item.broker.setDestinationPolicy(map);
}
}
@Override
protected void tearDown() throws Exception {
if (slowDownAppender != null) {
org.apache.log4j.Logger.getRootLogger().removeAppender(slowDownAppender);
}
for (Connection connection : advisoryConsumerConnections) {
connection.close();
}
super.tearDown();
}
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception {
NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName, dynamicOnly, networkTTL, true);
connector.setBridgeTempDestinations(true);
connector.setAdvisoryForFailedForward(true);
connector.setDuplex(useDuplex);
connector.setAlwaysSyncSend(true);
networkConnectors.add(connector);
return connector;
}
abstract class MessageClient {
protected Connection connection;
protected Session session;
protected MessageConsumer consumer;
protected MessageProducer producer;
protected Random random;
protected int timeToSleep;
// set up the connection and session
public MessageClient(ActiveMQConnectionFactory factory, int timeToSleep) throws Exception {
this.connection = factory.createConnection();
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.timeToSleep = timeToSleep;
this.random = new Random(System.currentTimeMillis());
preInit();
initProducer();
initConsumer();
this.connection.start();
}
protected void preInit() throws JMSException {
}
abstract protected void initProducer() throws JMSException;
abstract protected void initConsumer() throws JMSException;
}
class MessageSender extends MessageClient implements Runnable {
protected Destination tempDest;
public MessageSender(ActiveMQConnectionFactory factory) throws Exception {
super(factory, RANDOM_SLEEP_FOR_SENDER_MS);
}
@Override
public void run() {
// create a message
try {
TextMessage message = session.createTextMessage("request: message #" + messageCount.getAndIncrement());
message.setJMSReplyTo(tempDest);
producer.send(message);
LOG.info("SENDER: Message [" + message.getText() + "] has been sent.");
Message incomingMessage = consumer.receive(timeToSleep);
if (incomingMessage instanceof TextMessage) {
try {
LOG.info("SENDER: Got a response from echo service!" + ((TextMessage) incomingMessage).getText());
responseReceived.incrementAndGet();
} catch (JMSException e) {
LOG.error("SENDER: might want to see why i'm getting non-text messages..." + incomingMessage, e);
}
} else {
LOG.info("SENDER: Did not get a response this time");
}
} catch (JMSException e) {
LOG.error("SENDER: Could not complete message sending properly: " + e.getMessage());
} finally {
try {
producer.close();
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
@Override
protected void preInit() throws JMSException {
this.tempDest = session.createTemporaryTopic();
}
@Override
protected void initProducer() throws JMSException {
this.producer = session.createProducer(new ActiveMQQueue(QUEUE_NAME));
}
@Override
protected void initConsumer() throws JMSException {
this.consumer = session.createConsumer(tempDest);
LOG.info("consumer for: " + tempDest + ", " + consumer);
}
}
class EchoRespondent extends MessageClient implements Runnable {
public EchoRespondent(ActiveMQConnectionFactory factory) throws Exception {
super(factory, RANDOM_SLEEP_FOR_RESPONDENT_MS);
}
@Override
public void run() {
try {
LOG.info("RESPONDENT LISTENING");
while (!shutdown.get()) {
Message incomingMessage = consumer.receive(1000);
if (incomingMessage instanceof TextMessage) {
ActiveMQTextMessage textMessage = (ActiveMQTextMessage) incomingMessage;
try {
LOG.info("RESPONDENT: Received a message: [" + textMessage.getText() + "]" + Arrays.asList(textMessage.getBrokerPath()));
Message message = session.createTextMessage("reply: " + textMessage.getText());
Destination replyTo = incomingMessage.getJMSReplyTo();
TimeUnit.MILLISECONDS.sleep(timeToSleep);
consumerDemandExists.await(5, TimeUnit.SECONDS);
try {
producer.send(replyTo, message);
LOG.info("RESPONDENT: sent reply:" + message.getJMSMessageID() + " back to: " + replyTo);
} catch (JMSException e) {
LOG.error("RESPONDENT: could not send reply message: " + e.getLocalizedMessage(), e);
respondentSendError.incrementAndGet();
}
} catch (JMSException e) {
LOG.error("RESPONDENT: could not create the reply message: " + e.getLocalizedMessage(), e);
} catch (InterruptedException e) {
LOG.info("RESPONDENT could not generate a random number");
}
}
}
} catch (JMSException e) {
LOG.info("RESPONDENT: Could not set the message listener on the respondent");
}
}
@Override
protected void initProducer() throws JMSException {
this.producer = session.createProducer(null);
// so that we can get an advisory on sending with no consumers
this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
@Override
protected void initConsumer() throws JMSException {
this.consumer = session.createConsumer(new ActiveMQQueue(QUEUE_NAME));
}
}
}