Add more options for networks:

included, excluded destinationm filters
durable destinations as well as dynamic
conduit subscriptions (multiple subs on smae matching destination are treated as one)
networkTTL = number of hops messages/subs can pass through - default = 1

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@378700 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-02-18 07:20:16 +00:00
parent f915da5772
commit 068c64639f
6 changed files with 814 additions and 150 deletions

View File

@ -0,0 +1,94 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Consolidates subscriptions
*
* @version $Revision: 1.1 $
*/
public class ConduitBridge extends DemandForwardingBridge{
static final private Log log=LogFactory.getLog(ConduitBridge.class);
/**
* Constructor
* @param localBroker
* @param remoteBroker
*/
public ConduitBridge(Transport localBroker,Transport remoteBroker){
super(localBroker,remoteBroker);
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info){
//search through existing subscriptions and see if we have a match
boolean matched = false;
DestinationFilter filter=DestinationFilter.parseFilter(info.getDestination());
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();){
DemandSubscription ds = (DemandSubscription)i.next();
if (filter.matches(ds.getLocalInfo().getDestination())){
//add the interest in the subscription
ds.add(ds.getRemoteInfo().getConsumerId());
matched = true;
//continue - we want interest to any existing DemandSubscriptions
}
}
if (matched){
return null; //don't want this subscription added
}
//not matched so create a new one
//but first, if it's durable - changed set the
//ConsumerId here - so it won't be removed if the
//durable subscriber goes away on the other end
if (info.isDurable() || (info.getDestination().isQueue() && !info.getDestination().isTemporary())){
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
.getNextSequenceId()));
}
return super.createDemandSubscription(info);
}
protected void removeDemandSubscription(ConsumerId id) throws IOException{
List tmpList = new ArrayList();
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();){
DemandSubscription ds = (DemandSubscription)i.next();
ds.remove(id);
if (ds.isEmpty()){
tmpList.add(ds);
}
}
for (Iterator i = tmpList.iterator(); i.hasNext();){
DemandSubscription ds = (DemandSubscription) i.next();
subscriptionMapByLocalId.remove(ds.getRemoteInfo().getConsumerId());
removeSubscription(ds);
if(log.isTraceEnabled())
log.trace("removing sub on "+localBroker+" from "+remoteBrokerName+" : "+ds.getRemoteInfo());
}
}
}

View File

