Fixes for networks and the invalid Brokers caper

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@373863 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-01-31 16:35:13 +00:00
parent 3e7ebc2368
commit 5a429d90bb
11 changed files with 426 additions and 390 deletions

View File

@ -47,6 +47,7 @@ import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
@ -190,7 +191,7 @@ public class BrokerService implements Service {
* @throws Exception
*/
public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception{
NetworkConnector connector=new NetworkConnector();
NetworkConnector connector=new NetworkConnector(this);
// add the broker name to the parameters if not set
connector.setUri(discoveryAddress);
return addNetworkConnector(connector);
@ -219,7 +220,6 @@ public class BrokerService implements Service {
map.put("network", "true");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
connector.setLocalUri(uri);
connector.setBrokerName(getBrokerName());
networkConnectors.add(connector);
if (isUseJmx()) {
registerNetworkConnectorMBean(connector);
@ -356,6 +356,8 @@ public class BrokerService implements Service {
}
log.info("ActiveMQ Message Broker (" + getBrokerName() + ") is shutting down");
BrokerRegistry.getInstance().unbind(getBrokerName());
//remove any VMTransports connected
VMTransportFactory.stopped(getBrokerName());
removeShutdownHook();

View File

@ -147,7 +147,7 @@ public class RegionBroker implements Broker {
}
synchronized (clientIdSet ) {
if (clientIdSet.containsKey(clientId)) {
throw new InvalidClientIDException("Client: " + clientId + " already connected");
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected");
}
else {
clientIdSet.put(clientId, info);

View File

@ -64,6 +64,10 @@ abstract public class BaseCommand implements Command {
public boolean isMessageDispatchNotification(){
return false;
}
public boolean isShutdownInfo(){
return false;
}
/**
* @openwire:property version=1

View File

@ -43,6 +43,7 @@ public interface Command extends DataStructure {
boolean isMessage();
boolean isMessageAck();
boolean isMessageDispatchNotification();
boolean isShutdownInfo();
Response visit( CommandVisitor visitor) throws Throwable;
}

View File

@ -79,5 +79,9 @@ public class KeepAliveInfo implements Command {
public boolean isMessageDispatchNotification(){
return false;
}
public boolean isShutdownInfo(){
return false;
}
}

View File

@ -34,6 +34,10 @@ public class ShutdownInfo extends BaseCommand {
public Response visit(CommandVisitor visitor) throws Throwable {
return visitor.processShutdown( this );
}
public boolean isShutdownInfo(){
return true;
}
}

View File

@ -172,5 +172,9 @@ public class WireFormatInfo implements Command {
public boolean isMessageDispatchNotification(){
return false;
}
public boolean isShutdownInfo(){
return false;
}
}

View File

@ -1,26 +1,22 @@
/**
*
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.network;
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@ -53,393 +49,415 @@ import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* Forwards messages from the local broker to the remote broker based on
* demand.
* Forwards messages from the local broker to the remote broker based on demand.
*
* @org.xbean.XBean
*
* @version $Revision$
*/
public class DemandForwardingBridge implements Bridge {
static final private Log log = LogFactory.getLog(DemandForwardingBridge.class);
public class DemandForwardingBridge implements Bridge{
static final private Log log=LogFactory.getLog(DemandForwardingBridge.class);
private final Transport localBroker;
private final Transport remoteBroker;
IdGenerator idGenerator = new IdGenerator();
LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
ConnectionInfo connectionInfo;
SessionInfo sessionInfo;
ProducerInfo producerInfo;
private String clientId;
private IdGenerator idGenerator=new IdGenerator();
private LongSequenceGenerator consumerIdGenerator=new LongSequenceGenerator();
private ConnectionInfo localConnectionInfo;
private ConnectionInfo remoteConnectionInfo;
private SessionInfo localSessionInfo;
private ProducerInfo producerInfo;
private String localBrokerName;
private String remoteBrokerName;
private String localClientId;
private int prefetchSize=1000;
private boolean dispatchAsync;
private String destinationFilter = ">";
private String destinationFilter=">";
private ConsumerInfo demandConsumerInfo;
private int demandConsumerDispatched;
private AtomicBoolean localBridgeStarted=new AtomicBoolean(false);
private AtomicBoolean remoteBridgeStarted=new AtomicBoolean(false);
private boolean disposed=false;
BrokerId localBrokerId;
BrokerId remoteBrokerId;
private static class DemandSubscription {
private static class DemandSubscription{
ConsumerInfo remoteInfo;
ConsumerInfo localInfo;
int dispatched;
public DemandSubscription(ConsumerInfo info) {
remoteInfo = info;
localInfo = info.copy();
public DemandSubscription(ConsumerInfo info){
remoteInfo=info;
localInfo=info.copy();
}
}
ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap();
ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap();
protected final BrokerId localBrokerPath[] = new BrokerId[] {null};
protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
public DemandForwardingBridge(Transport localBroker, Transport remoteBroker) {
this.localBroker = localBroker;
this.remoteBroker = remoteBroker;
ConcurrentHashMap subscriptionMapByLocalId=new ConcurrentHashMap();
ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap();
protected final BrokerId localBrokerPath[]=new BrokerId[] { null };
protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null };
public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){
this.localBroker=localBroker;
this.remoteBroker=remoteBroker;
}
public void start() throws Exception {
log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established.");
public void start() throws Exception{
log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established.");
localBroker.setTransportListener(new TransportListener(){
public void onCommand(Command command) {
public void onCommand(Command command){
serviceLocalCommand(command);
}
public void onException(IOException error) {
public void onException(IOException error){
serviceLocalException(error);
}
});
remoteBroker.setTransportListener(new TransportListener(){
public void onCommand(Command command) {
public void onCommand(Command command){
serviceRemoteCommand(command);
}
public void onException(IOException error) {
public void onException(IOException error){
serviceRemoteException(error);
}
});
localBroker.start();
remoteBroker.start();
triggerRemoteStartBridge();
}
protected void triggerStartBridge() throws IOException {
Thread thead = new Thread() {
public void run() {
try {
startBridge();
}
catch (IOException e) {
log.error("Failed to start network bridge: " + e, e);
protected void triggerLocalStartBridge() throws IOException{
Thread thead=new Thread(){
public void run(){
try{
startLocalBridge();
}catch(IOException e){
log.error("Failed to start network bridge: "+e,e);
}
}
};
thead.start();
}
protected void startBridge() throws IOException {
BrokerInfo brokerInfo = new BrokerInfo();
remoteBroker.oneway(brokerInfo);
connectionInfo = new ConnectionInfo();
connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
connectionInfo.setClientId(clientId);
localBroker.oneway(connectionInfo);
remoteBroker.oneway(connectionInfo);
sessionInfo=new SessionInfo(connectionInfo, 1);
localBroker.oneway(sessionInfo);
remoteBroker.oneway(sessionInfo);
producerInfo = new ProducerInfo(sessionInfo, 1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
// Listen to consumer advisory messages on the remote broker to determine demand.
demandConsumerInfo = new ConsumerInfo(sessionInfo, 1);
demandConsumerInfo.setDispatchAsync(dispatchAsync);
demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+destinationFilter));
demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo);
log.info("Network connection between " + localBroker + " and " + remoteBroker + " has been established.");
}
public void stop() throws Exception{
try {
if( connectionInfo!=null ) {
localBroker.request(connectionInfo.createRemoveCommand());
remoteBroker.request(connectionInfo.createRemoveCommand());
protected void triggerRemoteStartBridge() throws IOException{
Thread thead=new Thread(){
public void run(){
try{
startRemoteBridge();
}catch(IOException e){
log.error("Failed to start network bridge: "+e,e);
}
}
localBroker.setTransportListener(null);
remoteBroker.setTransportListener(null);
remoteBroker.oneway(new ShutdownInfo());
localBroker.oneway(new ShutdownInfo());
}catch(IOException e){
log.debug("Caught exception stopping",e);
} finally {
ServiceStopper ss = new ServiceStopper();
ss.stop(localBroker);
ss.stop(remoteBroker);
ss.throwFirstException();
};
thead.start();
}
protected void startLocalBridge() throws IOException{
if(localBridgeStarted.compareAndSet(false,true)){
localConnectionInfo=new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId="NC_"+remoteBrokerName+"_inbound";
localConnectionInfo.setClientId(localClientId);
localBroker.oneway(localConnectionInfo);
localSessionInfo=new SessionInfo(localConnectionInfo,1);
localBroker.oneway(localSessionInfo);
log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
+") has been established.");
}
}
protected void serviceRemoteException(IOException error) {
log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: "+error.getMessage(), error);
protected void startRemoteBridge() throws IOException{
if(remoteBridgeStarted.compareAndSet(false,true)){
BrokerInfo brokerInfo=new BrokerInfo();
brokerInfo.setBrokerName(localBrokerName);
remoteBroker.oneway(brokerInfo);
remoteConnectionInfo=new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound");
remoteBroker.oneway(remoteConnectionInfo);
SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
remoteBroker.oneway(remoteSessionInfo);
producerInfo=new ProducerInfo(remoteSessionInfo,1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
// Listen to consumer advisory messages on the remote broker to determine demand.
demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
demandConsumerInfo.setDispatchAsync(dispatchAsync);
demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX
+destinationFilter));
demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo);
}
}
public void stop() throws Exception{
if(!disposed){
try{
disposed=true;
localBridgeStarted.set(false);
remoteBridgeStarted.set(false);
if(localConnectionInfo!=null){
localBroker.request(localConnectionInfo.createRemoveCommand());
remoteBroker.request(remoteConnectionInfo.createRemoveCommand());
}
localBroker.setTransportListener(null);
remoteBroker.setTransportListener(null);
remoteBroker.oneway(new ShutdownInfo());
localBroker.oneway(new ShutdownInfo());
}catch(IOException e){
log.debug("Caught exception stopping",e);
}finally{
ServiceStopper ss=new ServiceStopper();
ss.stop(localBroker);
ss.stop(remoteBroker);
ss.throwFirstException();
}
}
}
protected void serviceRemoteException(IOException error){
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
ServiceSupport.dispose(this);
}
protected void serviceRemoteCommand(Command command) {
try {
if( command.isMessageDispatch() ) {
MessageDispatch md = (MessageDispatch) command;
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
demandConsumerDispatched++;
if( demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize()*.75) ) {
remoteBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched));
demandConsumerDispatched=0;
}
} else if ( command.isBrokerInfo() ) {
synchronized( this ) {
remoteBrokerId = ((BrokerInfo)command).getBrokerId();
remoteBrokerPath[0] = remoteBrokerId;
if( localBrokerId !=null) {
if( localBrokerId.equals(remoteBrokerId) ) {
log.info("Disconnecting loop back connection.");
ServiceSupport.dispose(this);
} else {
triggerStartBridge();
protected void serviceRemoteCommand(Command command){
if(!disposed){
try{
if(command.isMessageDispatch()){
MessageDispatch md=(MessageDispatch) command;
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
demandConsumerDispatched++;
if(demandConsumerDispatched>(demandConsumerInfo.getPrefetchSize()*.75)){
remoteBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,demandConsumerDispatched));
demandConsumerDispatched=0;
}
}else if(command.isBrokerInfo()){
synchronized(this){
BrokerInfo remoteBrokerInfo=(BrokerInfo) command;
remoteBrokerId=remoteBrokerInfo.getBrokerId();
remoteBrokerPath[0]=remoteBrokerId;
remoteBrokerName=remoteBrokerInfo.getBrokerName();
if(localBrokerId!=null){
if(localBrokerId.equals(remoteBrokerId)){
log.info("Disconnecting loop back connection.");
ServiceSupport.dispose(this);
}else{
triggerLocalStartBridge();
}
}
}
}
} else {
switch ( command.getDataStructureType() ) {
}else{
switch(command.getDataStructureType()){
case WireFormatInfo.DATA_STRUCTURE_TYPE:
break;
break;
default:
log.warn("Unexpected remote command: "+command);
}
}
}catch(IOException e){
serviceRemoteException(e);
}
} catch (IOException e) {
serviceRemoteException(e);
}
}
private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
if( data.getClass() == ConsumerInfo.class ) {
private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException{
if(data.getClass()==ConsumerInfo.class){
// Create a new local subscription
ConsumerInfo info = (ConsumerInfo) data;
BrokerId[] path = info.getBrokerPath();
if( (path!=null && path.length>0) || info.isNetworkSubscription() ) {
// Ignore: We only support directly connected brokers for now.
ConsumerInfo info=(ConsumerInfo) data;
BrokerId[] path=info.getBrokerPath();
if((path!=null&&path.length>0)||info.isNetworkSubscription()){
// Ignore: We only support directly connected brokers for now.
return;
}
if( contains(info.getBrokerPath(), localBrokerPath[0]) ) {
if(contains(info.getBrokerPath(),localBrokerPath[0])){
// Ignore this consumer as it's a consumer we locally sent to the broker.
return;
}
if( log.isTraceEnabled() )
log.trace("Forwarding sub on " + localBroker + " from " + remoteBroker + " on "+info);
if(log.isTraceEnabled())
log.trace("Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info);
// Update the packet to show where it came from.
info = info.copy();
info.setBrokerPath( appendToBrokerPath(info.getBrokerPath(), remoteBrokerPath) );
DemandSubscription sub = new DemandSubscription(info);
sub.localInfo.setConsumerId( new ConsumerId(sessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()) );
info=info.copy();
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),remoteBrokerPath));
DemandSubscription sub=new DemandSubscription(info);
sub.localInfo.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
.getNextSequenceId()));
sub.localInfo.setDispatchAsync(dispatchAsync);
sub.localInfo.setPrefetchSize(prefetchSize);
byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
if( priority > Byte.MIN_VALUE && info.getBrokerPath()!=null && info.getBrokerPath().length>1 ) {
byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
// The longer the path to the consumer, the less it's consumer priority.
priority -= info.getBrokerPath().length+1;
priority-=info.getBrokerPath().length+1;
}
sub.localInfo.setPriority(priority);
subscriptionMapByLocalId.put(sub.localInfo.getConsumerId(), sub);
subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(), sub);
subscriptionMapByLocalId.put(sub.localInfo.getConsumerId(),sub);
subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(),sub);
sub.localInfo.setBrokerPath(info.getBrokerPath());
sub.localInfo.setNetworkSubscription(true);
// This works for now since we use a VM connection to the local broker.
// may need to change if we ever subscribe to a remote broker.
sub.localInfo.setAdditionalPredicate(new BooleanExpression(){
public boolean matches(MessageEvaluationContext message) throws JMSException {
try {
public boolean matches(MessageEvaluationContext message) throws JMSException{
try{
return matchesForwardingFilter(message.getMessage());
} catch (IOException e) {
}catch(IOException e){
throw JMSExceptionSupport.create(e);
}
}
public Object evaluate(MessageEvaluationContext message) throws JMSException {
return matches(message) ? Boolean.TRUE : Boolean.FALSE;
public Object evaluate(MessageEvaluationContext message) throws JMSException{
return matches(message)?Boolean.TRUE:Boolean.FALSE;
}
});
localBroker.oneway(sub.localInfo);
localBroker.oneway(sub.localInfo);
}
if( data.getClass() == RemoveInfo.class ) {
ConsumerId id = (ConsumerId) ((RemoveInfo)data).getObjectId();
DemandSubscription sub = (DemandSubscription)subscriptionMapByRemoteId.remove(id);
if( sub !=null ) {
if(data.getClass()==RemoveInfo.class){
ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
DemandSubscription sub=(DemandSubscription) subscriptionMapByRemoteId.remove(id);
if(sub!=null){
subscriptionMapByLocalId.remove(sub.localInfo.getConsumerId());
localBroker.oneway(sub.localInfo.createRemoveCommand());
}
}
}
protected void serviceLocalException(Throwable error) {
log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: "+error.getMessage(), error);
protected void serviceLocalException(Throwable error){
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
ServiceSupport.dispose(this);
}
boolean matchesForwardingFilter(Message message) {
if( message.isRecievedByDFBridge() || contains(message.getBrokerPath(), remoteBrokerPath[0]) )
boolean matchesForwardingFilter(Message message){
if(message.isRecievedByDFBridge()||contains(message.getBrokerPath(),remoteBrokerPath[0]))
return false;
// Don't propagate advisory messages about network subscriptions
if( message.isAdvisory()
&& message.getDataStructure()!=null
&& message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO) {
if(message.isAdvisory()&&message.getDataStructure()!=null
&&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
if(info.isNetworkSubscription()) {
if(info.isNetworkSubscription()){
return false;
}
}
return true;
}
protected void serviceLocalCommand(Command command) {
final boolean trace = log.isTraceEnabled();
try {
if( command.isMessageDispatch() ) {
MessageDispatch md = (MessageDispatch) command;
Message message = md.getMessage();
DemandSubscription sub = (DemandSubscription)subscriptionMapByLocalId.get(md.getConsumerId());
if( sub!=null ) {
message = message.copy();
// Update the packet to show where it came from.
message.setBrokerPath( appendToBrokerPath(message.getBrokerPath(), localBrokerPath) );
message.setProducerId(producerInfo.getProducerId());
message.setDestination( md.getDestination() );
if( message.getOriginalTransactionId()==null )
message.setOriginalTransactionId(message.getTransactionId());
message.setTransactionId(null);
message.setRecievedByDFBridge(true);
message.evictMarshlledForm();
if( trace )
log.trace("bridging " + localBroker + " -> " + remoteBroker + ": "+message);
if (!message.isPersistent() || !sub.remoteInfo.isDurable()){
remoteBroker.oneway( message );
}else{
Response response = remoteBroker.request(message);
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
serviceLocalException(er.getException());
protected void serviceLocalCommand(Command command){
if(!disposed){
final boolean trace=log.isTraceEnabled();
try{
if(command.isMessageDispatch()){
MessageDispatch md=(MessageDispatch) command;
Message message=md.getMessage();
DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
if(sub!=null){
message=message.copy();
// Update the packet to show where it came from.
message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(),localBrokerPath));
message.setProducerId(producerInfo.getProducerId());
message.setDestination(md.getDestination());
if(message.getOriginalTransactionId()==null)
message.setOriginalTransactionId(message.getTransactionId());
message.setTransactionId(null);
message.setRecievedByDFBridge(true);
message.evictMarshlledForm();
if(trace)
log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message);
if(!message.isPersistent()||!sub.remoteInfo.isDurable()){
remoteBroker.oneway(message);
}else{
Response response=remoteBroker.request(message);
if(response.isException()){
ExceptionResponse er=(ExceptionResponse) response;
serviceLocalException(er.getException());
}
}
sub.dispatched++;
if(sub.dispatched>(sub.localInfo.getPrefetchSize()*.75)){
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,sub.dispatched));
sub.dispatched=0;
}
}
sub.dispatched++;
if( sub.dispatched > (sub.localInfo.getPrefetchSize()*.75) ) {
localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, sub.dispatched));
sub.dispatched=0;
}
}
} else if ( command.isBrokerInfo() ) {
synchronized( this ) {
localBrokerId = ((BrokerInfo)command).getBrokerId();
localBrokerPath[0] = localBrokerId;
if( remoteBrokerId !=null ) {
if( remoteBrokerId.equals(localBrokerId) ) {
log.info("Disconnecting loop back connection.");
ServiceSupport.dispose(this);
} else {
triggerStartBridge();
}else if(command.isBrokerInfo()){
synchronized(this){
localBrokerId=((BrokerInfo) command).getBrokerId();
localBrokerPath[0]=localBrokerId;
if(remoteBrokerId!=null){
if(remoteBrokerId.equals(localBrokerId)){
log.info("Disconnecting loop back connection.");
ServiceSupport.dispose(this);
}
}
}
}else if(command.isShutdownInfo()){
log.info(localBrokerName+" Shutting down");
disposed = true;
stop();
}else{
switch(command.getDataStructureType()){
case WireFormatInfo.DATA_STRUCTURE_TYPE:
break;
default:
log.warn("Unexpected local command: "+command);
}
}
} else {
switch ( command.getDataStructureType() ) {
case WireFormatInfo.DATA_STRUCTURE_TYPE:
break;
default:
log.warn("Unexpected local command: "+command);
}
}catch(Exception e){
serviceLocalException(e);
}
} catch (Exception e) {
serviceLocalException(e);
}
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public int getPrefetchSize() {
public int getPrefetchSize(){
return prefetchSize;
}
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize;
public void setPrefetchSize(int prefetchSize){
this.prefetchSize=prefetchSize;
}
public boolean isDispatchAsync() {
public boolean isDispatchAsync(){
return dispatchAsync;
}
public void setDispatchAsync(boolean dispatchAsync) {
this.dispatchAsync = dispatchAsync;
public void setDispatchAsync(boolean dispatchAsync){
this.dispatchAsync=dispatchAsync;
}
public String getDestinationFilter() {
public String getDestinationFilter(){
return destinationFilter;
}
public void setDestinationFilter(String destinationFilter) {
this.destinationFilter = destinationFilter;
public void setDestinationFilter(String destinationFilter){
this.destinationFilter=destinationFilter;
}
private boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
if( brokerPath!=null ) {
for (int i = 0; i < brokerPath.length; i++) {
if( brokerId.equals(brokerPath[i]) )
/**
* @return Returns the localBrokerName.
*/
public String getLocalBrokerName(){
return localBrokerName;
}
/**
* @param localBrokerName
* The localBrokerName to set.
*/
public void setLocalBrokerName(String localBrokerName){
this.localBrokerName=localBrokerName;
}
private boolean contains(BrokerId[] brokerPath,BrokerId brokerId){
if(brokerPath!=null){
for(int i=0;i<brokerPath.length;i++){
if(brokerId.equals(brokerPath[i]))
return true;
}
}
return false;
}
private BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId pathsToAppend[]) {
if( brokerPath == null || brokerPath.length==0 )
private BrokerId[] appendToBrokerPath(BrokerId[] brokerPath,BrokerId pathsToAppend[]){
if(brokerPath==null||brokerPath.length==0)
return pathsToAppend;
BrokerId rc[] = new BrokerId[brokerPath.length+pathsToAppend.length];
BrokerId rc[]=new BrokerId[brokerPath.length+pathsToAppend.length];
System.arraycopy(brokerPath,0,rc,0,brokerPath.length);
System.arraycopy(pathsToAppend,0,rc,brokerPath.length,pathsToAppend.length);
return rc;
}
}

View File

@ -21,6 +21,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
@ -41,17 +42,19 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
public class NetworkConnector implements Service, DiscoveryListener {
private static final Log log = LogFactory.getLog(NetworkConnector.class);
private BrokerService brokerService;
private DiscoveryAgent discoveryAgent;
private URI localURI;
private ConcurrentHashMap bridges = new ConcurrentHashMap();
private String brokerName;
boolean failover=true;
public NetworkConnector() {
public NetworkConnector(BrokerService service) {
this.brokerService = service;
}
public NetworkConnector(URI localURI, DiscoveryAgent discoveryAgent) throws IOException {
public NetworkConnector(BrokerService service,URI localURI, DiscoveryAgent discoveryAgent) throws IOException {
this.brokerService = service;
this.localURI = localURI;
setDiscoveryAgent(discoveryAgent);
}
@ -161,7 +164,7 @@ public class NetworkConnector implements Service, DiscoveryListener {
this.discoveryAgent = discoveryAgent;
if (discoveryAgent != null) {
this.discoveryAgent.setDiscoveryListener(this);
this.discoveryAgent.setBrokerName(brokerName);
this.discoveryAgent.setBrokerName(brokerService.getBrokerName());
}
}
@ -180,7 +183,7 @@ public class NetworkConnector implements Service, DiscoveryListener {
// Implementation methods
// -------------------------------------------------------------------------
protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
return new DemandForwardingBridge(localTransport, remoteTransport) {
DemandForwardingBridge result = new DemandForwardingBridge(localTransport, remoteTransport) {
protected void serviceRemoteException(IOException error) {
super.serviceRemoteException(error);
try {
@ -190,14 +193,11 @@ public class NetworkConnector implements Service, DiscoveryListener {
}
}
};
result.setLocalBrokerName(brokerService.getBrokerName());
return result;
}
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
if( discoveryAgent!=null ) {
discoveryAgent.setBrokerName(brokerName);
}
}
public boolean isFailover() {
return failover;

View File

@ -1,18 +1,15 @@
/**
*
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.transport.vm;
@ -21,7 +18,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
@ -36,153 +32,156 @@ import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
public class VMTransportFactory extends TransportFactory {
final public static ConcurrentHashMap brokers = new ConcurrentHashMap();
final public static ConcurrentHashMap connectors = new ConcurrentHashMap();
final public static ConcurrentHashMap servers = new ConcurrentHashMap();
public class VMTransportFactory extends TransportFactory{
private static final Log log = LogFactory.getLog(VMTransportFactory.class);
final public static ConcurrentHashMap brokers=new ConcurrentHashMap();
final public static ConcurrentHashMap connectors=new ConcurrentHashMap();
final public static ConcurrentHashMap servers=new ConcurrentHashMap();
BrokerFactoryHandler brokerFactoryHandler;
public Transport doConnect(URI location) throws Exception {
public Transport doConnect(URI location) throws Exception{
return VMTransportServer.configure(doCompositeConnect(location));
}
public Transport doCompositeConnect(URI location) throws Exception {
public Transport doCompositeConnect(URI location) throws Exception{
URI brokerURI;
String host;
Map options;
CompositeData data = URISupport.parseComposite(location);
if( data.getComponents().length==1 && "broker".equals(data.getComponents()[0].getScheme()) ) {
brokerURI = data.getComponents()[0];
CompositeData brokerData = URISupport.parseComposite(brokerURI);
host = (String)brokerData.getParameters().get("brokerName");
if( host == null )
host = "localhost";
if( brokerData.getPath()!=null )
host = data.getPath();
options = data.getParameters();
location = new URI("vm://"+host);
} else {
CompositeData data=URISupport.parseComposite(location);
if(data.getComponents().length==1&&"broker".equals(data.getComponents()[0].getScheme())){
brokerURI=data.getComponents()[0];
CompositeData brokerData=URISupport.parseComposite(brokerURI);
host=(String) brokerData.getParameters().get("brokerName");
if(host==null)
host="localhost";
if(brokerData.getPath()!=null)
host=data.getPath();
options=data.getParameters();
location=new URI("vm://"+host);
}else{
// If using the less complex vm://localhost?broker.persistent=true form
try {
host = location.getHost();
options = URISupport.parseParamters(location);
String config = (String) options.remove("brokerConfig");
if( config != null ) {
brokerURI = new URI(config);
} else {
Map brokerOptions = IntrospectionSupport.extractProperties(options, "broker.");
brokerURI = new URI("broker://()/"+host+"?"+URISupport.createQueryString(brokerOptions));
try{
host=location.getHost();
options=URISupport.parseParamters(location);
String config=(String) options.remove("brokerConfig");
if(config!=null){
brokerURI=new URI(config);
}else{
Map brokerOptions=IntrospectionSupport.extractProperties(options,"broker.");
brokerURI=new URI("broker://()/"+host+"?"+URISupport.createQueryString(brokerOptions));
}
} catch (URISyntaxException e1) {
}catch(URISyntaxException e1){
throw IOExceptionSupport.create(e1);
}
location = new URI("vm://"+host);
location=new URI("vm://"+host);
}
VMTransportServer server = (VMTransportServer) servers.get(host);
//validate the broker is still active
if( !validateBroker(host) || server == null ) {
BrokerService broker = BrokerRegistry.getInstance().lookup(host);
if (broker == null) {
try {
if( brokerFactoryHandler !=null ) {
broker = brokerFactoryHandler.createBroker(brokerURI);
} else {
broker = BrokerFactory.createBroker(brokerURI);
VMTransportServer server=(VMTransportServer) servers.get(host);
// validate the broker is still active
if(!validateBroker(host)||server==null){
BrokerService broker=BrokerRegistry.getInstance().lookup(host);
if(broker==null){
try{
if(brokerFactoryHandler!=null){
broker=brokerFactoryHandler.createBroker(brokerURI);
}else{
broker=BrokerFactory.createBroker(brokerURI);
}
broker.start();
}
catch (URISyntaxException e) {
}catch(URISyntaxException e){
throw IOExceptionSupport.create(e);
}
brokers.put(host, broker);
brokers.put(host,broker);
}
server = (VMTransportServer) servers.get(host);
if (server == null) {
server = (VMTransportServer) bind(location, true);
TransportConnector connector = new TransportConnector(broker.getBroker(), server);
server=(VMTransportServer) servers.get(host);
if(server==null){
server=(VMTransportServer) bind(location,true);
TransportConnector connector=new TransportConnector(broker.getBroker(),server);
connector.start();
connectors.put(host, connector);
connectors.put(host,connector);
}
}else {
}else{}
VMTransport vmtransport=server.connect();
IntrospectionSupport.setProperties(vmtransport,options);
Transport transport=vmtransport;
if(vmtransport.isMarshal()){
HashMap optionsCopy=new HashMap(options);
transport=new MarshallingTransportFilter(transport,createWireFormat(options),createWireFormat(optionsCopy));
}
VMTransport vmtransport = server.connect();
IntrospectionSupport.setProperties(vmtransport, options);
Transport transport = vmtransport;
if (vmtransport.isMarshal()) {
HashMap optionsCopy = new HashMap(options);
transport = new MarshallingTransportFilter(transport, createWireFormat(options), createWireFormat(optionsCopy));
}
if( !options.isEmpty() ) {
if(!options.isEmpty()){
throw new IllegalArgumentException("Invalid connect parameters: "+options);
}
return transport;
}
public TransportServer doBind(String brokerId,URI location) throws IOException {
return bind(location, false);
public TransportServer doBind(String brokerId,URI location) throws IOException{
return bind(location,false);
}
/**
* @param location
* @return
* @return the TransportServer
* @throws IOException
*/
private TransportServer bind(URI location, boolean dispose) throws IOException {
String host = location.getHost();
VMTransportServer server = new VMTransportServer(location, dispose);
Object currentBoundValue = servers.get(host);
if (currentBoundValue != null) {
throw new IOException("VMTransportServer already bound at: " + location);
private TransportServer bind(URI location,boolean dispose) throws IOException{
String host=location.getHost();
log.info("binding to broker: " + host);
VMTransportServer server=new VMTransportServer(location,dispose);
Object currentBoundValue=servers.get(host);
if(currentBoundValue!=null){
throw new IOException("VMTransportServer already bound at: "+location);
}
servers.put(host, server);
servers.put(host,server);
return server;
}
public static void stopped(VMTransportServer server) {
String host = server.getBindURI().getHost();
public static void stopped(VMTransportServer server){
String host=server.getBindURI().getHost();
log.info("Shutting down VM connectors for broker: " +host);
servers.remove(host);
TransportConnector connector = (TransportConnector) connectors.remove(host);
if (connector != null) {
TransportConnector connector=(TransportConnector) connectors.remove(host);
if(connector!=null){
ServiceSupport.dispose(connector);
BrokerService broker = (BrokerService) brokers.remove(host);
if (broker != null) {
BrokerService broker=(BrokerService) brokers.remove(host);
if(broker!=null){
ServiceSupport.dispose(broker);
}
}
}
public BrokerFactoryHandler getBrokerFactoryHandler() {
public static void stopped(String host){
log.info("Shutting down VM connectors for broker: " +host);
servers.remove(host);
TransportConnector connector=(TransportConnector) connectors.remove(host);
if(connector!=null){
ServiceSupport.dispose(connector);
BrokerService broker=(BrokerService) brokers.remove(host);
if(broker!=null){
ServiceSupport.dispose(broker);
}
}
}
public BrokerFactoryHandler getBrokerFactoryHandler(){
return brokerFactoryHandler;
}
public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler) {
this.brokerFactoryHandler = brokerFactoryHandler;
public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler){
this.brokerFactoryHandler=brokerFactoryHandler;
}
private boolean validateBroker(String host){
boolean result=true;
if(brokers.containsKey(host)||servers.containsKey(host)||connectors.containsKey(host)){
//check the broker is still in the BrokerRegistry
// check the broker is still in the BrokerRegistry
TransportConnector connector=(TransportConnector) connectors.get(host);
if(BrokerRegistry.getInstance().lookup(host)==null||(connector!=null&&connector.getBroker().isStopped())){
result=false;
//clean-up
// clean-up
brokers.remove(host);
servers.remove(host);
if(connector!=null){

View File

@ -127,7 +127,7 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
protected void setUp() throws Exception {
super.setUp();
bridge = new DemandForwardingBridge(createTransport(), createRemoteTransport());
bridge.setClientId("local-remote-bridge");
bridge.setLocalBrokerName("local");
bridge.setDispatchAsync(false);
bridge.start();