mirror of
https://github.com/apache/activemq.git
synced 2025-02-10 03:56:21 +00:00
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/branches/activemq-4.0@440548 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
861a08912b
commit
a2efa7e2e3
@ -23,9 +23,11 @@ import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.Service;
|
||||
import org.apache.activemq.broker.region.ConnectionStatistics;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.command.BrokerInfo;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.command.ConnectionControl;
|
||||
@ -88,7 +90,8 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
private boolean inServiceException=false;
|
||||
private boolean manageable;
|
||||
|
||||
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
|
||||
protected final ConcurrentHashMap localConnectionStates = new ConcurrentHashMap();
|
||||
protected final Map brokerConnectionStates;
|
||||
|
||||
private WireFormatInfo wireFormatInfo;
|
||||
protected boolean disposed=false;
|
||||
@ -117,6 +120,10 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
|
||||
this.connector = connector;
|
||||
this.broker = broker;
|
||||
|
||||
RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
|
||||
brokerConnectionStates = rb.getConnectionStates();
|
||||
|
||||
if (connector != null) {
|
||||
this.statistics.setParent(connector.getStatistics());
|
||||
}
|
||||
@ -153,7 +160,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
// Remove all logical connection associated with this connection
|
||||
// from the broker.
|
||||
if(!broker.isStopped()){
|
||||
ArrayList l=new ArrayList(connectionStates.keySet());
|
||||
ArrayList l=new ArrayList(localConnectionStates.keySet());
|
||||
for(Iterator iter=l.iterator();iter.hasNext();){
|
||||
ConnectionId connectionId=(ConnectionId) iter.next();
|
||||
try{
|
||||
@ -245,25 +252,25 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
}
|
||||
|
||||
protected ConnectionState lookupConnectionState(ConsumerId id) {
|
||||
ConnectionState cs = (ConnectionState) connectionStates.get(id.getParentId().getParentId());
|
||||
ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId().getParentId());
|
||||
if( cs== null )
|
||||
throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "+id.getParentId().getParentId());
|
||||
return cs;
|
||||
}
|
||||
protected ConnectionState lookupConnectionState(ProducerId id) {
|
||||
ConnectionState cs = (ConnectionState) connectionStates.get(id.getParentId().getParentId());
|
||||
ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId().getParentId());
|
||||
if( cs== null )
|
||||
throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "+id.getParentId().getParentId());
|
||||
return cs;
|
||||
}
|
||||
protected ConnectionState lookupConnectionState(SessionId id) {
|
||||
ConnectionState cs = (ConnectionState) connectionStates.get(id.getParentId());
|
||||
ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId());
|
||||
if( cs== null )
|
||||
throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "+id.getParentId());
|
||||
return cs;
|
||||
}
|
||||
protected ConnectionState lookupConnectionState(ConnectionId connectionId) {
|
||||
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
|
||||
ConnectionState cs = (ConnectionState) localConnectionStates.get(connectionId);
|
||||
if( cs== null )
|
||||
throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId);
|
||||
return cs;
|
||||
@ -293,7 +300,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
}
|
||||
|
||||
public Response processBeginTransaction(TransactionInfo info) throws Exception {
|
||||
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
|
||||
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
|
||||
ConnectionContext context=null;
|
||||
if( cs!=null ) {
|
||||
context = cs.getContext();
|
||||
@ -310,7 +317,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
}
|
||||
|
||||
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
|
||||
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
|
||||
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
|
||||
ConnectionContext context=null;
|
||||
if( cs!=null ) {
|
||||
context = cs.getContext();
|
||||
@ -321,7 +328,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
}
|
||||
|
||||
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
|
||||
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
|
||||
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
|
||||
ConnectionContext context=null;
|
||||
if( cs!=null ) {
|
||||
context = cs.getContext();
|
||||
@ -331,7 +338,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
}
|
||||
|
||||
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
|
||||
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
|
||||
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
|
||||
ConnectionContext context=null;
|
||||
if( cs!=null ) {
|
||||
context = cs.getContext();
|
||||
@ -341,7 +348,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
}
|
||||
|
||||
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
|
||||
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
|
||||
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
|
||||
ConnectionContext context=null;
|
||||
if( cs!=null ) {
|
||||
context = cs.getContext();
|
||||
@ -351,7 +358,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
}
|
||||
|
||||
public Response processForgetTransaction(TransactionInfo info) throws Exception {
|
||||
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
|
||||
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
|
||||
ConnectionContext context=null;
|
||||
if( cs!=null ) {
|
||||
context = cs.getContext();
|
||||
@ -361,7 +368,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
}
|
||||
|
||||
public Response processRecoverTransactions(TransactionInfo info) throws Exception {
|
||||
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
|
||||
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
|
||||
ConnectionContext context=null;
|
||||
if( cs!=null ) {
|
||||
context = cs.getContext();
|
||||
@ -428,12 +435,16 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
SessionState ss = cs.getSessionState(sessionId);
|
||||
if( ss == null )
|
||||
throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "+sessionId);
|
||||
broker.addProducer(cs.getContext(), info);
|
||||
try {
|
||||
ss.addProducer(info);
|
||||
} catch (IllegalStateException e) {
|
||||
broker.removeProducer(cs.getContext(), info);
|
||||
}
|
||||
|
||||
// Avoid replaying dup commands
|
||||
if( !ss.getProducerIds().contains(info.getProducerId()) ) {
|
||||
broker.addProducer(cs.getContext(), info);
|
||||
try {
|
||||
ss.addProducer(info);
|
||||
} catch (IllegalStateException e) {
|
||||
broker.removeProducer(cs.getContext(), info);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -462,12 +473,15 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
if( ss == null )
|
||||
throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "+sessionId);
|
||||
|
||||
broker.addConsumer(cs.getContext(), info);
|
||||
try {
|
||||
ss.addConsumer(info);
|
||||
} catch (IllegalStateException e) {
|
||||
broker.removeConsumer(cs.getContext(), info);
|
||||
}
|
||||
// Avoid replaying dup commands
|
||||
if( !ss.getConsumerIds().contains(info.getConsumerId()) ) {
|
||||
broker.addConsumer(cs.getContext(), info);
|
||||
try {
|
||||
ss.addConsumer(info);
|
||||
} catch (IllegalStateException e) {
|
||||
broker.removeConsumer(cs.getContext(), info);
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
@ -491,14 +505,17 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
|
||||
public Response processAddSession(SessionInfo info) throws Exception {
|
||||
ConnectionId connectionId = info.getSessionId().getParentId();
|
||||
|
||||
ConnectionState cs = lookupConnectionState(connectionId);
|
||||
broker.addSession(cs.getContext(), info);
|
||||
try {
|
||||
cs.addSession(info);
|
||||
} catch (IllegalStateException e) {
|
||||
broker.removeSession(cs.getContext(), info);
|
||||
}
|
||||
|
||||
// Avoid replaying dup commands
|
||||
if( !cs.getSessionIds().contains(info.getSessionId()) ) {
|
||||
broker.addSession(cs.getContext(), info);
|
||||
try {
|
||||
cs.addSession(info);
|
||||
} catch (IllegalStateException e) {
|
||||
broker.removeSession(cs.getContext(), info);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -540,28 +557,40 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
}
|
||||
|
||||
public Response processAddConnection(ConnectionInfo info) throws Exception {
|
||||
// Setup the context.
|
||||
String clientId = info.getClientId();
|
||||
ConnectionContext context = new ConnectionContext();
|
||||
context.setConnection(this);
|
||||
context.setBroker(broker);
|
||||
context.setConnector(connector);
|
||||
context.setTransactions(new ConcurrentHashMap());
|
||||
context.setClientId(clientId);
|
||||
context.setUserName(info.getUserName());
|
||||
context.setConnectionId(info.getConnectionId());
|
||||
context.setWireFormatInfo(wireFormatInfo);
|
||||
this.manageable = info.isManageable();
|
||||
connectionStates.put(info.getConnectionId(), new ConnectionState(info, context));
|
||||
|
||||
|
||||
broker.addConnection(context, info);
|
||||
if (info.isManageable() && broker.isFaultTolerantConfiguration()){
|
||||
//send ConnectionCommand
|
||||
ConnectionControl command = new ConnectionControl();
|
||||
command.setFaultTolerant(broker.isFaultTolerantConfiguration());
|
||||
dispatchAsync(command);
|
||||
}
|
||||
|
||||
ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId());
|
||||
if( state == null ) {
|
||||
|
||||
// Setup the context.
|
||||
String clientId = info.getClientId();
|
||||
ConnectionContext context = new ConnectionContext();
|
||||
context.setConnection(this);
|
||||
context.setBroker(broker);
|
||||
context.setConnector(connector);
|
||||
context.setTransactions(new ConcurrentHashMap());
|
||||
context.setClientId(clientId);
|
||||
context.setUserName(info.getUserName());
|
||||
context.setConnectionId(info.getConnectionId());
|
||||
context.setWireFormatInfo(wireFormatInfo);
|
||||
context.incrementReference();
|
||||
this.manageable = info.isManageable();
|
||||
|
||||
state = new ConnectionState(info, context);
|
||||
brokerConnectionStates.put(info.getConnectionId(), state);
|
||||
localConnectionStates.put(info.getConnectionId(), state);
|
||||
|
||||
broker.addConnection(context, info);
|
||||
if (info.isManageable() && broker.isFaultTolerantConfiguration()){
|
||||
//send ConnectionCommand
|
||||
ConnectionControl command = new ConnectionControl();
|
||||
command.setFaultTolerant(broker.isFaultTolerantConfiguration());
|
||||
dispatchAsync(command);
|
||||
}
|
||||
} else {
|
||||
// We are a concurrent connection... it must be client reconnect.
|
||||
localConnectionStates.put(info.getConnectionId(), state);
|
||||
state.getContext().incrementReference();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -599,8 +628,14 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
||||
}catch(Throwable e){
|
||||
serviceLog.warn("Failed to remove connection " + cs.getInfo(),e);
|
||||
}
|
||||
connectionStates.remove(id);
|
||||
|
||||
ConnectionState state = (ConnectionState) localConnectionStates.remove(id);
|
||||
if( state != null ) {
|
||||
// If we are the last reference, we should remove the state
|
||||
// from the broker.
|
||||
if( state.getContext().decrementReference() == 0 ){
|
||||
brokerConnectionStates.remove(id);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.activemq.broker;
|
||||
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.broker.region.MessageReference;
|
||||
import org.apache.activemq.command.ConnectionId;
|
||||
@ -51,6 +52,7 @@ public class ConnectionContext {
|
||||
private Object longTermStoreContext;
|
||||
private boolean producerFlowControl=true;
|
||||
private MessageAuthorizationPolicy messageAuthorizationPolicy;
|
||||
private AtomicInteger referenceCounter = new AtomicInteger();
|
||||
|
||||
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
|
||||
|
||||
@ -221,4 +223,12 @@ public class ConnectionContext {
|
||||
return true;
|
||||
}
|
||||
|
||||
public int incrementReference() {
|
||||
return referenceCounter.incrementAndGet();
|
||||
}
|
||||
|
||||
public int decrementReference() {
|
||||
return referenceCounter.decrementAndGet();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,8 +17,15 @@
|
||||
*/
|
||||
package org.apache.activemq.broker.region;
|
||||
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.jms.InvalidClientIDException;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
@ -47,15 +54,9 @@ import org.apache.activemq.util.IdGenerator;
|
||||
import org.apache.activemq.util.LongSequenceGenerator;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
|
||||
import javax.jms.InvalidClientIDException;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
|
||||
|
||||
/**
|
||||
* Routes Broker operations to the correct messaging regions for processing.
|
||||
@ -85,6 +86,8 @@ public class RegionBroker implements Broker {
|
||||
private String brokerName;
|
||||
private Map clientIdSet = new HashMap(); // we will synchronize access
|
||||
protected PersistenceAdapter adaptor;
|
||||
|
||||
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
|
||||
|
||||
|
||||
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException {
|
||||
@ -514,7 +517,9 @@ public class RegionBroker implements Broker {
|
||||
this.keepDurableSubsActive = keepDurableSubsActive;
|
||||
}
|
||||
|
||||
|
||||
public Map getConnectionStates() {
|
||||
return connectionStates;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user