- The vm:// transport was delivering events to the listener before start() was called.  Also clean it up a little by consolidating
   the use of the prePeerSetQueue and messageQueue field.
 - the tcp:// .stop() method now blocks until the thread that calls out to the listener is shutdown.
 - TransportConnection was not doing a good job synchronizing when multiple concurrent conenctions to the same connection Id was established.
   IllegalStateExceptions were common when a failover connection reconnected.  Now we make sure that only 1 connection with a given connectionId
   is activley operating in the broker.  Also removed 1 un-needed hash lookup by replacing the brokerConnectionStates Map with the 
   connectionState variable.
    
Also added a pause in the JmsTempDestinationTest to avoid intermitent failures.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@560872 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-07-30 05:34:37 +00:00
parent 3a5f48d277
commit ea74731701
8 changed files with 397 additions and 314 deletions

View File

@ -54,7 +54,6 @@ public class ConnectionContext {
private Object longTermStoreContext;
private boolean producerFlowControl=true;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private AtomicInteger referenceCounter = new AtomicInteger();
private boolean networkConnection;
private final AtomicBoolean stopping = new AtomicBoolean();
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
@ -241,15 +240,6 @@ public class ConnectionContext {
}
return true;
}
public int incrementReference() {
return referenceCounter.incrementAndGet();
}
public int decrementReference() {
return referenceCounter.decrementAndGet();
}
public synchronized boolean isNetworkConnection() {
return networkConnection;
}

View File

