git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@440588 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-09-06 03:19:33 +00:00
parent 937de5bd11
commit 5b53083274
3 changed files with 112 additions and 63 deletions

View File

@ -23,9 +23,11 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.activemq.Service;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
@ -89,7 +91,8 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
private boolean inServiceException=false;
private boolean manageable;
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
protected final ConcurrentHashMap localConnectionStates = new ConcurrentHashMap();
protected final Map brokerConnectionStates;
private WireFormatInfo wireFormatInfo;
protected boolean disposed=false;
@ -118,6 +121,10 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
this.connector = connector;
this.broker = broker;
RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
brokerConnectionStates = rb.getConnectionStates();
if (connector != null) {
this.statistics.setParent(connector.getStatistics());
}
@ -154,7 +161,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
// Remove all logical connection associated with this connection
// from the broker.
if(!broker.isStopped()){
ArrayList l=new ArrayList(connectionStates.keySet());
ArrayList l=new ArrayList(localConnectionStates.keySet());
for(Iterator iter=l.iterator();iter.hasNext();){
ConnectionId connectionId=(ConnectionId) iter.next();
try{
@ -246,25 +253,25 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
protected ConnectionState lookupConnectionState(ConsumerId id) {
ConnectionState cs = (ConnectionState) connectionStates.get(id.getParentId().getParentId());
ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId().getParentId());
if( cs== null )
throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "+id.getParentId().getParentId());
return cs;
}
protected ConnectionState lookupConnectionState(ProducerId id) {
ConnectionState cs = (ConnectionState) connectionStates.get(id.getParentId().getParentId());
ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId().getParentId());
if( cs== null )
throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "+id.getParentId().getParentId());
return cs;
}
protected ConnectionState lookupConnectionState(SessionId id) {
ConnectionState cs = (ConnectionState) connectionStates.get(id.getParentId());
ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId());
if( cs== null )
throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "+id.getParentId());
return cs;
}
protected ConnectionState lookupConnectionState(ConnectionId connectionId) {
ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
ConnectionState cs = (ConnectionState) localConnectionStates.get(connectionId);
if( cs== null )
throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId);
return cs;
@ -294,7 +301,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public Response processBeginTransaction(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@ -311,7 +318,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@ -322,7 +329,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@ -332,7 +339,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@ -342,7 +349,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@ -352,7 +359,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public Response processForgetTransaction(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@ -362,7 +369,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public Response processRecoverTransactions(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@ -433,12 +440,16 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
SessionState ss = cs.getSessionState(sessionId);
if( ss == null )
throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "+sessionId);
broker.addProducer(cs.getContext(), info);
try {
ss.addProducer(info);
} catch (IllegalStateException e) {
broker.removeProducer(cs.getContext(), info);
}
// Avoid replaying dup commands
if( !ss.getProducerIds().contains(info.getProducerId()) ) {
broker.addProducer(cs.getContext(), info);
try {
ss.addProducer(info);
} catch (IllegalStateException e) {
broker.removeProducer(cs.getContext(), info);
}
}
return null;
}
@ -467,12 +478,15 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( ss == null )
throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "+sessionId);
broker.addConsumer(cs.getContext(), info);
try {
ss.addConsumer(info);
} catch (IllegalStateException e) {
broker.removeConsumer(cs.getContext(), info);
}
// Avoid replaying dup commands
if( !ss.getConsumerIds().contains(info.getConsumerId()) ) {
broker.addConsumer(cs.getContext(), info);
try {
ss.addConsumer(info);
} catch (IllegalStateException e) {
broker.removeConsumer(cs.getContext(), info);
}
}
return null;
}
@ -496,14 +510,17 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
public Response processAddSession(SessionInfo info) throws Exception {
ConnectionId connectionId = info.getSessionId().getParentId();
ConnectionState cs = lookupConnectionState(connectionId);
broker.addSession(cs.getContext(), info);
try {
cs.addSession(info);
} catch (IllegalStateException e) {
broker.removeSession(cs.getContext(), info);
}
// Avoid replaying dup commands
if( !cs.getSessionIds().contains(info.getSessionId()) ) {
broker.addSession(cs.getContext(), info);
try {
cs.addSession(info);
} catch (IllegalStateException e) {
broker.removeSession(cs.getContext(), info);
}
}
return null;
}
@ -545,24 +562,34 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public Response processAddConnection(ConnectionInfo info) throws Exception {
// Setup the context.
ConnectionContext context = new ConnectionContext(info);
context.setConnection(this);
context.setBroker(broker);
context.setConnector(connector);
context.setTransactions(new ConcurrentHashMap());
context.setWireFormatInfo(wireFormatInfo);
this.manageable = info.isManageable();
connectionStates.put(info.getConnectionId(), new ConnectionState(info, context));
broker.addConnection(context, info);
if (info.isManageable() && broker.isFaultTolerantConfiguration()){
//send ConnectionCommand
ConnectionControl command = new ConnectionControl();
command.setFaultTolerant(broker.isFaultTolerantConfiguration());
dispatchAsync(command);
}
ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId());
if( state == null ) {
// Setup the context.
ConnectionContext context = new ConnectionContext(info);
context.setConnection(this);
context.setBroker(broker);
context.setConnector(connector);
context.setTransactions(new ConcurrentHashMap());
context.setWireFormatInfo(wireFormatInfo);
this.manageable = info.isManageable();
context.incrementReference();
state = new ConnectionState(info, context);
brokerConnectionStates.put(info.getConnectionId(), state);
localConnectionStates.put(info.getConnectionId(), state);
broker.addConnection(context, info);
if (info.isManageable() && broker.isFaultTolerantConfiguration()){
//send ConnectionCommand
ConnectionControl command = new ConnectionControl();
command.setFaultTolerant(broker.isFaultTolerantConfiguration());
dispatchAsync(command);
}
} else {
// We are a concurrent connection... it must be client reconnect.
localConnectionStates.put(info.getConnectionId(), state);
state.getContext().incrementReference();
}
return null;
}
@ -600,8 +627,14 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}catch(Throwable e){
serviceLog.warn("Failed to remove connection " + cs.getInfo(),e);
}
connectionStates.remove(id);
ConnectionState state = (ConnectionState) localConnectionStates.remove(id);
if( state != null ) {
// If we are the last reference, we should remove the state
// from the broker.
if( state.getContext().decrementReference() == 0 ){
brokerConnectionStates.remove(id);
}
}
return null;
}

View File

@ -18,6 +18,7 @@
package org.apache.activemq.broker;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.ConnectionId;
@ -52,6 +53,7 @@ public class ConnectionContext {
private Object longTermStoreContext;
private boolean producerFlowControl=true;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private AtomicInteger referenceCounter = new AtomicInteger();
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
@ -231,4 +233,12 @@ public class ConnectionContext {
return true;
}
public int incrementReference() {
return referenceCounter.incrementAndGet();
}
public int decrementReference() {
return referenceCounter.decrementAndGet();
}
}

View File

@ -17,8 +17,15 @@
*/
package org.apache.activemq.broker.region;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
@ -51,15 +58,9 @@ import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceStopper;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
/**
* Routes Broker operations to the correct messaging regions for processing.
@ -91,6 +92,7 @@ public class RegionBroker implements Broker {
private final DestinationInterceptor destinationInterceptor;
private ConnectionContext adminConnectionContext;
protected DestinationFactory destinationFactory;
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
this.brokerService = brokerService;
@ -575,6 +577,10 @@ public class RegionBroker implements Broker {
this.adminConnectionContext = adminConnectionContext;
}
public Map getConnectionStates() {
return connectionStates;
}
public Store getTempDataStore() {
return brokerService.getTempDataStore();
}