add method to retrieve the URI used by the local VMTransport

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@517741 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-03-13 15:53:24 +00:00
parent 8815bf8045
commit 4741136695
8 changed files with 62 additions and 1 deletions

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import java.net.URI;
import java.util.Set; import java.util.Set;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
@ -239,6 +240,14 @@ public interface Broker extends Region, Service {
* @param adminConnectionContext * @param adminConnectionContext
*/ */
public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext); public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext);
/**
* @return the temp data store
*/
public Store getTempDataStore(); public Store getTempDataStore();
/**
* @return the URI that can be used to connect to the local Broker
*/
public URI getVmConnectorURI();
} }

View File

@ -38,6 +38,7 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import java.net.URI;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -238,5 +239,8 @@ public class BrokerFilter implements Broker {
return next.getTempDataStore(); return next.getTempDataStore();
} }
public URI getVmConnectorURI(){
return next.getVmConnectorURI();
}
} }

View File

@ -38,6 +38,7 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import java.net.URI;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -236,5 +237,9 @@ public class EmptyBroker implements Broker {
public Store getTempDataStore() { public Store getTempDataStore() {
return null; return null;
} }
public URI getVmConnectorURI(){
return null;
}
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import java.net.URI;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -235,5 +236,9 @@ public class ErrorBroker implements Broker {
public Store getTempDataStore() { public Store getTempDataStore() {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }
public URI getVmConnectorURI(){
throw new BrokerStoppedException(this.message);
}
} }

View File

@ -38,6 +38,7 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import java.net.URI;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -249,5 +250,9 @@ public class MutableBrokerFilter implements Broker {
public Store getTempDataStore() { public Store getTempDataStore() {
return getNext().getTempDataStore(); return getNext().getTempDataStore();
} }
public URI getVmConnectorURI(){
return getNext().getVmConnectorURI();
}
} }

View File

@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -66,6 +67,8 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConsumerState; import org.apache.activemq.state.ConsumerState;
@ -77,6 +80,8 @@ import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -129,6 +134,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
private ConnectionContext context; private ConnectionContext context;
private boolean networkConnection; private boolean networkConnection;
private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION); private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private DemandForwardingBridge duplexBridge = null;
static class ConnectionState extends org.apache.activemq.state.ConnectionState{ static class ConnectionState extends org.apache.activemq.state.ConnectionState{
@ -464,6 +470,10 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(seq>producerState.getLastSequenceId()){ if(seq>producerState.getLastSequenceId()){
producerState.setLastSequenceId(seq); producerState.setLastSequenceId(seq);
broker.send(producerExchange,messageSend); broker.send(producerExchange,messageSend);
}else {
if (log.isDebugEnabled()) {
log.debug("Discarding duplicate: " + messageSend);
}
} }
}else{ }else{
// producer not local to this broker // producer not local to this broker
@ -1063,6 +1073,19 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
masterBroker=new MasterBroker(parent,transport); masterBroker=new MasterBroker(parent,transport);
masterBroker.startProcessing(); masterBroker.startProcessing();
log.info("Slave Broker "+info.getBrokerName()+" is attached"); log.info("Slave Broker "+info.getBrokerName()+" is attached");
}else if (info.isNetworkConnection() && info.isDuplexConnection()) {
//so this TransportConnection is the rear end of a network bridge
//We have been requested to create a two way pipe ...
try{
Properties props = MarshallingSupport.stringToProperties(info.getNetworkProperties());
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
IntrospectionSupport.setProperties(config,props,null);
config.setLocalBrokerName(broker.getBrokerName());
}catch(IOException e){
log.error("Creating duplex network bridge",e);
}
} }
// We only expect to get one broker info command per connection // We only expect to get one broker info command per connection
if(this.brokerInfo!=null){ if(this.brokerInfo!=null){

View File

@ -18,6 +18,7 @@
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -608,4 +609,8 @@ public class RegionBroker implements Broker {
public Store getTempDataStore() { public Store getTempDataStore() {
return brokerService.getTempDataStore(); return brokerService.getTempDataStore();
} }
public URI getVmConnectorURI(){
return brokerService.getVmConnectorURI();
}
} }

View File

@ -39,6 +39,7 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import java.net.URI;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -231,4 +232,8 @@ public class StubBroker implements Broker {
public Store getTempDataStore() { public Store getTempDataStore() {
return null; return null;
} }
public URI getVmConnectorURI(){
return null;
}
} }