@ -72,6 +72,7 @@ import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.network.NetworkBridgeFactory;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
@ -105,8 +106,8 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
// Keeps track of the state of the connections.
protected final ConcurrentHashMap localConnectionStates=new ConcurrentHashMap();
protected final Map brokerConnectionStates;
// protected final ConcurrentHashMap localConnectionStates=new ConcurrentHashMap();
protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
// The broker and wireformat info that was exchanged.
protected BrokerInfo brokerInfo;
private WireFormatInfo wireFormatInfo;
@ -140,16 +141,18 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private DemandForwardingBridge duplexBridge = null;
final private TaskRunnerFactory taskRunnerFactory;
private TransportConnectionState connectionState;
static class ConnectionState extends org.apache.activemq.state.ConnectionState{
static class TransportConnectionState extends org.apache.activemq.state.ConnectionState{
private final ConnectionContext context;
TransportConnection connection;
private ConnectionContext context;
private TransportConnection connection;
private final Object connectMutex = new Object();
private AtomicInteger referenceCounter = new AtomicInteger();
public ConnectionState(ConnectionInfo info,ConnectionContext context,TransportConnection connection){
public TransportConnectionState(ConnectionInfo info, TransportConnection transportConnection){
super(info);
this.context=context;
this.connection=connection;
connection=transportConnection;
}
public ConnectionContext getContext(){
@ -159,6 +162,23 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
public TransportConnection getConnection(){
return connection;
}
public void setContext(ConnectionContext context) {
this.context = context;
}
public void setConnection(TransportConnection connection) {
this.connection = connection;
}
public int incrementReference() {
return referenceCounter.incrementAndGet();
}
public int decrementReference() {
return referenceCounter.decrementAndGet();
}
}
/**
@ -307,36 +327,6 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
return response;
}
protected ConnectionState lookupConnectionState(ConsumerId id){
ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
if(cs==null)
throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "
+id.getParentId().getParentId());
return cs;
}
protected ConnectionState lookupConnectionState(ProducerId id){
ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
if(cs==null)
throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "
+id.getParentId().getParentId());
return cs;
}
protected ConnectionState lookupConnectionState(SessionId id){
ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId());
if(cs==null)
throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "
+id.getParentId());
return cs;
}
protected ConnectionState lookupConnectionState(ConnectionId connectionId){
ConnectionState cs=(ConnectionState)localConnectionStates.get(connectionId);
if(cs==null)
throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId);
return cs;
}
public Response processKeepAlive(KeepAliveInfo info) throws Exception{
return null;
@ -354,7 +344,15 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
}
public Response processShutdown(ShutdownInfo info) throws Exception{
stop();
new Thread("Async Exception Handler"){
public void run(){
try {
TransportConnection.this.stop();
} catch (Exception e) {
serviceException(e);
}
}
}.start();
return null;
}
@ -363,7 +361,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
}
synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
context=null;
if(cs!=null){
context=cs.getContext();
@ -387,7 +385,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
}
synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
context=null;
if(cs!=null){
context=cs.getContext();
@ -413,63 +411,39 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
}
synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
context=null;
if(cs!=null){
context=cs.getContext();
}
if (cs == null) {
throw new NullPointerException("Context is null");
}
TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
context=cs.getContext();
cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context,info.getTransactionId(),true);
return null;
}
synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
context=null;
if(cs!=null){
context=cs.getContext();
}
if (cs == null) {
throw new NullPointerException("Context is null");
}
TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
context=cs.getContext();
cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context,info.getTransactionId(),false);
return null;
}
synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
context=null;
if(cs!=null){
context=cs.getContext();
}
if (cs == null) {
throw new NullPointerException("Context is null");
}
TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
context=cs.getContext();
cs.removeTransactionState(info.getTransactionId());
broker.rollbackTransaction(context,info.getTransactionId());
return null;
}
synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
context=null;
if(cs!=null){
context=cs.getContext();
}
TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
context=cs.getContext();
broker.forgetTransaction(context,info.getTransactionId());
return null;
}
synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception{
ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
context=null;
if(cs!=null){
context=cs.getContext();
}
TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
context=cs.getContext();
TransactionId[] preparedTransactions=broker.getPreparedTransactions(context);
return new DataArrayResponse(preparedTransactions);
}
@ -497,7 +471,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
}
synchronized public Response processAddDestination(DestinationInfo info) throws Exception{
ConnectionState cs=lookupConnectionState(info.getConnectionId());
TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
broker.addDestinationInfo(cs.getContext(),info);
if(info.getDestination().isTemporary()){
cs.addTempDestination(info);
@ -506,7 +480,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
}
synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception{
ConnectionState cs=lookupConnectionState(info.getConnectionId());
TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
broker.removeDestinationInfo(cs.getContext(),info);
if(info.getDestination().isTemporary()){
cs.removeTempDestination(info.getDestination());
@ -517,7 +491,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized public Response processAddProducer(ProducerInfo info) throws Exception{
SessionId sessionId=info.getProducerId().getParentId();
ConnectionId connectionId=sessionId.getParentId();
ConnectionState cs=lookupConnectionState(connectionId);
TransportConnectionState cs=lookupConnectionState(connectionId);
SessionState ss=cs.getSessionState(sessionId);
if(ss==null)
throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
@ -537,7 +511,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized public Response processRemoveProducer(ProducerId id) throws Exception{
SessionId sessionId=id.getParentId();
ConnectionId connectionId=sessionId.getParentId();
ConnectionState cs=lookupConnectionState(connectionId);
TransportConnectionState cs=lookupConnectionState(connectionId);
SessionState ss=cs.getSessionState(sessionId);
if(ss==null)
throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
@ -553,7 +527,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception{
SessionId sessionId=info.getConsumerId().getParentId();
ConnectionId connectionId=sessionId.getParentId();
ConnectionState cs=lookupConnectionState(connectionId);
TransportConnectionState cs=lookupConnectionState(connectionId);
SessionState ss=cs.getSessionState(sessionId);
if(ss==null)
throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "
@ -573,7 +547,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception{
SessionId sessionId=id.getParentId();
ConnectionId connectionId=sessionId.getParentId();
ConnectionState cs=lookupConnectionState(connectionId);
TransportConnectionState cs=lookupConnectionState(connectionId);
SessionState ss=cs.getSessionState(sessionId);
if(ss==null)
throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
@ -588,7 +562,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized public Response processAddSession(SessionInfo info) throws Exception{
ConnectionId connectionId=info.getSessionId().getParentId();
ConnectionState cs=lookupConnectionState(connectionId);
TransportConnectionState cs=lookupConnectionState(connectionId);
// Avoid replaying dup commands
if(!cs.getSessionIds().contains(info.getSessionId())){
broker.addSession(cs.getContext(),info);
@ -603,7 +577,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
synchronized public Response processRemoveSession(SessionId id) throws Exception{
ConnectionId connectionId=id.getParentId();
ConnectionState cs=lookupConnectionState(connectionId);
TransportConnectionState cs=lookupConnectionState(connectionId);
SessionState session=cs.getSessionState(id);
if(session==null)
throw new IllegalStateException("Cannot remove session that had not been registered: "+id);
@ -631,21 +605,36 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
return null;
}
synchronized public Response processAddConnection(ConnectionInfo info) throws Exception{
ConnectionState state=(ConnectionState)brokerConnectionStates.get(info.getConnectionId());
if(state!=null){
// ConnectionInfo replay?? Chances are that it's a client reconnecting,
// and we have not detected that that old connection died.. Kill the old connection
// to make sure our state is in sync with the client.
if(this!=state.getConnection()){
log.debug("Killing previous stale connection: "+state.getConnection());
state.getConnection().stop();
if(!state.getConnection().stopLatch.await(15,TimeUnit.SECONDS)){
throw new Exception("Previous connection could not be clean up.");
}
}
public Response processAddConnection(ConnectionInfo info) throws Exception {
TransportConnectionState state;
// Make sure 2 concurrent connections by the same ID only generate 1 TransportConnectionState object.
synchronized(brokerConnectionStates) {
state=(TransportConnectionState)brokerConnectionStates.get(info.getConnectionId());
if( state==null ) {
state=new TransportConnectionState(info,this);
brokerConnectionStates.put(info.getConnectionId(),state);
}
state.incrementReference();
}
log.debug("Setting up new connection: "+this);
// If there are 2 concurrent connections for the same connection id, then last one in wins, we need to sync here
// to figure out the winner.
synchronized(state.connectMutex) {
if( state.getConnection()!=this ) {
log.debug("Killing previous stale connection: "+state.getConnection().getRemoteAddress());
state.getConnection().stop();
log.debug("Connection "+getRemoteAddress()+" taking over previous connection: "+state.getConnection().getRemoteAddress());
state.setConnection(this);
state.reset(info);
}
}
registerConnectionState(info.getConnectionId(),state);
log.debug("Setting up new connection: "+getRemoteAddress());
// Setup the context.
String clientId=info.getClientId();
context=new ConnectionContext();
@ -659,11 +648,10 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
context.setClientMaster(info.isClientMaster());
context.setWireFormatInfo(wireFormatInfo);
context.setNetworkConnection(networkConnection);
context.incrementReference();
this.manageable=info.isManageable();
state=new ConnectionState(info,context,this);
brokerConnectionStates.put(info.getConnectionId(),state);
localConnectionStates.put(info.getConnectionId(),state);
state.setContext(context);
state.setConnection(this);
broker.addConnection(context,info);
if(info.isManageable()&&broker.isFaultTolerantConfiguration()){
// send ConnectionCommand
@ -674,8 +662,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
return null;
}
synchronized public Response processRemoveConnection(ConnectionId id){
ConnectionState cs=lookupConnectionState(id);
synchronized public Response processRemoveConnection(ConnectionId id){
TransportConnectionState cs=lookupConnectionState(id);
// Don't allow things to be added to the connection state while we are shutting down.
cs.shutdown();
// Cascade the connection stop to the sessions.
@ -702,12 +691,15 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
}catch(Throwable e){
serviceLog.warn("Failed to remove connection "+cs.getInfo(),e);
}
ConnectionState state=(ConnectionState)localConnectionStates.remove(id);
if(state!=null){
// If we are the last reference, we should remove the state
// from the broker.
if(state.getContext().decrementReference()==0){
brokerConnectionStates.remove(id);
TransportConnectionState state=unregisterConnectionState(id);
if(state!=null) {
synchronized(brokerConnectionStates) {
// If we are the last reference, we should remove the state
// from the broker.
if(state.decrementReference()==0){
brokerConnectionStates.remove(id);
}
}
}
return null;
@ -869,97 +861,103 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
return;
}
}
if(stopped.compareAndSet(false,true)){
log.debug("Stopping connection: "+transport.getRemoteAddress());
connector.onStopped(this);
try{
synchronized(this){
if(masterBroker!=null){
masterBroker.stop();
}
if(duplexBridge!=null){
duplexBridge.stop();
}
// If the transport has not failed yet,
// notify the peer that we are doing a normal shutdown.
if(transportException==null){
transport.oneway(new ShutdownInfo());
}
}
}catch(Exception ignore){
log.trace("Exception caught stopping",ignore);
}
transport.stop();
active=false;
if(disposed.compareAndSet(false,true)){
// Let all the connection contexts know we are shutting down
// so that in progress operations can notice and unblock.
ArrayList l=new ArrayList(localConnectionStates.values());
for(Iterator iter=l.iterator();iter.hasNext();){
ConnectionState cs=(ConnectionState) iter.next();
cs.getContext().getStopping().set(true);
}
if( taskRunner!=null ) {
taskRunner.wakeup();
// Give it a change to stop gracefully.
dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
disposeTransport();
taskRunner.shutdown();
} else {
disposeTransport();
}
if( taskRunner!=null )
taskRunner.shutdown();
// Run the MessageDispatch callbacks so that message references get cleaned up.
for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) {
Command command = (Command) iter.next();
if(command.isMessageDispatch()) {
MessageDispatch md=(MessageDispatch) command;
Runnable sub=md.getTransmitCallback();
broker.processDispatch(md);
if(sub!=null){
sub.run();
}
}
}
//
// Remove all logical connection associated with this connection
// from the broker.
if(!broker.isStopped()){
l=new ArrayList(localConnectionStates.keySet());
for(Iterator iter=l.iterator();iter.hasNext();){
ConnectionId connectionId=(ConnectionId)iter.next();
try{
log.debug("Cleaning up connection resources.");
processRemoveConnection(connectionId);
}catch(Throwable ignore){
ignore.printStackTrace();
}
}
if(brokerInfo!=null){
broker.removeBroker(this,brokerInfo);
}
}
stopLatch.countDown();
}
if(stopped.compareAndSet(false,true)) {
doStop();
stopLatch.countDown();
} else {
stopLatch.await();
}
}
protected void doStop() throws Exception, InterruptedException {
log.debug("Stopping connection: "+transport.getRemoteAddress());
connector.onStopped(this);
try{
synchronized(this){
if(masterBroker!=null){
masterBroker.stop();
}
if(duplexBridge!=null){
duplexBridge.stop();
}
// If the transport has not failed yet,
// notify the peer that we are doing a normal shutdown.
if(transportException==null){
transport.oneway(new ShutdownInfo());
}
}
}catch(Exception ignore){
log.trace("Exception caught stopping",ignore);
}
if(disposed.compareAndSet(false,true)){
// Let all the connection contexts know we are shutting down
// so that in progress operations can notice and unblock.
List<TransportConnectionState> connectionStates=listConnectionStates();
for (TransportConnectionState cs : connectionStates) {
cs.getContext().getStopping().set(true);
}
if( taskRunner!=null ) {
taskRunner.wakeup();
// Give it a change to stop gracefully.
dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
disposeTransport();
taskRunner.shutdown();
} else {
disposeTransport();
}
if( taskRunner!=null )
taskRunner.shutdown();
// Run the MessageDispatch callbacks so that message references get cleaned up.
for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) {
Command command = (Command) iter.next();
if(command.isMessageDispatch()) {
MessageDispatch md=(MessageDispatch) command;
Runnable sub=md.getTransmitCallback();
broker.processDispatch(md);
if(sub!=null){
sub.run();
}
}
}
//
// Remove all logical connection associated with this connection
// from the broker.
if (!broker.isStopped()) {
for (TransportConnectionState cs : connectionStates) {
cs.getContext().getStopping().set(true);
try {
log.debug("Cleaning up connection resources: " + getRemoteAddress());
processRemoveConnection(cs.getInfo().getConnectionId());
} catch (Throwable ignore) {
ignore.printStackTrace();
}
}
if (brokerInfo != null) {
broker.removeBroker(this, brokerInfo);
}
}
log.debug("Connection Stopped: " + getRemoteAddress());
}
}
/**
* @return Returns the blockedCandidate.
*/
* @return Returns the blockedCandidate.
*/
public boolean isBlockedCandidate(){
return blockedCandidate;
}
/**
* @param blockedCandidate The blockedCandidate to set.
*/
* @param blockedCandidate
* The blockedCandidate to set.
*/
public void setBlockedCandidate(boolean blockedCandidate){
this.blockedCandidate=blockedCandidate;
}
@ -1115,15 +1113,16 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(this.brokerInfo!=null){
log.warn("Unexpected extra broker info command received: "+info);
}
this.brokerInfo=info;
broker.addBroker(this,info);
networkConnection = true;
for (Iterator iter = localConnectionStates.values().iterator(); iter.hasNext();) {
ConnectionState cs = (ConnectionState) iter.next();
cs.getContext().setNetworkConnection(true);
}
return null;
this.brokerInfo = info;
broker.addBroker(this, info);
networkConnection = true;
List<TransportConnectionState> connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) {
cs.getContext().setNetworkConnection(true);
}
return null;
}
protected void dispatch(Command command) throws IOException{
@ -1140,14 +1139,13 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
}
public String getConnectionId() {
Iterator iterator = localConnectionStates.values().iterator();
ConnectionState object = (ConnectionState) iterator.next();
if( object == null ) {
return null;
}
if( object.getInfo().getClientId() !=null )
return object.getInfo().getClientId();
return object.getInfo().getConnectionId().toString();
List<TransportConnectionState> connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) {
if( cs.getInfo().getClientId() !=null )
return cs.getInfo().getClientId();
return cs.getInfo().getConnectionId().toString();
}
return null;
}
private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){
@ -1155,7 +1153,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(result==null){
synchronized(producerExchanges){
result=new ProducerBrokerExchange();
ConnectionState state=lookupConnectionState(id);
TransportConnectionState state=lookupConnectionState(id);
context=state.getContext();
result.setConnectionContext(context);
SessionState ss=state.getSessionState(id.getParentId());
@ -1186,7 +1184,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(result==null){
synchronized(consumerExchanges){
result=new ConsumerBrokerExchange();
ConnectionState state=lookupConnectionState(id);
TransportConnectionState state=lookupConnectionState(id);
context=state.getContext();
result.setConnectionContext(context);
SessionState ss=state.getSessionState(id.getParentId());
@ -1253,4 +1251,71 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
return null;
}
///////////////////////////////////////////////////////////////////
//
// The following methods handle the logical connection state. It is possible
// multiple logical connections multiplexed over a single physical connection.
// But have not yet exploited the feature from the clients, so for performance
// reasons (to avoid a hash lookup) this class only keeps track of 1
// logical connection state.
//
// A sub class could override these methods to a full multiple logical connection
// support.
//
///////////////////////////////////////////////////////////////////
protected TransportConnectionState registerConnectionState(ConnectionId connectionId, TransportConnectionState state) {
TransportConnectionState rc = connectionState;
connectionState = state;
return rc;
}
protected TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
TransportConnectionState rc = connectionState;
connectionState = null;
return rc;
}
protected List<TransportConnectionState> listConnectionStates() {
ArrayList<TransportConnectionState> rc = new ArrayList<TransportConnectionState>();
if( connectionState!=null ) {
rc.add(connectionState);
}
return rc;
}
protected TransportConnectionState lookupConnectionState(String connectionId){
TransportConnectionState cs=connectionState;
if(cs==null)
throw new IllegalStateException("Cannot lookup a connectionId for a connection that had not been registered: "
+connectionId);
return cs;
}
protected TransportConnectionState lookupConnectionState(ConsumerId id){
TransportConnectionState cs=connectionState;
if(cs==null)
throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "
+id.getParentId().getParentId());
return cs;
}
protected TransportConnectionState lookupConnectionState(ProducerId id){
TransportConnectionState cs=connectionState;
if(cs==null)
throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "
+id.getParentId().getParentId());
return cs;
}
protected TransportConnectionState lookupConnectionState(SessionId id){
TransportConnectionState cs=connectionState;
if(cs==null)
throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "
+id.getParentId());
return cs;
}
protected TransportConnectionState lookupConnectionState(ConnectionId connectionId){
TransportConnectionState cs=connectionState;
if(cs==null)
throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId);
return cs;
}
}