@ -16,6 +16,7 @@ package org.apache.activemq.network;
import java.io.IOException; import java.io.IOException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -37,9 +38,10 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
@ -59,46 +61,41 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
*/ */
public class DemandForwardingBridge implements Bridge{ public class DemandForwardingBridge implements Bridge{
static final private Log log=LogFactory.getLog(DemandForwardingBridge.class); static final private Log log=LogFactory.getLog(DemandForwardingBridge.class);
private final Transport localBroker; protected final Transport localBroker;
private final Transport remoteBroker; protected final Transport remoteBroker;
private IdGenerator idGenerator=new IdGenerator(); protected IdGenerator idGenerator=new IdGenerator();
private LongSequenceGenerator consumerIdGenerator=new LongSequenceGenerator(); protected LongSequenceGenerator consumerIdGenerator=new LongSequenceGenerator();
private ConnectionInfo localConnectionInfo; protected ConnectionInfo localConnectionInfo;
private ConnectionInfo remoteConnectionInfo; protected ConnectionInfo remoteConnectionInfo;
private SessionInfo localSessionInfo; protected SessionInfo localSessionInfo;
private ProducerInfo producerInfo; protected ProducerInfo producerInfo;
private String localBrokerName; protected String localBrokerName;
private String remoteBrokerName; protected String remoteBrokerName;
private String localClientId; protected String localClientId;
private int prefetchSize=1000; protected int prefetchSize=1000;
private boolean dispatchAsync; protected boolean dispatchAsync;
private String destinationFilter=">"; protected String destinationFilter=">";
private ConsumerInfo demandConsumerInfo; protected ConsumerInfo demandConsumerInfo;
private int demandConsumerDispatched; protected int demandConsumerDispatched;
private AtomicBoolean localBridgeStarted=new AtomicBoolean(false); protected AtomicBoolean localBridgeStarted=new AtomicBoolean(false);
private AtomicBoolean remoteBridgeStarted=new AtomicBoolean(false); protected AtomicBoolean remoteBridgeStarted=new AtomicBoolean(false);
private boolean disposed=false; protected boolean disposed=false;
BrokerId localBrokerId; protected BrokerId localBrokerId;
BrokerId remoteBrokerId; protected BrokerId remoteBrokerId;
private Object brokerInfoMutex = new Object(); protected ActiveMQDestination[] excludedDestinations;
protected ActiveMQDestination[] dynamicallyIncludedDestinations;
private static class DemandSubscription{ protected ActiveMQDestination[] staticallyIncludedDestinations;
ConsumerInfo remoteInfo; protected ActiveMQDestination[] durableDestinations;
ConsumerInfo localInfo; protected ConcurrentHashMap subscriptionMapByLocalId=new ConcurrentHashMap();
int dispatched; protected ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap();
public DemandSubscription(ConsumerInfo info){
remoteInfo=info;
localInfo=info.copy();
}
}
ConcurrentHashMap subscriptionMapByLocalId=new ConcurrentHashMap();
ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap();
protected final BrokerId localBrokerPath[]=new BrokerId[] { null }; protected final BrokerId localBrokerPath[]=new BrokerId[] { null };
protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null }; protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null };
private CountDownLatch startedLatch = new CountDownLatch(2); protected CountDownLatch startedLatch = new CountDownLatch(2);
private boolean decreaseNetowrkConsumerPriority; protected Object brokerInfoMutex = new Object();
protected boolean decreaseNetworkConsumerPriority;
protected int networkTTL = 1;
public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){ public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){
this.localBroker=localBroker; this.localBroker=localBroker;
@ -107,7 +104,7 @@ public class DemandForwardingBridge implements Bridge{
public void start() throws Exception{ public void start() throws Exception{
log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established."); log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established.");
localBroker.setTransportListener(new TransportListener(){ localBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command){ public void onCommand(Command command){
serviceLocalCommand(command); serviceLocalCommand(command);
} }
@ -116,7 +113,7 @@ public class DemandForwardingBridge implements Bridge{
serviceLocalException(error); serviceLocalException(error);
} }
}); });
remoteBroker.setTransportListener(new TransportListener(){ remoteBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command){ public void onCommand(Command command){
serviceRemoteCommand(command); serviceRemoteCommand(command);
} }
@ -168,6 +165,7 @@ public class DemandForwardingBridge implements Bridge{
log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
+") has been established."); +") has been established.");
startedLatch.countDown(); startedLatch.countDown();
setupStaticDestinations();
} }
} }
@ -196,6 +194,12 @@ public class DemandForwardingBridge implements Bridge{
} }
} }
/**
* stop the bridge
* @throws Exception
*/
public void stop() throws Exception{ public void stop() throws Exception{
if(!disposed){ if(!disposed){
try{ try{
@ -274,62 +278,39 @@ public class DemandForwardingBridge implements Bridge{
// Create a new local subscription // Create a new local subscription
ConsumerInfo info=(ConsumerInfo) data; ConsumerInfo info=(ConsumerInfo) data;
BrokerId[] path=info.getBrokerPath(); BrokerId[] path=info.getBrokerPath();
if((path!=null&&path.length>0)||info.isNetworkSubscription()){ if((path!=null&&path.length>= networkTTL)){
// Ignore: We only support directly connected brokers for now. if(log.isTraceEnabled())
log.trace("Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
return; return;
} }
if(contains(info.getBrokerPath(),localBrokerPath[0])){ if(contains(info.getBrokerPath(),localBrokerPath[0])){
// Ignore this consumer as it's a consumer we locally sent to the broker. // Ignore this consumer as it's a consumer we locally sent to the broker.
if(log.isTraceEnabled())
log.trace("Ignoring sub " + info + " already routed through this broker once");
return; return;
} }
if (!isPermissableDestination(info.getDestination())){
//ignore if not in the permited or in the excluded list
if(log.isTraceEnabled()) if(log.isTraceEnabled())
log.trace("Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info); log.trace("Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
return;
}
// Update the packet to show where it came from. // Update the packet to show where it came from.
info=info.copy(); info=info.copy();
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),remoteBrokerPath)); info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),remoteBrokerPath));
DemandSubscription sub=new DemandSubscription(info); DemandSubscription sub=createDemandSubscription(info);
sub.localInfo.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator if (sub != null){
.getNextSequenceId())); addSubscription(sub);
sub.localInfo.setDispatchAsync(dispatchAsync); if(log.isTraceEnabled())
sub.localInfo.setPrefetchSize(prefetchSize); log.trace("Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info);
}else {
if( decreaseNetowrkConsumerPriority ) { if(log.isTraceEnabled())
byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY; log.trace("Ignoring sub " + info + " already subscribed to matching destination");
if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
// The longer the path to the consumer, the less it's consumer priority.
priority-=info.getBrokerPath().length+1;
} }
sub.localInfo.setPriority(priority);
}
subscriptionMapByLocalId.put(sub.localInfo.getConsumerId(),sub);
subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(),sub);
sub.localInfo.setBrokerPath(info.getBrokerPath());
sub.localInfo.setNetworkSubscription(true);
// This works for now since we use a VM connection to the local broker.
// may need to change if we ever subscribe to a remote broker.
sub.localInfo.setAdditionalPredicate(new BooleanExpression(){
public boolean matches(MessageEvaluationContext message) throws JMSException{
try{
return matchesForwardingFilter(message.getMessage());
}catch(IOException e){
throw JMSExceptionSupport.create(e);
}
}
public Object evaluate(MessageEvaluationContext message) throws JMSException{
return matches(message)?Boolean.TRUE:Boolean.FALSE;
}
});
localBroker.oneway(sub.localInfo);
} }
if(data.getClass()==RemoveInfo.class){ if(data.getClass()==RemoveInfo.class){
ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId(); ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
DemandSubscription sub=(DemandSubscription) subscriptionMapByRemoteId.remove(id); removeDemandSubscription(id);
if(sub!=null){
subscriptionMapByLocalId.remove(sub.localInfo.getConsumerId());
localBroker.oneway(sub.localInfo.createRemoveCommand());
}
} }
} }
@ -338,31 +319,25 @@ public class DemandForwardingBridge implements Bridge{
ServiceSupport.dispose(this); ServiceSupport.dispose(this);
} }
boolean matchesForwardingFilter(Message message){ protected void addSubscription(DemandSubscription sub) throws IOException{
if(message.isRecievedByDFBridge()||contains(message.getBrokerPath(),remoteBrokerPath[0])) if (sub != null){
return false; localBroker.oneway(sub.getLocalInfo());
// Don't propagate advisory messages about network subscriptions
if(message.isAdvisory()&&message.getDataStructure()!=null
&&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
if(info.isNetworkSubscription()){
return false;
} }
} }
return true;
}
protected void serviceLocalCommand(Command command){ protected void removeSubscription(DemandSubscription sub) throws IOException{
if(!disposed){
final boolean trace=log.isTraceEnabled();
try{
if(command.isMessageDispatch()){
waitStarted();
MessageDispatch md=(MessageDispatch) command;
Message message=md.getMessage();
DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
if(sub!=null){ if(sub!=null){
message=message.copy(); subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
}
}
protected DemandSubscription getDemandSubscription(MessageDispatch md){
return (DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
}
protected Message configureMessage(MessageDispatch md){
Message message=md.getMessage().copy();
// Update the packet to show where it came from. // Update the packet to show where it came from.
message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(),localBrokerPath)); message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(),localBrokerPath));
message.setProducerId(producerInfo.getProducerId()); message.setProducerId(producerInfo.getProducerId());
@ -372,9 +347,24 @@ public class DemandForwardingBridge implements Bridge{
message.setTransactionId(null); message.setTransactionId(null);
message.setRecievedByDFBridge(true); message.setRecievedByDFBridge(true);
message.evictMarshlledForm(); message.evictMarshlledForm();
return message;
}
protected void serviceLocalCommand(Command command){
if(!disposed){
final boolean trace=log.isTraceEnabled();
try{
if(command.isMessageDispatch()){
waitStarted();
MessageDispatch md=(MessageDispatch) command;
DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
if(sub!=null){
Message message= configureMessage(md);
if(trace) if(trace)
log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message); log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message);
if(!message.isPersistent()||!sub.remoteInfo.isDurable()){ if(!message.isPersistent()||!sub.getRemoteInfo().isDurable()){
remoteBroker.oneway(message); remoteBroker.oneway(message);
}else{ }else{
Response response=remoteBroker.request(message); Response response=remoteBroker.request(message);
@ -383,10 +373,10 @@ public class DemandForwardingBridge implements Bridge{
serviceLocalException(er.getException()); serviceLocalException(er.getException());
} }
} }
sub.dispatched++; int dispatched = sub.incrementDispatched();
if(sub.dispatched>(sub.localInfo.getPrefetchSize()*.75)){ if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,sub.dispatched)); localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
sub.dispatched=0; sub.setDispatched(0);
} }
} }
}else if(command.isBrokerInfo()){ }else if(command.isBrokerInfo()){
@ -419,28 +409,91 @@ public class DemandForwardingBridge implements Bridge{
} }
} }
/**
* @return prefetch size
*/
public int getPrefetchSize(){ public int getPrefetchSize(){
return prefetchSize; return prefetchSize;
} }
/**
* @param prefetchSize
*/
public void setPrefetchSize(int prefetchSize){ public void setPrefetchSize(int prefetchSize){
this.prefetchSize=prefetchSize; this.prefetchSize=prefetchSize;
} }
/**
* @return true if dispatch async
*/
public boolean isDispatchAsync(){ public boolean isDispatchAsync(){
return dispatchAsync; return dispatchAsync;
} }
/**
* @param dispatchAsync
*/
public void setDispatchAsync(boolean dispatchAsync){ public void setDispatchAsync(boolean dispatchAsync){
this.dispatchAsync=dispatchAsync; this.dispatchAsync=dispatchAsync;
} }
public String getDestinationFilter(){ /**
return destinationFilter; * @return Returns the dynamicallyIncludedDestinations.
*/
public ActiveMQDestination[] getDynamicallyIncludedDestinations(){
return dynamicallyIncludedDestinations;
} }
public void setDestinationFilter(String destinationFilter){ /**
this.destinationFilter=destinationFilter; * @param dynamicallyIncludedDestinations The dynamicallyIncludedDestinations to set.
*/
public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations){
this.dynamicallyIncludedDestinations=dynamicallyIncludedDestinations;
}
/**
* @return Returns the excludedDestinations.
*/
public ActiveMQDestination[] getExcludedDestinations(){
return excludedDestinations;
}
/**
* @param excludedDestinations The excludedDestinations to set.
*/
public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations){
this.excludedDestinations=excludedDestinations;
}
/**
* @return Returns the staticallyIncludedDestinations.
*/
public ActiveMQDestination[] getStaticallyIncludedDestinations(){
return staticallyIncludedDestinations;
}
/**
* @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
*/
public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations){
this.staticallyIncludedDestinations=staticallyIncludedDestinations;
}
/**
* @return Returns the durableDestinations.
*/
public ActiveMQDestination[] getDurableDestinations(){
return durableDestinations;
}
/**
* @param durableDestinations The durableDestinations to set.
*/
public void setDurableDestinations(ActiveMQDestination[] durableDestinations){
this.durableDestinations=durableDestinations;
} }
/** /**
@ -458,6 +511,64 @@ public class DemandForwardingBridge implements Bridge{
this.localBrokerName=localBrokerName; this.localBrokerName=localBrokerName;
} }
/**
* @return Returns the localBroker.
*/
public Transport getLocalBroker(){
return localBroker;
}
/**
* @return Returns the remoteBroker.
*/
public Transport getRemoteBroker(){
return remoteBroker;
}
/**
* @return Returns the remoteBrokerName.
*/
public String getRemoteBrokerName(){
return remoteBrokerName;
}
/**
* @param remoteBrokerName The remoteBrokerName to set.
*/
public void setRemoteBrokerName(String remoteBrokerName){
this.remoteBrokerName=remoteBrokerName;
}
/**
* @return Returns the decreaseNetworkConsumerPriority.
*/
public boolean isDecreaseNetworkConsumerPriority(){
return decreaseNetworkConsumerPriority;
}
/**
* @param decreaseNetworkConsumerPriority The decreaseNetworkConsumerPriority to set.
*/
public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority){
this.decreaseNetworkConsumerPriority=decreaseNetworkConsumerPriority;
}
/**
* @return Returns the networkTTL.
*/
public int getNetworkTTL(){
return networkTTL;
}
/**
* @param networkTTL The networkTTL to set.
*/
public void setNetworkTTL(int networkTTL){
this.networkTTL=networkTTL;
}
private boolean contains(BrokerId[] brokerPath,BrokerId brokerId){ private boolean contains(BrokerId[] brokerPath,BrokerId brokerId){
if(brokerPath!=null){ if(brokerPath!=null){
for(int i=0;i<brokerPath.length;i++){ for(int i=0;i<brokerPath.length;i++){
@ -477,15 +588,143 @@ public class DemandForwardingBridge implements Bridge{
return rc; return rc;
} }
private void waitStarted() throws InterruptedException {
protected boolean isPermissableDestination(ActiveMQDestination destination){
DestinationFilter filter=DestinationFilter.parseFilter(destination);
ActiveMQDestination[] dests = excludedDestinations;
if(dests!=null&&dests.length>0){
for(int i=0;i<dests.length;i++){
ActiveMQDestination match=dests[i];
if(match!=null&&filter.matches(match)){
return false;
}
}
}
dests = dynamicallyIncludedDestinations;
if(dests!=null&&dests.length>0){
for(int i=0;i<dests.length;i++){
ActiveMQDestination match=dests[i];
if(match!=null&&filter.matches(match)){
return true;
}
}
return false;
}
return true;
}
/**
* Subscriptions for these desitnations are always created
* @throws IOException
*
*/
protected void setupStaticDestinations() throws IOException{
ActiveMQDestination[] dests = staticallyIncludedDestinations;
if (dests != null){
for(int i=0;i<dests.length;i++){
ActiveMQDestination dest=dests[i];
DemandSubscription sub = createDemandSubscription(dest);
addSubscription(sub);
if(log.isTraceEnabled())
log.trace("Forwarding messages for static destination: " + dest);
}
}
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info){
DemandSubscription result=new DemandSubscription(info);
result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
.getNextSequenceId()));
if( decreaseNetworkConsumerPriority ) {
byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
// The longer the path to the consumer, the less it's consumer priority.
priority-=info.getBrokerPath().length+1;
}
result.getLocalInfo().setPriority(priority);
}
configureDemandSubscription(result);
return result;
}
protected DemandSubscription createDemandSubscription(ActiveMQDestination destination){
ConsumerInfo info = new ConsumerInfo();
info.setDestination(destination);
//the remote info held by the DemandSubscription holds the original consumerId,
//the local info get's overwritten
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
.getNextSequenceId()));
DemandSubscription result=new DemandSubscription(info);
result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
return result;
}
protected void configureDemandSubscription(DemandSubscription sub){
sub.getLocalInfo().setDispatchAsync(dispatchAsync);
sub.getLocalInfo().setPrefetchSize(prefetchSize);
subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(),sub);
subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(),sub);
// This works for now since we use a VM connection to the local broker.
// may need to change if we ever subscribe to a remote broker.
sub.getLocalInfo().setAdditionalPredicate(new BooleanExpression(){
public boolean matches(MessageEvaluationContext message) throws JMSException{
try{
return matchesForwardingFilter(message.getMessage());
}catch(IOException e){
throw JMSExceptionSupport.create(e);
}
}
public Object evaluate(MessageEvaluationContext message) throws JMSException{
return matches(message)?Boolean.TRUE:Boolean.FALSE;
}
});
}
protected void removeDemandSubscription(ConsumerId id) throws IOException{
DemandSubscription sub=(DemandSubscription) subscriptionMapByRemoteId.remove(id);
if (sub != null){
removeSubscription(sub);
if(log.isTraceEnabled())
log.trace("removing sub on "+localBroker+" from "+remoteBrokerName+" : "+sub.getRemoteInfo());
}
}
protected boolean matchesForwardingFilter(Message message){
if (contains(message.getBrokerPath(),remoteBrokerPath[0])){
if (log.isTraceEnabled()){
log.trace("Message all ready routed once through this broker - ignoring: " + message);
}
}
int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
if(hops >= networkTTL){
if (log.isTraceEnabled()){
log.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message);
}
return false;
}
// Don't propagate advisory messages about network subscriptions
if(message.isAdvisory()&&message.getDataStructure()!=null
&&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
if(info.isNetworkSubscription()){
return false;
}
}
return true;
}
protected void waitStarted() throws InterruptedException {
startedLatch.await(); startedLatch.await();
} }
public boolean isDecreaseNetowrkConsumerPriority() {
return decreaseNetowrkConsumerPriority;
}
public void setDecreaseNetowrkConsumerPriority(boolean decreaseNetowrkConsumerPriority) {
this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority;
}
} }

