diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java index 559bf69ca4..b39a1e8a1c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java @@ -54,7 +54,6 @@ public class ConnectionContext { private Object longTermStoreContext; private boolean producerFlowControl=true; private MessageAuthorizationPolicy messageAuthorizationPolicy; - private AtomicInteger referenceCounter = new AtomicInteger(); private boolean networkConnection; private final AtomicBoolean stopping = new AtomicBoolean(); private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext(); @@ -241,15 +240,6 @@ public class ConnectionContext { } return true; } - - public int incrementReference() { - return referenceCounter.incrementAndGet(); - } - - public int decrementReference() { - return referenceCounter.decrementAndGet(); - } - public synchronized boolean isNetworkConnection() { return networkConnection; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 71dab65ab2..73042bf22f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -72,6 +72,7 @@ import org.apache.activemq.network.NetworkBridgeConfiguration; import org.apache.activemq.network.NetworkBridgeFactory; import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.state.CommandVisitor; +import org.apache.activemq.state.ConnectionState; import org.apache.activemq.state.ConsumerState; import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.SessionState; @@ -105,8 +106,8 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit private final Transport transport; private MessageAuthorizationPolicy messageAuthorizationPolicy; // Keeps track of the state of the connections. - protected final ConcurrentHashMap localConnectionStates=new ConcurrentHashMap(); - protected final Map brokerConnectionStates; +// protected final ConcurrentHashMap localConnectionStates=new ConcurrentHashMap(); + protected final Map brokerConnectionStates; // The broker and wireformat info that was exchanged. protected BrokerInfo brokerInfo; private WireFormatInfo wireFormatInfo; @@ -140,16 +141,18 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION); private DemandForwardingBridge duplexBridge = null; final private TaskRunnerFactory taskRunnerFactory; + private TransportConnectionState connectionState; - static class ConnectionState extends org.apache.activemq.state.ConnectionState{ + static class TransportConnectionState extends org.apache.activemq.state.ConnectionState{ - private final ConnectionContext context; - TransportConnection connection; + private ConnectionContext context; + private TransportConnection connection; + private final Object connectMutex = new Object(); + private AtomicInteger referenceCounter = new AtomicInteger(); - public ConnectionState(ConnectionInfo info,ConnectionContext context,TransportConnection connection){ + public TransportConnectionState(ConnectionInfo info, TransportConnection transportConnection){ super(info); - this.context=context; - this.connection=connection; + connection=transportConnection; } public ConnectionContext getContext(){ @@ -159,6 +162,23 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit public TransportConnection getConnection(){ return connection; } + + public void setContext(ConnectionContext context) { + this.context = context; + } + + public void setConnection(TransportConnection connection) { + this.connection = connection; + } + + public int incrementReference() { + return referenceCounter.incrementAndGet(); + } + + public int decrementReference() { + return referenceCounter.decrementAndGet(); + } + } /** @@ -307,36 +327,6 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit return response; } - protected ConnectionState lookupConnectionState(ConsumerId id){ - ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId()); - if(cs==null) - throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: " - +id.getParentId().getParentId()); - return cs; - } - - protected ConnectionState lookupConnectionState(ProducerId id){ - ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId()); - if(cs==null) - throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: " - +id.getParentId().getParentId()); - return cs; - } - - protected ConnectionState lookupConnectionState(SessionId id){ - ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId()); - if(cs==null) - throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: " - +id.getParentId()); - return cs; - } - - protected ConnectionState lookupConnectionState(ConnectionId connectionId){ - ConnectionState cs=(ConnectionState)localConnectionStates.get(connectionId); - if(cs==null) - throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId); - return cs; - } public Response processKeepAlive(KeepAliveInfo info) throws Exception{ return null; @@ -354,7 +344,15 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit } public Response processShutdown(ShutdownInfo info) throws Exception{ - stop(); + new Thread("Async Exception Handler"){ + public void run(){ + try { + TransportConnection.this.stop(); + } catch (Exception e) { + serviceException(e); + } + } + }.start(); return null; } @@ -363,7 +361,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit } synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception{ - ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId()); + TransportConnectionState cs=lookupConnectionState(info.getConnectionId()); context=null; if(cs!=null){ context=cs.getContext(); @@ -387,7 +385,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit } synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception{ - ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId()); + TransportConnectionState cs=lookupConnectionState(info.getConnectionId()); context=null; if(cs!=null){ context=cs.getContext(); @@ -413,63 +411,39 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit } synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception{ - ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId()); - context=null; - if(cs!=null){ - context=cs.getContext(); - } - if (cs == null) { - throw new NullPointerException("Context is null"); - } + TransportConnectionState cs=lookupConnectionState(info.getConnectionId()); + context=cs.getContext(); cs.removeTransactionState(info.getTransactionId()); broker.commitTransaction(context,info.getTransactionId(),true); return null; } synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception{ - ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId()); - context=null; - if(cs!=null){ - context=cs.getContext(); - } - if (cs == null) { - throw new NullPointerException("Context is null"); - } + TransportConnectionState cs=lookupConnectionState(info.getConnectionId()); + context=cs.getContext(); cs.removeTransactionState(info.getTransactionId()); broker.commitTransaction(context,info.getTransactionId(),false); return null; } synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception{ - ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId()); - context=null; - if(cs!=null){ - context=cs.getContext(); - } - if (cs == null) { - throw new NullPointerException("Context is null"); - } + TransportConnectionState cs=lookupConnectionState(info.getConnectionId()); + context=cs.getContext(); cs.removeTransactionState(info.getTransactionId()); broker.rollbackTransaction(context,info.getTransactionId()); return null; } synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception{ - ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId()); - context=null; - if(cs!=null){ - context=cs.getContext(); - } + TransportConnectionState cs=lookupConnectionState(info.getConnectionId()); + context=cs.getContext(); broker.forgetTransaction(context,info.getTransactionId()); return null; } synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception{ - ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId()); - context=null; - if(cs!=null){ - context=cs.getContext(); - } + TransportConnectionState cs=lookupConnectionState(info.getConnectionId()); + context=cs.getContext(); TransactionId[] preparedTransactions=broker.getPreparedTransactions(context); return new DataArrayResponse(preparedTransactions); } @@ -497,7 +471,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit } synchronized public Response processAddDestination(DestinationInfo info) throws Exception{ - ConnectionState cs=lookupConnectionState(info.getConnectionId()); + TransportConnectionState cs=lookupConnectionState(info.getConnectionId()); broker.addDestinationInfo(cs.getContext(),info); if(info.getDestination().isTemporary()){ cs.addTempDestination(info); @@ -506,7 +480,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit } synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception{ - ConnectionState cs=lookupConnectionState(info.getConnectionId()); + TransportConnectionState cs=lookupConnectionState(info.getConnectionId()); broker.removeDestinationInfo(cs.getContext(),info); if(info.getDestination().isTemporary()){ cs.removeTempDestination(info.getDestination()); @@ -517,7 +491,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit synchronized public Response processAddProducer(ProducerInfo info) throws Exception{ SessionId sessionId=info.getProducerId().getParentId(); ConnectionId connectionId=sessionId.getParentId(); - ConnectionState cs=lookupConnectionState(connectionId); + TransportConnectionState cs=lookupConnectionState(connectionId); SessionState ss=cs.getSessionState(sessionId); if(ss==null) throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " @@ -537,7 +511,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit synchronized public Response processRemoveProducer(ProducerId id) throws Exception{ SessionId sessionId=id.getParentId(); ConnectionId connectionId=sessionId.getParentId(); - ConnectionState cs=lookupConnectionState(connectionId); + TransportConnectionState cs=lookupConnectionState(connectionId); SessionState ss=cs.getSessionState(sessionId); if(ss==null) throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " @@ -553,7 +527,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception{ SessionId sessionId=info.getConsumerId().getParentId(); ConnectionId connectionId=sessionId.getParentId(); - ConnectionState cs=lookupConnectionState(connectionId); + TransportConnectionState cs=lookupConnectionState(connectionId); SessionState ss=cs.getSessionState(sessionId); if(ss==null) throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: " @@ -573,7 +547,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception{ SessionId sessionId=id.getParentId(); ConnectionId connectionId=sessionId.getParentId(); - ConnectionState cs=lookupConnectionState(connectionId); + TransportConnectionState cs=lookupConnectionState(connectionId); SessionState ss=cs.getSessionState(sessionId); if(ss==null) throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " @@ -588,7 +562,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit synchronized public Response processAddSession(SessionInfo info) throws Exception{ ConnectionId connectionId=info.getSessionId().getParentId(); - ConnectionState cs=lookupConnectionState(connectionId); + TransportConnectionState cs=lookupConnectionState(connectionId); // Avoid replaying dup commands if(!cs.getSessionIds().contains(info.getSessionId())){ broker.addSession(cs.getContext(),info); @@ -603,7 +577,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit synchronized public Response processRemoveSession(SessionId id) throws Exception{ ConnectionId connectionId=id.getParentId(); - ConnectionState cs=lookupConnectionState(connectionId); + TransportConnectionState cs=lookupConnectionState(connectionId); SessionState session=cs.getSessionState(id); if(session==null) throw new IllegalStateException("Cannot remove session that had not been registered: "+id); @@ -631,21 +605,36 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit return null; } - synchronized public Response processAddConnection(ConnectionInfo info) throws Exception{ - ConnectionState state=(ConnectionState)brokerConnectionStates.get(info.getConnectionId()); - if(state!=null){ - // ConnectionInfo replay?? Chances are that it's a client reconnecting, - // and we have not detected that that old connection died.. Kill the old connection - // to make sure our state is in sync with the client. - if(this!=state.getConnection()){ - log.debug("Killing previous stale connection: "+state.getConnection()); - state.getConnection().stop(); - if(!state.getConnection().stopLatch.await(15,TimeUnit.SECONDS)){ - throw new Exception("Previous connection could not be clean up."); - } - } + public Response processAddConnection(ConnectionInfo info) throws Exception { + + TransportConnectionState state; + + // Make sure 2 concurrent connections by the same ID only generate 1 TransportConnectionState object. + synchronized(brokerConnectionStates) { + state=(TransportConnectionState)brokerConnectionStates.get(info.getConnectionId()); + if( state==null ) { + state=new TransportConnectionState(info,this); + brokerConnectionStates.put(info.getConnectionId(),state); + } + state.incrementReference(); } - log.debug("Setting up new connection: "+this); + + + // If there are 2 concurrent connections for the same connection id, then last one in wins, we need to sync here + // to figure out the winner. + synchronized(state.connectMutex) { + if( state.getConnection()!=this ) { + log.debug("Killing previous stale connection: "+state.getConnection().getRemoteAddress()); + state.getConnection().stop(); + log.debug("Connection "+getRemoteAddress()+" taking over previous connection: "+state.getConnection().getRemoteAddress()); + state.setConnection(this); + state.reset(info); + } + } + + registerConnectionState(info.getConnectionId(),state); + + log.debug("Setting up new connection: "+getRemoteAddress()); // Setup the context. String clientId=info.getClientId(); context=new ConnectionContext(); @@ -659,11 +648,10 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit context.setClientMaster(info.isClientMaster()); context.setWireFormatInfo(wireFormatInfo); context.setNetworkConnection(networkConnection); - context.incrementReference(); this.manageable=info.isManageable(); - state=new ConnectionState(info,context,this); - brokerConnectionStates.put(info.getConnectionId(),state); - localConnectionStates.put(info.getConnectionId(),state); + state.setContext(context); + state.setConnection(this); + broker.addConnection(context,info); if(info.isManageable()&&broker.isFaultTolerantConfiguration()){ // send ConnectionCommand @@ -674,8 +662,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit return null; } - synchronized public Response processRemoveConnection(ConnectionId id){ - ConnectionState cs=lookupConnectionState(id); + + synchronized public Response processRemoveConnection(ConnectionId id){ + TransportConnectionState cs=lookupConnectionState(id); // Don't allow things to be added to the connection state while we are shutting down. cs.shutdown(); // Cascade the connection stop to the sessions. @@ -702,12 +691,15 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit }catch(Throwable e){ serviceLog.warn("Failed to remove connection "+cs.getInfo(),e); } - ConnectionState state=(ConnectionState)localConnectionStates.remove(id); - if(state!=null){ - // If we are the last reference, we should remove the state - // from the broker. - if(state.getContext().decrementReference()==0){ - brokerConnectionStates.remove(id); + + TransportConnectionState state=unregisterConnectionState(id); + if(state!=null) { + synchronized(brokerConnectionStates) { + // If we are the last reference, we should remove the state + // from the broker. + if(state.decrementReference()==0){ + brokerConnectionStates.remove(id); + } } } return null; @@ -869,97 +861,103 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit return; } } - if(stopped.compareAndSet(false,true)){ - log.debug("Stopping connection: "+transport.getRemoteAddress()); - connector.onStopped(this); - try{ - synchronized(this){ - if(masterBroker!=null){ - masterBroker.stop(); - } - if(duplexBridge!=null){ - duplexBridge.stop(); - } - // If the transport has not failed yet, - // notify the peer that we are doing a normal shutdown. - if(transportException==null){ - transport.oneway(new ShutdownInfo()); - } - } - - }catch(Exception ignore){ - log.trace("Exception caught stopping",ignore); - } - transport.stop(); - active=false; - if(disposed.compareAndSet(false,true)){ - - // Let all the connection contexts know we are shutting down - // so that in progress operations can notice and unblock. - ArrayList l=new ArrayList(localConnectionStates.values()); - for(Iterator iter=l.iterator();iter.hasNext();){ - ConnectionState cs=(ConnectionState) iter.next(); - cs.getContext().getStopping().set(true); - } - - if( taskRunner!=null ) { - taskRunner.wakeup(); - // Give it a change to stop gracefully. - dispatchStoppedLatch.await(5, TimeUnit.SECONDS); - disposeTransport(); - taskRunner.shutdown(); - } else { - disposeTransport(); - } - - if( taskRunner!=null ) - taskRunner.shutdown(); - - // Run the MessageDispatch callbacks so that message references get cleaned up. - for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) { - Command command = (Command) iter.next(); - if(command.isMessageDispatch()) { - MessageDispatch md=(MessageDispatch) command; - Runnable sub=md.getTransmitCallback(); - broker.processDispatch(md); - if(sub!=null){ - sub.run(); - } - } - } - // - // Remove all logical connection associated with this connection - // from the broker. - if(!broker.isStopped()){ - l=new ArrayList(localConnectionStates.keySet()); - for(Iterator iter=l.iterator();iter.hasNext();){ - ConnectionId connectionId=(ConnectionId)iter.next(); - try{ - log.debug("Cleaning up connection resources."); - processRemoveConnection(connectionId); - }catch(Throwable ignore){ - ignore.printStackTrace(); - } - } - if(brokerInfo!=null){ - broker.removeBroker(this,brokerInfo); - } - } - stopLatch.countDown(); - } + if(stopped.compareAndSet(false,true)) { + doStop(); + stopLatch.countDown(); + } else { + stopLatch.await(); } } + protected void doStop() throws Exception, InterruptedException { + log.debug("Stopping connection: "+transport.getRemoteAddress()); + connector.onStopped(this); + try{ + synchronized(this){ + if(masterBroker!=null){ + masterBroker.stop(); + } + if(duplexBridge!=null){ + duplexBridge.stop(); + } + // If the transport has not failed yet, + // notify the peer that we are doing a normal shutdown. + if(transportException==null){ + transport.oneway(new ShutdownInfo()); + } + } + + }catch(Exception ignore){ + log.trace("Exception caught stopping",ignore); + } + if(disposed.compareAndSet(false,true)){ + + // Let all the connection contexts know we are shutting down + // so that in progress operations can notice and unblock. + List connectionStates=listConnectionStates(); + for (TransportConnectionState cs : connectionStates) { + cs.getContext().getStopping().set(true); + } + + if( taskRunner!=null ) { + taskRunner.wakeup(); + // Give it a change to stop gracefully. + dispatchStoppedLatch.await(5, TimeUnit.SECONDS); + disposeTransport(); + taskRunner.shutdown(); + } else { + disposeTransport(); + } + + if( taskRunner!=null ) + taskRunner.shutdown(); + + // Run the MessageDispatch callbacks so that message references get cleaned up. + for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) { + Command command = (Command) iter.next(); + if(command.isMessageDispatch()) { + MessageDispatch md=(MessageDispatch) command; + Runnable sub=md.getTransmitCallback(); + broker.processDispatch(md); + if(sub!=null){ + sub.run(); + } + } + } + // + // Remove all logical connection associated with this connection + // from the broker. + + if (!broker.isStopped()) { + for (TransportConnectionState cs : connectionStates) { + cs.getContext().getStopping().set(true); + try { + log.debug("Cleaning up connection resources: " + getRemoteAddress()); + processRemoveConnection(cs.getInfo().getConnectionId()); + } catch (Throwable ignore) { + ignore.printStackTrace(); + } + } + + if (brokerInfo != null) { + broker.removeBroker(this, brokerInfo); + } + } + log.debug("Connection Stopped: " + getRemoteAddress()); + } + } + /** - * @return Returns the blockedCandidate. - */ + * @return Returns the blockedCandidate. + */ public boolean isBlockedCandidate(){ return blockedCandidate; } /** - * @param blockedCandidate The blockedCandidate to set. - */ + * @param blockedCandidate + * The blockedCandidate to set. + */ public void setBlockedCandidate(boolean blockedCandidate){ this.blockedCandidate=blockedCandidate; } @@ -1115,15 +1113,16 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit if(this.brokerInfo!=null){ log.warn("Unexpected extra broker info command received: "+info); } - this.brokerInfo=info; - broker.addBroker(this,info); - networkConnection = true; - for (Iterator iter = localConnectionStates.values().iterator(); iter.hasNext();) { - ConnectionState cs = (ConnectionState) iter.next(); - cs.getContext().setNetworkConnection(true); - } - - return null; + this.brokerInfo = info; + broker.addBroker(this, info); + networkConnection = true; + + List connectionStates = listConnectionStates(); + for (TransportConnectionState cs : connectionStates) { + cs.getContext().setNetworkConnection(true); + } + + return null; } protected void dispatch(Command command) throws IOException{ @@ -1140,14 +1139,13 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit } public String getConnectionId() { - Iterator iterator = localConnectionStates.values().iterator(); - ConnectionState object = (ConnectionState) iterator.next(); - if( object == null ) { - return null; - } - if( object.getInfo().getClientId() !=null ) - return object.getInfo().getClientId(); - return object.getInfo().getConnectionId().toString(); + List connectionStates = listConnectionStates(); + for (TransportConnectionState cs : connectionStates) { + if( cs.getInfo().getClientId() !=null ) + return cs.getInfo().getClientId(); + return cs.getInfo().getConnectionId().toString(); + } + return null; } private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){ @@ -1155,7 +1153,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit if(result==null){ synchronized(producerExchanges){ result=new ProducerBrokerExchange(); - ConnectionState state=lookupConnectionState(id); + TransportConnectionState state=lookupConnectionState(id); context=state.getContext(); result.setConnectionContext(context); SessionState ss=state.getSessionState(id.getParentId()); @@ -1186,7 +1184,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit if(result==null){ synchronized(consumerExchanges){ result=new ConsumerBrokerExchange(); - ConnectionState state=lookupConnectionState(id); + TransportConnectionState state=lookupConnectionState(id); context=state.getContext(); result.setConnectionContext(context); SessionState ss=state.getSessionState(id.getParentId()); @@ -1253,4 +1251,71 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit return null; } + /////////////////////////////////////////////////////////////////// + // + // The following methods handle the logical connection state. It is possible + // multiple logical connections multiplexed over a single physical connection. + // But have not yet exploited the feature from the clients, so for performance + // reasons (to avoid a hash lookup) this class only keeps track of 1 + // logical connection state. + // + // A sub class could override these methods to a full multiple logical connection + // support. + // + /////////////////////////////////////////////////////////////////// + + protected TransportConnectionState registerConnectionState(ConnectionId connectionId, TransportConnectionState state) { + TransportConnectionState rc = connectionState; + connectionState = state; + return rc; + } + + protected TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { + TransportConnectionState rc = connectionState; + connectionState = null; + return rc; + } + protected List listConnectionStates() { + ArrayList rc = new ArrayList(); + if( connectionState!=null ) { + rc.add(connectionState); + } + return rc; + } + + protected TransportConnectionState lookupConnectionState(String connectionId){ + TransportConnectionState cs=connectionState; + if(cs==null) + throw new IllegalStateException("Cannot lookup a connectionId for a connection that had not been registered: " + +connectionId); + return cs; + } + protected TransportConnectionState lookupConnectionState(ConsumerId id){ + TransportConnectionState cs=connectionState; + if(cs==null) + throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: " + +id.getParentId().getParentId()); + return cs; + } + protected TransportConnectionState lookupConnectionState(ProducerId id){ + TransportConnectionState cs=connectionState; + if(cs==null) + throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: " + +id.getParentId().getParentId()); + return cs; + } + protected TransportConnectionState lookupConnectionState(SessionId id){ + TransportConnectionState cs=connectionState; + if(cs==null) + throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: " + +id.getParentId()); + return cs; + } + protected TransportConnectionState lookupConnectionState(ConnectionId connectionId){ + TransportConnectionState cs=connectionState; + if(cs==null) + throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId); + return cs; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java index bcb26bdc79..3a0af68e93 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java @@ -60,7 +60,7 @@ public class ManagedTransportConnection extends TransportConnection { registerMBean(byAddressName); } - public synchronized void stop() throws Exception { + public void doStop() throws Exception { if (isStarting()) { setPendingStop(true); return; @@ -71,7 +71,7 @@ public class ManagedTransportConnection extends TransportConnection { byClientIdName=null; byAddressName=null; } - super.stop(); + super.doStop(); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 9b3cb3f44d..b08aced97a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -57,6 +57,7 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; import org.apache.activemq.memory.UsageManager; +import org.apache.activemq.state.ConnectionState; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.activemq.thread.TaskRunnerFactory; @@ -98,7 +99,7 @@ public class RegionBroker implements Broker { private final DestinationInterceptor destinationInterceptor; private ConnectionContext adminConnectionContext; protected DestinationFactory destinationFactory; - protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap(); + protected final Map connectionStates = Collections.synchronizedMap(new HashMap()); @@ -605,7 +606,7 @@ public class RegionBroker implements Broker { this.adminConnectionContext = adminConnectionContext; } - public Map getConnectionStates() { + public Map getConnectionStates() { return connectionStates; } diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java index 0adb48873c..c8ee7f1d03 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java @@ -24,6 +24,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; @@ -32,12 +34,9 @@ import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - public class ConnectionState { - final ConnectionInfo info; + ConnectionInfo info; private final ConcurrentHashMap transactions = new ConcurrentHashMap(); private final ConcurrentHashMap sessions = new ConcurrentHashMap(); private final List tempDestinations = Collections.synchronizedList(new ArrayList()); @@ -52,6 +51,15 @@ public class ConnectionState { public String toString() { return info.toString(); } + + public void reset(ConnectionInfo info) { + this.info=info; + transactions.clear(); + sessions.clear(); + tempDestinations.clear(); + shutdown.set(false); + } + public void addTempDestination(DestinationInfo info) { checkShutdown(); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index 7e1dd6c2e4..33969416cf 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -17,16 +17,6 @@ */ package org.apache.activemq.transport.tcp; -import org.apache.activemq.Service; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportThreadSupport; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.ServiceStopper; -import org.apache.activemq.wireformat.WireFormat; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import javax.net.SocketFactory; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -40,6 +30,19 @@ import java.net.URI; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import javax.net.SocketFactory; + +import org.apache.activemq.Service; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportThreadSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.wireformat.WireFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * An implementation of the {@link Transport} interface using raw tcp/ip @@ -64,6 +67,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S protected boolean useLocalHost = true; protected int minmumWireFormatVersion; protected SocketFactory socketFactory; + protected final AtomicReference stoppedLatch = new AtomicReference(); private Map socketOptions; private Boolean keepAlive; @@ -131,24 +135,22 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S */ public void run() { log.trace("TCP consumer thread starting"); - while (!isStopped()) { - try { - Object command = readCommand(); - doConsume(command); - } - catch (SocketTimeoutException e) { - } - catch (InterruptedIOException e) { - } - catch (IOException e) { - try { - stop(); - } - catch (Exception e2) { - log.warn("Caught while closing: " + e2 + ". Now Closed", e2); - } - onException(e); - } + try { + while (!isStopped()) { + try { + Object command = readCommand(); + doConsume(command); + } + catch (SocketTimeoutException e) { + } + catch (InterruptedIOException e) { + } + } + } catch (IOException e) { + stoppedLatch.get().countDown(); + onException(e); + } finally { + stoppedLatch.get().countDown(); } } @@ -301,6 +303,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S protected void doStart() throws Exception { connect(); + stoppedLatch.set(new CountDownLatch(1)); super.doStart(); } @@ -355,6 +358,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S initializeStreams(); } + protected void doStop(ServiceStopper stopper) throws Exception { if (log.isDebugEnabled()) { log.debug("Stopping transport " + this); @@ -367,6 +371,19 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S socket.close(); } } + + + /** + * Override so that stop() blocks until the run thread is no longer running. + */ + @Override + public void stop() throws Exception { + super.stop(); + CountDownLatch countDownLatch = stoppedLatch.get(); + if( countDownLatch!=null ) { + countDownLatch.await(); + } + } protected void initializeStreams() throws Exception { TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 2461971321..65e39d2af8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -32,6 +32,7 @@ import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.util.IOExceptionSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,10 +54,8 @@ public class VMTransport implements Transport,Task{ protected boolean network; protected boolean async=true; protected int asyncQueueDepth=2000; - protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList()); protected LinkedBlockingQueue messageQueue=null; protected boolean started; - protected final Object startMutex = new Object(); protected final URI location; protected final long id; private TaskRunner taskRunner; @@ -85,45 +84,37 @@ public class VMTransport implements Transport,Task{ } if(peer==null) throw new IOException("Peer not connected."); - if(!peer.disposed){ - if(async){ - asyncOneWay(command); - }else{ - syncOneWay(command); - } - }else{ - throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed."); - } - } - protected void syncOneWay(Object command){ TransportListener tl=null; - synchronized(peer.startMutex){ + synchronized(peer.mutex) { + if( peer.disposed ) { + throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed."); + } if( peer.started ) { - tl = peer.transportListener; - } else if(!peer.disposed) { - peer.prePeerSetQueue.add(command); + if(peer.async){ + peer.enqueue(command); + peer.wakeup(); + } else { + tl = peer.transportListener; + } + } else { + peer.enqueue(command); } } + if( tl!=null ) { - tl.onCommand(command); - } + tl.onCommand(command); + } + } - protected void asyncOneWay(Object command) throws IOException{ - try{ - synchronized(mutex){ - if(messageQueue==null){ - messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth); - } - } - messageQueue.put(command); - wakeup(); - }catch(final InterruptedException e){ - log.error("messageQueue interupted",e); - throw new IOException(e.getMessage()); - } - } + private void enqueue(Object command) throws IOException { + try{ + getMessageQueue().put(command); + }catch(final InterruptedException e){ + throw IOExceptionSupport.create(e); + } + } public FutureResponse asyncRequest(Object command,ResponseCallback responseCallback) throws IOException{ throw new AssertionError("Unsupported Method"); @@ -146,32 +137,38 @@ public class VMTransport implements Transport,Task{ public void setTransportListener(TransportListener commandListener){ synchronized(mutex){ this.transportListener=commandListener; + wakeup(); } - wakeup(); - peer.wakeup(); } + private LinkedBlockingQueue getMessageQueue() { + synchronized(mutex) { + if( messageQueue==null ) { + messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth); + } + return messageQueue; + } + } + + public void start() throws Exception{ if(transportListener==null) throw new IOException("TransportListener not set."); - synchronized(startMutex) { - if( !prePeerSetQueue.isEmpty() ) { - for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){ - Command command=(Command)iter.next(); - transportListener.onCommand(command); - } - prePeerSetQueue.clear(); - } + + synchronized(mutex) { + if( messageQueue!=null ) { + Object command; + while( (command = messageQueue.poll()) !=null ) { + transportListener.onCommand(command); + } + } started = true; - if( isAsync() ) { - peer.wakeup(); - wakeup(); - } + wakeup(); } } public void stop() throws Exception{ - synchronized(startMutex) { + synchronized(mutex) { if(!disposed){ started=false; disposed=true; @@ -221,18 +218,21 @@ public class VMTransport implements Transport,Task{ * @see org.apache.activemq.thread.Task#iterate() */ public boolean iterate(){ - final TransportListener tl=peer.transportListener; - Command command=null; + final TransportListener tl; synchronized(mutex){ - if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null&&!messageQueue.isEmpty()){ - command=(Command)messageQueue.poll(); - } + tl = transportListener; + if( !started || disposed || tl==null ) + return false; } - if(tl!=null&&command!=null){ + + LinkedBlockingQueue mq = getMessageQueue(); + final Command command = (Command)mq.poll(); + if( command!=null ) { tl.onCommand(command); - } - boolean result=messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed; - return result; + return !mq.isEmpty(); + } else { + return false; + } } /** diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java b/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java index 85f17027e5..99c05a3123 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java @@ -184,8 +184,9 @@ public class JmsTempDestinationTest extends TestCase { * Make sure you cannot publish to a temp destination that does not exist anymore. * * @throws JMSException + * @throws InterruptedException */ - public void testPublishFailsForClosedConnection() throws JMSException { + public void testPublishFailsForClosedConnection() throws JMSException, InterruptedException { Connection tempConnection = factory.createConnection(); Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -202,6 +203,7 @@ public class JmsTempDestinationTest extends TestCase { // Closing the connection should destroy the temp queue that was created. tempConnection.close(); + Thread.sleep(1000); // Wait a little bit to let the delete take effect. // This message delivery NOT should work since the temp connection is now closed. try {