View File

@ -60,7 +60,7 @@ public class ManagedTransportConnection extends TransportConnection {
registerMBean(byAddressName);
}
public synchronized void stop() throws Exception {
public void doStop() throws Exception {
if (isStarting()) {
setPendingStop(true);
return;
@ -71,7 +71,7 @@ public class ManagedTransportConnection extends TransportConnection {
byClientIdName=null;
byAddressName=null;
}
super.stop();
super.doStop();
}
/**

View File

@ -57,6 +57,7 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -98,7 +99,7 @@ public class RegionBroker implements Broker {
private final DestinationInterceptor destinationInterceptor;
private ConnectionContext adminConnectionContext;
protected DestinationFactory destinationFactory;
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
@ -605,7 +606,7 @@ public class RegionBroker implements Broker {
this.adminConnectionContext = adminConnectionContext;
}
public Map getConnectionStates() {
public Map<ConnectionId, ConnectionState> getConnectionStates() {
return connectionStates;
}

View File

@ -24,6 +24,8 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
@ -32,12 +34,9 @@ import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
public class ConnectionState {
final ConnectionInfo info;
ConnectionInfo info;
private final ConcurrentHashMap transactions = new ConcurrentHashMap();
private final ConcurrentHashMap sessions = new ConcurrentHashMap();
private final List tempDestinations = Collections.synchronizedList(new ArrayList());
@ -52,6 +51,15 @@ public class ConnectionState {
public String toString() {
return info.toString();
}
public void reset(ConnectionInfo info) {
this.info=info;
transactions.clear();
sessions.clear();
tempDestinations.clear();
shutdown.set(false);
}
public void addTempDestination(DestinationInfo info) {
checkShutdown();

View File

@ -17,16 +17,6 @@
*/
package org.apache.activemq.transport.tcp;
import org.apache.activemq.Service;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.net.SocketFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@ -40,6 +30,19 @@ import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import org.apache.activemq.Service;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
@ -64,6 +67,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected boolean useLocalHost = true;
protected int minmumWireFormatVersion;
protected SocketFactory socketFactory;
protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
private Map socketOptions;
private Boolean keepAlive;
@ -131,24 +135,22 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
*/
public void run() {
log.trace("TCP consumer thread starting");
while (!isStopped()) {
try {
Object command = readCommand();
doConsume(command);
}
catch (SocketTimeoutException e) {
}
catch (InterruptedIOException e) {
}
catch (IOException e) {
try {
stop();
}
catch (Exception e2) {
log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
}
onException(e);
}
try {
while (!isStopped()) {
try {
Object command = readCommand();
doConsume(command);
}
catch (SocketTimeoutException e) {
}
catch (InterruptedIOException e) {
}
}
} catch (IOException e) {
stoppedLatch.get().countDown();
onException(e);
} finally {
stoppedLatch.get().countDown();
}
}
@ -301,6 +303,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected void doStart() throws Exception {
connect();
stoppedLatch.set(new CountDownLatch(1));
super.doStart();
}
@ -355,6 +358,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
initializeStreams();
}
protected void doStop(ServiceStopper stopper) throws Exception {
if (log.isDebugEnabled()) {
log.debug("Stopping transport " + this);
@ -367,6 +371,19 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
socket.close();
}
}
/**
* Override so that stop() blocks until the run thread is no longer running.
*/
@Override
public void stop() throws Exception {
super.stop();
CountDownLatch countDownLatch = stoppedLatch.get();
if( countDownLatch!=null ) {
countDownLatch.await();
}
}
protected void initializeStreams() throws Exception {
TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);

View File

@ -32,6 +32,7 @@ import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -53,10 +54,8 @@ public class VMTransport implements Transport,Task{
protected boolean network;
protected boolean async=true;
protected int asyncQueueDepth=2000;
protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
protected LinkedBlockingQueue messageQueue=null;
protected boolean started;
protected final Object startMutex = new Object();
protected final URI location;
protected final long id;
private TaskRunner taskRunner;
@ -85,45 +84,37 @@ public class VMTransport implements Transport,Task{
}
if(peer==null)
throw new IOException("Peer not connected.");
if(!peer.disposed){
if(async){
asyncOneWay(command);
}else{
syncOneWay(command);
}
}else{
throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
}
}
protected void syncOneWay(Object command){
TransportListener tl=null;
synchronized(peer.startMutex){
synchronized(peer.mutex) {
if( peer.disposed ) {
throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
}
if( peer.started ) {
tl = peer.transportListener;
} else if(!peer.disposed) {
peer.prePeerSetQueue.add(command);
if(peer.async){
peer.enqueue(command);
peer.wakeup();
} else {
tl = peer.transportListener;
}
} else {
peer.enqueue(command);
}
}
if( tl!=null ) {
tl.onCommand(command);
}
tl.onCommand(command);
}
}
protected void asyncOneWay(Object command) throws IOException{
try{
synchronized(mutex){
if(messageQueue==null){
messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
}
}
messageQueue.put(command);
wakeup();
}catch(final InterruptedException e){
log.error("messageQueue interupted",e);
throw new IOException(e.getMessage());
}
}
private void enqueue(Object command) throws IOException {
try{
getMessageQueue().put(command);
}catch(final InterruptedException e){
throw IOExceptionSupport.create(e);
}
}
public FutureResponse asyncRequest(Object command,ResponseCallback responseCallback) throws IOException{
throw new AssertionError("Unsupported Method");
@ -146,32 +137,38 @@ public class VMTransport implements Transport,Task{
public void setTransportListener(TransportListener commandListener){
synchronized(mutex){
this.transportListener=commandListener;
wakeup();
}
wakeup();
peer.wakeup();
}
private LinkedBlockingQueue getMessageQueue() {
synchronized(mutex) {
if( messageQueue==null ) {
messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
}
return messageQueue;
}
}
public void start() throws Exception{
if(transportListener==null)
throw new IOException("TransportListener not set.");
synchronized(startMutex) {
if( !prePeerSetQueue.isEmpty() ) {
for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
Command command=(Command)iter.next();
transportListener.onCommand(command);
}
prePeerSetQueue.clear();
}
synchronized(mutex) {
if( messageQueue!=null ) {
Object command;
while( (command = messageQueue.poll()) !=null ) {
transportListener.onCommand(command);
}
}
started = true;
if( isAsync() ) {
peer.wakeup();
wakeup();
}
wakeup();
}
}
public void stop() throws Exception{
synchronized(startMutex) {
synchronized(mutex) {
if(!disposed){
started=false;
disposed=true;
@ -221,18 +218,21 @@ public class VMTransport implements Transport,Task{
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate(){
final TransportListener tl=peer.transportListener;
Command command=null;
final TransportListener tl;
synchronized(mutex){
if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null&&!messageQueue.isEmpty()){
command=(Command)messageQueue.poll();
}
tl = transportListener;
if( !started || disposed || tl==null )
return false;
}
if(tl!=null&&command!=null){
LinkedBlockingQueue mq = getMessageQueue();
final Command command = (Command)mq.poll();
if( command!=null ) {
tl.onCommand(command);
}
boolean result=messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed;
return result;
return !mq.isEmpty();
} else {
return false;
}
}
/**

View File

@ -184,8 +184,9 @@ public class JmsTempDestinationTest extends TestCase {
* Make sure you cannot publish to a temp destination that does not exist anymore.
*
* @throws JMSException
* @throws InterruptedException
*/
public void testPublishFailsForClosedConnection() throws JMSException {
public void testPublishFailsForClosedConnection() throws JMSException, InterruptedException {
Connection tempConnection = factory.createConnection();
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -202,6 +203,7 @@ public class JmsTempDestinationTest extends TestCase {
// Closing the connection should destroy the temp queue that was created.
tempConnection.close();
Thread.sleep(1000); // Wait a little bit to let the delete take effect.
// This message delivery NOT should work since the temp connection is now closed.
try {