View File

@ -0,0 +1,122 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
/**
* Represents a network bridge interface
*
* @version $Revision: 1.1 $
*/
public class DemandSubscription{
private ConsumerInfo remoteInfo;
private ConsumerInfo localInfo;
private Set remoteSubsIds = new CopyOnWriteArraySet();
private AtomicInteger dispatched = new AtomicInteger(0);
DemandSubscription(ConsumerInfo info){
remoteInfo=info;
localInfo=info.copy();
localInfo.setBrokerPath(info.getBrokerPath());
localInfo.setNetworkSubscription(true);
remoteSubsIds.add(info.getConsumerId());
}
/**
* Increment the consumers associated with this subscription
* @param id
* @return true if added
*/
public boolean add(ConsumerId id){
return remoteSubsIds.add(id);
}
/**
* Increment the consumers associated with this subscription
* @param id
* @return true if added
*/
public boolean remove(ConsumerId id){
return remoteSubsIds.remove(id);
}
/**
* @return true if there are no interested consumers
*/
public boolean isEmpty(){
return remoteSubsIds.isEmpty();
}
/**
* @return Returns the dispatched.
*/
public int getDispatched(){
return dispatched.get();
}
/**
* @param dispatched The dispatched to set.
*/
public void setDispatched(int dispatched){
this.dispatched.set(dispatched);
}
/**
* @return dispatched count after incremented
*/
public int incrementDispatched(){
return dispatched.incrementAndGet();
}
/**
* @return Returns the localInfo.
*/
public ConsumerInfo getLocalInfo(){
return localInfo;
}
/**
* @param localInfo The localInfo to set.
*/
public void setLocalInfo(ConsumerInfo localInfo){
this.localInfo=localInfo;
}
/**
* @return Returns the remoteInfo.
*/
public ConsumerInfo getRemoteInfo(){
return remoteInfo;
}
/**
* @param remoteInfo The remoteInfo to set.
*/
public void setRemoteInfo(ConsumerInfo remoteInfo){
this.remoteInfo=remoteInfo;
}
}

View File

@ -0,0 +1,63 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.io.IOException;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.transport.Transport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Consolidates subscriptions
*
* @version $Revision: 1.1 $
*/
public class DurableConduitBridge extends ConduitBridge{
static final private Log log=LogFactory.getLog(DurableConduitBridge.class);
/**
* Constructor
* @param localBroker
* @param remoteBroker
*/
public DurableConduitBridge(Transport localBroker,Transport remoteBroker){
super(localBroker,remoteBroker);
}
/**
* Subscriptions for these desitnations are always created
* @throws IOException
*
*/
protected void setupStaticDestinations() throws IOException{
super.setupStaticDestinations();
ActiveMQDestination[] dests=durableDestinations;
if(dests!=null){
for(int i=0;i<dests.length;i++){
ActiveMQDestination dest=dests[i];
if(isPermissableDestination(dest)){
DemandSubscription sub=createDemandSubscription(dest);
addSubscription(sub);
if(log.isTraceEnabled())
log.trace("Forwarding messages for durable destination: "+dest);
}
}
}
}
}

View File

@ -32,6 +32,7 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IdGenerator;
@ -80,7 +81,7 @@ public class ForwardingBridge implements Bridge {
public void start() throws Exception { public void start() throws Exception {
log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established."); log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established.");
localBroker.setTransportListener(new TransportListener(){ localBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command) { public void onCommand(Command command) {
serviceLocalCommand(command); serviceLocalCommand(command);
} }
@ -89,7 +90,7 @@ public class ForwardingBridge implements Bridge {
} }
}); });
remoteBroker.setTransportListener(new TransportListener(){ remoteBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command) { public void onCommand(Command command) {
serviceRemoteCommand(command); serviceRemoteCommand(command);
} }
@ -192,7 +193,7 @@ public class ForwardingBridge implements Bridge {
} }
} }
} else { } else {
System.out.println("Unexpected remote command: "+command); log.warn("Unexpected remote command: "+command);
} }
} catch (IOException e) { } catch (IOException e) {
serviceLocalException(e); serviceLocalException(e);

View File

@ -22,6 +22,8 @@ import java.net.URISyntaxException;
import java.util.Set; import java.util.Set;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
@ -49,7 +51,14 @@ public class NetworkConnector implements Service, DiscoveryListener {
private ConcurrentHashMap bridges = new ConcurrentHashMap(); private ConcurrentHashMap bridges = new ConcurrentHashMap();
private Set durableDestinations; private Set durableDestinations;
private boolean failover=true; private boolean failover=true;
private boolean decreaseNetowrkConsumerPriority; private ActiveMQDestination[] excludedDestinations;
private ActiveMQDestination[] dynamicallyIncludedDestinations;
private ActiveMQDestination[] staticallyIncludedDestinations;
private boolean dynamicOnly = false;
private boolean conduitSubscriptions = true;
private boolean decreaseNetworkConsumerPriority;
private int networkTTL = 1;
public NetworkConnector(){ public NetworkConnector(){
@ -182,24 +191,6 @@ public class NetworkConnector implements Service, DiscoveryListener {
setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI)); setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI));
} }
// Implementation methods
// -------------------------------------------------------------------------
protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
DemandForwardingBridge result = new DemandForwardingBridge(localTransport, remoteTransport) {
protected void serviceRemoteException(IOException error) {
super.serviceRemoteException(error);
try {
// Notify the discovery agent that the remote broker failed.
discoveryAgent.serviceFailed(event);
} catch (IOException e) {
}
}
};
result.setDecreaseNetowrkConsumerPriority(isDecreaseNetowrkConsumerPriority());
result.setLocalBrokerName(brokerName);
return result;
}
public boolean isFailover() { public boolean isFailover() {
@ -243,13 +234,167 @@ public class NetworkConnector implements Service, DiscoveryListener {
} }
public boolean isDecreaseNetowrkConsumerPriority() { /**
return decreaseNetowrkConsumerPriority; * @return Returns the dynamicallyIncludedDestinations.
*/
public ActiveMQDestination[] getDynamicallyIncludedDestinations(){
return dynamicallyIncludedDestinations;
} }
public void setDecreaseNetowrkConsumerPriority(boolean decreaseNetowrkConsumerPriority) { /**
this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority; * @param dynamicallyIncludedDestinations The dynamicallyIncludedDestinations to set.
*/
public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations){
this.dynamicallyIncludedDestinations=dynamicallyIncludedDestinations;
}
/**
* @return Returns the dynamicOnly.
*/
public boolean isDynamicOnly(){
return dynamicOnly;
}
/**
* @param dynamicOnly The dynamicOnly to set.
*/
public void setDynamicOnly(boolean dynamicOnly){
this.dynamicOnly=dynamicOnly;
}
/**
* @return Returns the conduitSubscriptions.
*/
public boolean isConduitSubscriptions(){
return conduitSubscriptions;
}
/**
* @param conduitSubscriptions The conduitSubscriptions to set.
*/
public void setConduitSubscriptions(boolean conduitSubscriptions){
this.conduitSubscriptions=conduitSubscriptions;
}
/**
* @return Returns the decreaseNetworkConsumerPriority.
*/
public boolean isDecreaseNetworkConsumerPriority(){
return decreaseNetworkConsumerPriority;
}
/**
* @param decreaseNetworkConsumerPriority The decreaseNetworkConsumerPriority to set.
*/
public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority){
this.decreaseNetworkConsumerPriority=decreaseNetworkConsumerPriority;
}
/**
* @return Returns the networkTTL.
*/
public int getNetworkTTL(){
return networkTTL;
}
/**
* @param networkTTL The networkTTL to set.
*/
public void setNetworkTTL(int networkTTL){
this.networkTTL=networkTTL;
}
/**
* @return Returns the excludedDestinations.
*/
public ActiveMQDestination[] getExcludedDestinations(){
return excludedDestinations;
}
/**
* @param excludedDestinations The excludedDestinations to set.
*/
public void setExcludedDestinations(ActiveMQDestination[] exludedDestinations){
this.excludedDestinations=exludedDestinations;
}
/**
* @return Returns the staticallyIncludedDestinations.
*/
public ActiveMQDestination[] getStaticallyIncludedDestinations(){
return staticallyIncludedDestinations;
}
/**
* @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
*/
public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations){
this.staticallyIncludedDestinations=staticallyIncludedDestinations;
}
// Implementation methods
// -------------------------------------------------------------------------
protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
DemandForwardingBridge result = null;
if (conduitSubscriptions){
if (dynamicOnly){
result = new ConduitBridge(localTransport, remoteTransport) {
protected void serviceRemoteException(IOException error) {
super.serviceRemoteException(error);
try {
// Notify the discovery agent that the remote broker failed.
discoveryAgent.serviceFailed(event);
} catch (IOException e) {
}
}
};
}else {
result = new DurableConduitBridge(localTransport, remoteTransport) {
protected void serviceRemoteException(IOException error) {
super.serviceRemoteException(error);
try {
// Notify the discovery agent that the remote broker failed.
discoveryAgent.serviceFailed(event);
} catch (IOException e) {
}
}
};
}
}else {
result = new DemandForwardingBridge(localTransport, remoteTransport) {
protected void serviceRemoteException(IOException error) {
super.serviceRemoteException(error);
try {
// Notify the discovery agent that the remote broker failed.
discoveryAgent.serviceFailed(event);
} catch (IOException e) {
}
}
};
}
result.setLocalBrokerName(brokerName);
result.setNetworkTTL(getNetworkTTL());
result.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
result.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
result.setExcludedDestinations(getExcludedDestinations());
result.setStaticallyIncludedDestinations(getStaticallyIncludedDestinations());
if (durableDestinations != null){
ActiveMQDestination[] dest = new ActiveMQDestination[durableDestinations.size()];
dest = (ActiveMQDestination[]) durableDestinations.toArray(dest);
result.setDurableDestinations(dest);
}
return result;
} }
} }