This closes #111 openwire refactoring

This commit is contained in:
Clebert Suconic 2015-08-06 10:17:10 -04:00
commit d4de65041b
12 changed files with 382 additions and 778 deletions

View File

@ -40,6 +40,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -77,14 +80,10 @@ import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMapTransportConnectionStateRegister;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleTransportConnectionStateRegister;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransportConnectionState;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransportConnectionStateRegister;
import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -129,8 +128,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
private OpenWireFormat wireFormat; private OpenWireFormat wireFormat;
private AMQTransportConnectionStateRegister connectionStateRegister = new AMQSingleTransportConnectionStateRegister();
private boolean faultTolerantConnection; private boolean faultTolerantConnection;
private AMQConnectionContext context; private AMQConnectionContext context;
@ -175,12 +172,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, AMQConsumerBrokerExchange>(); private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, AMQConsumerBrokerExchange>();
private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, AMQProducerBrokerExchange>(); private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, AMQProducerBrokerExchange>();
private AMQTransportConnectionState state; private ConnectionState state;
private final Set<String> tempQueues = new ConcurrentHashSet<String>(); private final Set<String> tempQueues = new ConcurrentHashSet<String>();
protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
private DataInputWrapper dataInput = new DataInputWrapper(); private DataInputWrapper dataInput = new DataInputWrapper();
private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId, TransactionInfo>(); private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId, TransactionInfo>();
@ -194,7 +189,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
this.transportConnection = connection; this.transportConnection = connection;
this.acceptorUsed = new AMQConnectorImpl(acceptorUsed); this.acceptorUsed = new AMQConnectorImpl(acceptorUsed);
this.wireFormat = wf; this.wireFormat = wf;
brokerConnectionStates = protocolManager.getConnectionStates();
this.creationTime = System.currentTimeMillis(); this.creationTime = System.currentTimeMillis();
} }
@ -299,7 +293,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
context.setDontSendReponse(false); context.setDontSendReponse(false);
response = null; response = null;
} }
context = null;
} }
if (response != null && !protocolManager.isStopping()) if (response != null && !protocolManager.isStopping())
@ -621,38 +614,16 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
info.setClientMaster(true); info.setClientMaster(true);
} }
// Make sure 2 concurrent connections by the same ID only generate 1 state = new ConnectionState(info);
// TransportConnectionState object.
synchronized (brokerConnectionStates) context = new AMQConnectionContext();
{
state = (AMQTransportConnectionState) brokerConnectionStates.get(info state.reset(info);
.getConnectionId());
if (state == null)
{
state = new AMQTransportConnectionState(info, this);
brokerConnectionStates.put(info.getConnectionId(), state);
}
state.incrementReference();
}
// If there are 2 concurrent connections for the same connection id,
// then last one in wins, we need to sync here
// to figure out the winner.
synchronized (state.getConnectionMutex())
{
if (state.getConnection() != this)
{
state.getConnection().disconnect(true);
state.setConnection(this);
state.reset(info);
}
}
registerConnectionState(info.getConnectionId(), state);
this.faultTolerantConnection = info.isFaultTolerant(); this.faultTolerantConnection = info.isFaultTolerant();
// Setup the context. // Setup the context.
String clientId = info.getClientId(); String clientId = info.getClientId();
context = new AMQConnectionContext();
context.setBroker(protocolManager); context.setBroker(protocolManager);
context.setClientId(clientId); context.setClientId(clientId);
context.setClientMaster(info.isClientMaster()); context.setClientMaster(info.isClientMaster());
@ -671,8 +642,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
context.setReconnect(info.isFailoverReconnect()); context.setReconnect(info.isFailoverReconnect());
this.manageable = info.isManageable(); this.manageable = info.isManageable();
context.setConnectionState(state); context.setConnectionState(state);
state.setContext(context);
state.setConnection(this);
if (info.getClientIp() == null) if (info.getClientIp() == null)
{ {
info.setClientIp(getRemoteAddress()); info.setClientIp(getRemoteAddress());
@ -684,12 +653,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
} }
catch (Exception e) catch (Exception e)
{ {
synchronized (brokerConnectionStates)
{
brokerConnectionStates.remove(info.getConnectionId());
}
unregisterConnectionState(info.getConnectionId());
if (e instanceof SecurityException) if (e instanceof SecurityException)
{ {
// close this down - in case the peer of this transport doesn't play // close this down - in case the peer of this transport doesn't play
@ -926,28 +889,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
return this.messageAuthorizationPolicy; return this.messageAuthorizationPolicy;
} }
protected synchronized AMQTransportConnectionState unregisterConnectionState(
ConnectionId connectionId)
{
return connectionStateRegister.unregisterConnectionState(connectionId);
}
protected synchronized AMQTransportConnectionState registerConnectionState(
ConnectionId connectionId, AMQTransportConnectionState state)
{
AMQTransportConnectionState cs = null;
if (!connectionStateRegister.isEmpty()
&& !connectionStateRegister.doesHandleMultipleConnectionStates())
{
// swap implementations
AMQTransportConnectionStateRegister newRegister = new AMQMapTransportConnectionStateRegister();
newRegister.intialize(connectionStateRegister);
connectionStateRegister = newRegister;
}
cs = connectionStateRegister.registerConnectionState(connectionId, state);
return cs;
}
public void delayedStop(final int waitTime, final String reason, public void delayedStop(final int waitTime, final String reason,
Throwable cause) Throwable cause)
{ {
@ -997,16 +938,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
} }
if (stopping.compareAndSet(false, true)) if (stopping.compareAndSet(false, true))
{ {
// Let all the connection contexts know we are shutting down if (context != null)
// so that in progress operations can notice and unblock.
List<AMQTransportConnectionState> connectionStates = listConnectionStates();
for (AMQTransportConnectionState cs : connectionStates)
{ {
AMQConnectionContext connectionContext = cs.getContext(); context.getStopping().set(true);
if (connectionContext != null)
{
connectionContext.getStopping().set(true);
}
} }
try try
{ {
@ -1040,11 +974,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
} }
} }
protected synchronized List<AMQTransportConnectionState> listConnectionStates()
{
return connectionStateRegister.listConnectionStates();
}
protected void doStop() throws Exception protected void doStop() throws Exception
{ {
this.acceptorUsed.onStopped(this); this.acceptorUsed.onStopped(this);
@ -1095,19 +1024,14 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
// from the broker. // from the broker.
if (!protocolManager.isStopped()) if (!protocolManager.isStopped())
{ {
List<AMQTransportConnectionState> connectionStates = listConnectionStates(); context.getStopping().set(true);
connectionStates = listConnectionStates(); try
for (AMQTransportConnectionState cs : connectionStates)
{ {
cs.getContext().getStopping().set(true); processRemoveConnection(state.getInfo().getConnectionId(), 0L);
try }
{ catch (Throwable ignore)
processRemoveConnection(cs.getInfo().getConnectionId(), 0L); {
} ignore.printStackTrace();
catch (Throwable ignore)
{
ignore.printStackTrace();
}
} }
} }
} }
@ -1134,16 +1058,21 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
return resp; return resp;
} }
AMQConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) public void addConsumerBrokerExchange(ConsumerId id, AMQSession amqSession, Map<ActiveMQDestination, AMQConsumer> consumerMap)
{ {
AMQConsumerBrokerExchange result = consumerExchanges.get(id); AMQConsumerBrokerExchange result = consumerExchanges.get(id);
if (result == null) if (result == null)
{ {
if (consumerMap.size() == 1)
{
result = new AMQSingleConsumerBrokerExchange(amqSession, consumerMap.values().iterator().next());
}
else
{
result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerMap);
}
synchronized (consumerExchanges) synchronized (consumerExchanges)
{ {
result = new AMQConsumerBrokerExchange();
AMQTransportConnectionState state = lookupConnectionState(id);
context = state.getContext();
result.setConnectionContext(context); result.setConnectionContext(context);
SessionState ss = state.getSessionState(id.getParentId()); SessionState ss = state.getSessionState(id.getParentId());
if (ss != null) if (ss != null)
@ -1165,63 +1094,36 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
consumerExchanges.put(id, result); consumerExchanges.put(id, result);
} }
} }
return result;
} }
protected synchronized AMQTransportConnectionState lookupConnectionState( public int getConsumerCount()
ConsumerId id)
{
return connectionStateRegister.lookupConnectionState(id);
}
protected synchronized AMQTransportConnectionState lookupConnectionState(
ProducerId id)
{
return connectionStateRegister.lookupConnectionState(id);
}
public int getConsumerCount(ConnectionId connectionId)
{ {
int result = 0; int result = 0;
AMQTransportConnectionState cs = lookupConnectionState(connectionId); for (SessionId sessionId : state.getSessionIds())
if (cs != null)
{ {
for (SessionId sessionId : cs.getSessionIds()) SessionState sessionState = state.getSessionState(sessionId);
if (sessionState != null)
{ {
SessionState sessionState = cs.getSessionState(sessionId); result += sessionState.getConsumerIds().size();
if (sessionState != null)
{
result += sessionState.getConsumerIds().size();
}
} }
} }
return result; return result;
} }
public int getProducerCount(ConnectionId connectionId) public int getProducerCount()
{ {
int result = 0; int result = 0;
AMQTransportConnectionState cs = lookupConnectionState(connectionId); for (SessionId sessionId : state.getSessionIds())
if (cs != null)
{ {
for (SessionId sessionId : cs.getSessionIds()) SessionState sessionState = state.getSessionState(sessionId);
if (sessionState != null)
{ {
SessionState sessionState = cs.getSessionState(sessionId); result += sessionState.getProducerIds().size();
if (sessionState != null)
{
result += sessionState.getProducerIds().size();
}
} }
} }
return result; return result;
} }
public synchronized AMQTransportConnectionState lookupConnectionState(
ConnectionId connectionId)
{
return connectionStateRegister.lookupConnectionState(connectionId);
}
@Override @Override
public Response processAddDestination(DestinationInfo dest) throws Exception public Response processAddDestination(DestinationInfo dest) throws Exception
{ {
@ -1273,20 +1175,18 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
@Override @Override
public Response processAddSession(SessionInfo info) throws Exception public Response processAddSession(SessionInfo info) throws Exception
{ {
ConnectionId connectionId = info.getSessionId().getParentId();
AMQTransportConnectionState cs = lookupConnectionState(connectionId);
// Avoid replaying dup commands // Avoid replaying dup commands
if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) if (!state.getSessionIds().contains(info.getSessionId()))
{ {
protocolManager.addSession(this, info); protocolManager.addSession(this, info);
try try
{ {
cs.addSession(info); state.addSession(info);
} }
catch (IllegalStateException e) catch (IllegalStateException e)
{ {
e.printStackTrace(); e.printStackTrace();
protocolManager.removeSession(cs.getContext(), info); protocolManager.removeSession(context, info);
} }
} }
return null; return null;
@ -1422,10 +1322,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
//in that case don't send the response //in that case don't send the response
//this will force the client to wait until //this will force the client to wait until
//the response is got. //the response is got.
if (context == null)
{
this.context = new AMQConnectionContext();
}
context.setDontSendReponse(true); context.setDontSendReponse(true);
} }
else else
@ -1463,9 +1359,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
synchronized (producerExchanges) synchronized (producerExchanges)
{ {
result = new AMQProducerBrokerExchange(); result = new AMQProducerBrokerExchange();
AMQTransportConnectionState state = lookupConnectionState(id);
context = state.getContext();
result.setConnectionContext(context); result.setConnectionContext(context);
//todo implement reconnect https://issues.apache.org/jira/browse/ARTEMIS-194
if (context.isReconnect() if (context.isReconnect()
|| (context.isNetworkConnection() && this.acceptorUsed || (context.isNetworkConnection() && this.acceptorUsed
.isAuditNetworkProducers())) .isAuditNetworkProducers()))
@ -1490,20 +1385,14 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
producerExchanges.put(id, result); producerExchanges.put(id, result);
} }
} }
else
{
context = result.getConnectionContext();
}
return result; return result;
} }
@Override @Override
public Response processMessageAck(MessageAck ack) throws Exception public Response processMessageAck(MessageAck ack) throws Exception
{ {
ConsumerId consumerId = ack.getConsumerId(); AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
SessionId sessionId = consumerId.getParentId(); consumerBrokerExchange.acknowledge(ack);
AMQSession session = protocolManager.getSession(sessionId);
session.acknowledge(ack);
return null; return null;
} }
@ -1523,7 +1412,13 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
@Override @Override
public Response processMessagePull(MessagePull arg0) throws Exception public Response processMessagePull(MessagePull arg0) throws Exception
{ {
throw new IllegalStateException("not implemented! "); AMQConsumerBrokerExchange amqConsumerBrokerExchange = consumerExchanges.get(arg0.getConsumerId());
if (amqConsumerBrokerExchange == null)
{
throw new IllegalStateException("Consumer does not exist");
}
amqConsumerBrokerExchange.processMessagePull(arg0);
return null;
} }
@Override @Override
@ -1542,8 +1437,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
@Override @Override
public Response processRecoverTransactions(TransactionInfo info) throws Exception public Response processRecoverTransactions(TransactionInfo info) throws Exception
{ {
AMQTransportConnectionState cs = lookupConnectionState(info.getConnectionId()); Set<SessionId> sIds = state.getSessionIds();
Set<SessionId> sIds = cs.getSessionIds();
TransactionId[] recovered = protocolManager.recoverTransactions(sIds); TransactionId[] recovered = protocolManager.recoverTransactions(sIds);
return new DataArrayResponse(recovered); return new DataArrayResponse(recovered);
} }
@ -1552,48 +1446,31 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
public Response processRemoveConnection(ConnectionId id, public Response processRemoveConnection(ConnectionId id,
long lastDeliveredSequenceId) throws Exception long lastDeliveredSequenceId) throws Exception
{ {
AMQTransportConnectionState cs = lookupConnectionState(id); // Don't allow things to be added to the connection state while we
if (cs != null) // are shutting down.
state.shutdown();
// Cascade the connection stop to the sessions.
for (SessionId sessionId : state.getSessionIds())
{ {
// Don't allow things to be added to the connection state while we
// are shutting down.
cs.shutdown();
// Cascade the connection stop to the sessions.
for (SessionId sessionId : cs.getSessionIds())
{
try
{
processRemoveSession(sessionId, lastDeliveredSequenceId);
}
catch (Throwable e)
{
// LOG
}
}
try try
{ {
protocolManager.removeConnection(cs.getContext(), cs.getInfo(), processRemoveSession(sessionId, lastDeliveredSequenceId);
null);
} }
catch (Throwable e) catch (Throwable e)
{ {
// log // LOG
}
AMQTransportConnectionState state = unregisterConnectionState(id);
if (state != null)
{
synchronized (brokerConnectionStates)
{
// If we are the last reference, we should remove the state
// from the broker.
if (state.decrementReference() == 0)
{
brokerConnectionStates.remove(id);
}
}
} }
} }
try
{
protocolManager.removeConnection(context, state.getInfo(),
null);
}
catch (Throwable e)
{
// log
}
return null; return null;
} }
@ -1602,15 +1479,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
long lastDeliveredSequenceId) throws Exception long lastDeliveredSequenceId) throws Exception
{ {
SessionId sessionId = id.getParentId(); SessionId sessionId = id.getParentId();
ConnectionId connectionId = sessionId.getParentId(); SessionState ss = state.getSessionState(sessionId);
AMQTransportConnectionState cs = lookupConnectionState(connectionId);
if (cs == null)
{
throw new IllegalStateException(
"Cannot remove a consumer from a connection that had not been registered: "
+ connectionId);
}
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) if (ss == null)
{ {
throw new IllegalStateException( throw new IllegalStateException(
@ -1625,8 +1494,13 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
} }
ConsumerInfo info = consumerState.getInfo(); ConsumerInfo info = consumerState.getInfo();
info.setLastDeliveredSequenceId(lastDeliveredSequenceId); info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
protocolManager.removeConsumer(cs.getContext(), consumerState.getInfo());
AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id);
consumerBrokerExchange.removeConsumer();
removeConsumerBrokerExchange(id); removeConsumerBrokerExchange(id);
return null; return null;
} }
@ -1661,15 +1535,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
public Response processRemoveSession(SessionId id, public Response processRemoveSession(SessionId id,
long lastDeliveredSequenceId) throws Exception long lastDeliveredSequenceId) throws Exception
{ {
ConnectionId connectionId = id.getParentId(); SessionState session = state.getSessionState(id);
AMQTransportConnectionState cs = lookupConnectionState(connectionId);
if (cs == null)
{
throw new IllegalStateException(
"Cannot remove session from connection that had not been registered: "
+ connectionId);
}
SessionState session = cs.getSessionState(id);
if (session == null) if (session == null)
{ {
throw new IllegalStateException( throw new IllegalStateException(
@ -1701,8 +1567,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
// LOG.warn("Failed to remove producer: {}", producerId, e); // LOG.warn("Failed to remove producer: {}", producerId, e);
} }
} }
cs.removeSession(id); state.removeSession(id);
protocolManager.removeSession(cs.getContext(), session.getInfo()); protocolManager.removeSession(context, session.getInfo());
return null; return null;
} }
@ -1790,7 +1656,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
public AMQConnectionContext getConext() public AMQConnectionContext getConext()
{ {
return this.state.getContext(); return this.context;
} }
} }

View File

@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
@ -55,6 +56,7 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
@ -69,7 +71,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdap
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransportConnectionState;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -114,8 +115,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
protected final ProducerId advisoryProducerId = new ProducerId(); protected final ProducerId advisoryProducerId = new ProducerId();
// from broker // from broker
protected final Map<ConnectionId, ConnectionState> brokerConnectionStates = Collections protected final Map<ConnectionId, OpenWireConnection> brokerConnectionStates = Collections
.synchronizedMap(new HashMap<ConnectionId, ConnectionState>()); .synchronizedMap(new HashMap<ConnectionId, OpenWireConnection>());
private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<OpenWireConnection>(); private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<OpenWireConnection>();
@ -131,6 +132,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<String, SessionId>(); private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<String, SessionId>();
private final ScheduledExecutorService scheduledPool;
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server)
{ {
this.factory = factory; this.factory = factory;
@ -141,6 +144,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
brokerState = new BrokerState(); brokerState = new BrokerState();
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
ManagementService service = server.getManagementService(); ManagementService service = server.getManagementService();
scheduledPool = server.getScheduledPool();
if (service != null) if (service != null)
{ {
service.addNotificationListener(this); service.addNotificationListener(this);
@ -239,7 +243,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
} }
public void handleCommand(OpenWireConnection openWireConnection, public void handleCommand(OpenWireConnection openWireConnection,
Object command) Object command) throws Exception
{ {
Command amqCmd = (Command) command; Command amqCmd = (Command) command;
byte type = amqCmd.getDataStructureType(); byte type = amqCmd.getDataStructureType();
@ -252,6 +256,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
* failover and load balancing. These features are not yet implemented for Artemis OpenWire. Instead we * failover and load balancing. These features are not yet implemented for Artemis OpenWire. Instead we
* simply drop the packet. See: ACTIVEMQ6-108 */ * simply drop the packet. See: ACTIVEMQ6-108 */
break; break;
case CommandTypes.MESSAGE_PULL:
MessagePull messagePull = (MessagePull) amqCmd;
openWireConnection.processMessagePull(messagePull);
break;
case CommandTypes.CONSUMER_CONTROL: case CommandTypes.CONSUMER_CONTROL:
break; break;
default: default:
@ -306,11 +314,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
} }
} }
public Map<ConnectionId, ConnectionState> getConnectionStates()
{
return this.brokerConnectionStates;
}
public void addConnection(AMQConnectionContext context, ConnectionInfo info) throws Exception public void addConnection(AMQConnectionContext context, ConnectionInfo info) throws Exception
{ {
String username = info.getUserName(); String username = info.getUserName();
@ -483,8 +486,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
{ {
SessionId sessionId = info.getProducerId().getParentId(); SessionId sessionId = info.getProducerId().getParentId();
ConnectionId connectionId = sessionId.getParentId(); ConnectionId connectionId = sessionId.getParentId();
AMQTransportConnectionState cs = theConn ConnectionState cs = theConn.getState();
.lookupConnectionState(connectionId);
if (cs == null) if (cs == null)
{ {
throw new IllegalStateException( throw new IllegalStateException(
@ -505,7 +507,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
if (destination != null if (destination != null
&& !AdvisorySupport.isAdvisoryTopic(destination)) && !AdvisorySupport.isAdvisoryTopic(destination))
{ {
if (theConn.getProducerCount(connectionId) >= theConn if (theConn.getProducerCount() >= theConn
.getMaximumProducersAllowedPerConnection()) .getMaximumProducersAllowedPerConnection())
{ {
throw new IllegalStateException( throw new IllegalStateException(
@ -541,8 +543,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
// Todo: add a destination interceptors holder here (amq supports this) // Todo: add a destination interceptors holder here (amq supports this)
SessionId sessionId = info.getConsumerId().getParentId(); SessionId sessionId = info.getConsumerId().getParentId();
ConnectionId connectionId = sessionId.getParentId(); ConnectionId connectionId = sessionId.getParentId();
AMQTransportConnectionState cs = theConn ConnectionState cs = theConn.getState();
.lookupConnectionState(connectionId);
if (cs == null) if (cs == null)
{ {
throw new IllegalStateException( throw new IllegalStateException(
@ -564,7 +565,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
if (destination != null if (destination != null
&& !AdvisorySupport.isAdvisoryTopic(destination)) && !AdvisorySupport.isAdvisoryTopic(destination))
{ {
if (theConn.getConsumerCount(connectionId) >= theConn if (theConn.getConsumerCount() >= theConn
.getMaximumConsumersAllowedPerConnection()) .getMaximumConsumersAllowedPerConnection())
{ {
throw new IllegalStateException( throw new IllegalStateException(
@ -580,17 +581,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
throw new IllegalStateException("Session not exist! : " + sessionId); throw new IllegalStateException("Session not exist! : " + sessionId);
} }
amqSession.createConsumer(info); amqSession.createConsumer(info, amqSession);
try ss.addConsumer(info);
{
ss.addConsumer(info);
theConn.addConsumerBrokerExchange(info.getConsumerId());
}
catch (IllegalStateException e)
{
amqSession.removeConsumer(info);
}
} }
} }
@ -614,7 +607,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
boolean internal) boolean internal)
{ {
AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss, AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss,
server, theConn, this); server, theConn, scheduledPool, this);
amqSession.initialize(); amqSession.initialize();
amqSession.setInternal(internal); amqSession.setInternal(internal);
sessions.put(ss.getSessionId(), amqSession); sessions.put(ss.getSessionId(), amqSession);
@ -644,13 +637,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
} }
} }
public void removeConsumer(AMQConnectionContext context, ConsumerInfo info) throws Exception
{
SessionId sessionId = info.getConsumerId().getParentId();
AMQSession session = sessions.get(sessionId);
session.removeConsumer(info);
}
public void removeProducer(ProducerId id) public void removeProducer(ProducerId id)
{ {
SessionId sessionId = id.getParentId(); SessionId sessionId = id.getParentId();
@ -677,7 +663,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
{ {
SimpleString qName = new SimpleString("jms.queue." SimpleString qName = new SimpleString("jms.queue."
+ dest.getPhysicalName()); + dest.getPhysicalName());
ConnectionState state = connection.brokerConnectionStates.get(info.getConnectionId()); ConnectionState state = connection.getState();
ConnectionInfo connInfo = state.getInfo(); ConnectionInfo connInfo = state.getInfo();
if (connInfo != null) if (connInfo != null)
{ {
@ -849,12 +835,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
{ {
ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination); ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
ConnectionId connId = sessionId.getParentId(); ConnectionId connId = sessionId.getParentId();
AMQTransportConnectionState cc = (AMQTransportConnectionState)this.brokerConnectionStates.get(connId); OpenWireConnection cc = this.brokerConnectionStates.get(connId);
OpenWireConnection conn = cc.getConnection();
ActiveMQMessage advisoryMessage = new ActiveMQMessage(); ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString()); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString());
fireAdvisory(conn.getConext(), topic, advisoryMessage, consumer.getId()); fireAdvisory(cc.getConext(), topic, advisoryMessage, consumer.getId());
} }
} }

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessagePull;
import java.util.Map;
public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchange
{
private final Map<ActiveMQDestination, AMQConsumer> consumerMap;
public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, Map<ActiveMQDestination, AMQConsumer> consumerMap)
{
super(amqSession);
this.consumerMap = consumerMap;
}
public void processMessagePull(MessagePull messagePull) throws Exception
{
AMQConsumer amqConsumer = consumerMap.get(messagePull.getDestination());
if (amqConsumer != null)
{
amqConsumer.processMessagePull(messagePull);
}
}
public void acknowledge(MessageAck ack) throws Exception
{
AMQConsumer amqConsumer = consumerMap.get(ack.getDestination());
if (amqConsumer != null)
{
amqSession.acknowledge(ack, amqConsumer);
}
}
public void removeConsumer() throws Exception
{
for (AMQConsumer amqConsumer : consumerMap.values())
{
amqConsumer.removeConsumer();
}
}
}

View File

@ -21,13 +21,19 @@ import java.util.Iterator;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -42,20 +48,28 @@ public class AMQConsumer implements BrowserListener
private AMQSession session; private AMQSession session;
private org.apache.activemq.command.ActiveMQDestination actualDest; private org.apache.activemq.command.ActiveMQDestination actualDest;
private ConsumerInfo info; private ConsumerInfo info;
private final ScheduledExecutorService scheduledPool;
private long nativeId = -1; private long nativeId = -1;
private SimpleString subQueueName = null; private SimpleString subQueueName = null;
private final int prefetchSize; private final int prefetchSize;
private AtomicInteger currentSize; private AtomicInteger windowAvailable;
private final java.util.Queue<MessageInfo> deliveringRefs = new ConcurrentLinkedQueue<MessageInfo>(); private final java.util.Queue<MessageInfo> deliveringRefs = new ConcurrentLinkedQueue<MessageInfo>();
private long messagePullSequence = 0;
private MessagePullHandler messagePullHandler;
public AMQConsumer(AMQSession amqSession, org.apache.activemq.command.ActiveMQDestination d, ConsumerInfo info) public AMQConsumer(AMQSession amqSession, org.apache.activemq.command.ActiveMQDestination d, ConsumerInfo info, ScheduledExecutorService scheduledPool)
{ {
this.session = amqSession; this.session = amqSession;
this.actualDest = d; this.actualDest = d;
this.info = info; this.info = info;
this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize(); this.prefetchSize = info.getPrefetchSize();
this.currentSize = new AtomicInteger(0); this.windowAvailable = new AtomicInteger(prefetchSize);
if (prefetchSize == 0)
{
messagePullHandler = new MessagePullHandler();
}
} }
public void init() throws Exception public void init() throws Exception
@ -130,12 +144,12 @@ public class AMQConsumer implements BrowserListener
coreSession.createQueue(address, subQueueName, selector, true, false); coreSession.createQueue(address, subQueueName, selector, true, false);
} }
coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, Integer.MAX_VALUE); coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1);
} }
else else
{ {
SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName()); SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());
coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, Integer.MAX_VALUE); coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
} }
if (info.isBrowser()) if (info.isBrowser())
@ -163,23 +177,14 @@ public class AMQConsumer implements BrowserListener
public void acquireCredit(int n) throws Exception public void acquireCredit(int n) throws Exception
{ {
this.currentSize.addAndGet(-n); boolean promptDelivery = windowAvailable.get() == 0;
if (currentSize.get() < prefetchSize) if (windowAvailable.get() < prefetchSize)
{ {
AtomicInteger credits = session.getCoreSession().getConsumerCredits(nativeId); this.windowAvailable.addAndGet(n);
credits.set(0);
session.getCoreSession().receiveConsumerCredits(nativeId, Integer.MAX_VALUE);
} }
} if (promptDelivery)
public void checkCreditOnDelivery() throws Exception
{
this.currentSize.incrementAndGet();
if (currentSize.get() == prefetchSize)
{ {
//stop because reach prefetchSize session.getCoreSession().promptDelivery(nativeId);
session.getCoreSession().receiveConsumerCredits(nativeId, 0);
} }
} }
@ -188,12 +193,16 @@ public class AMQConsumer implements BrowserListener
MessageDispatch dispatch; MessageDispatch dispatch;
try try
{ {
if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message))
{
return 0;
}
//decrement deliveryCount as AMQ client tends to add 1. //decrement deliveryCount as AMQ client tends to add 1.
dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this); dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this);
int size = dispatch.getMessage().getSize(); int size = dispatch.getMessage().getSize();
this.deliveringRefs.add(new MessageInfo(dispatch.getMessage().getMessageId(), message.getMessageID(), size)); this.deliveringRefs.add(new MessageInfo(dispatch.getMessage().getMessageId(), message.getMessageID(), size));
session.deliverMessage(dispatch); session.deliverMessage(dispatch);
checkCreditOnDelivery(); windowAvailable.decrementAndGet();
return size; return size;
} }
catch (IOException e) catch (IOException e)
@ -206,6 +215,16 @@ public class AMQConsumer implements BrowserListener
} }
} }
public void handleDeliverNullDispatch()
{
MessageDispatch md = new MessageDispatch();
md.setConsumerId(getId());
md.setDestination(actualDest);
session.deliverMessage(md);
windowAvailable.decrementAndGet();
}
public void acknowledge(MessageAck ack) throws Exception public void acknowledge(MessageAck ack) throws Exception
{ {
MessageId first = ack.getFirstMessageId(); MessageId first = ack.getFirstMessageId();
@ -400,4 +419,90 @@ public class AMQConsumer implements BrowserListener
{ {
return info; return info;
} }
public boolean hasCredits()
{
return windowAvailable.get() > 0;
}
public void processMessagePull(MessagePull messagePull) throws Exception
{
windowAvailable.incrementAndGet();
if (messagePullHandler != null)
{
messagePullHandler.nextSequence(messagePullSequence++, messagePull.getTimeout());
}
}
public void removeConsumer() throws Exception
{
session.removeConsumer(nativeId);
}
private class MessagePullHandler
{
private long next = -1;
private long timeout;
private CountDownLatch latch = new CountDownLatch(1);
private ScheduledFuture<?> messagePullFuture;
public void nextSequence(long next, long timeout) throws Exception
{
this.next = next;
this.timeout = timeout;
latch = new CountDownLatch(1);
session.getCoreSession().forceConsumerDelivery(nativeId, messagePullSequence);
//if we are 0 timeout or less we need to wait to get either the forced message or a real message.
if (timeout <= 0)
{
latch.await(10, TimeUnit.SECONDS);
//this means we have received no message just the forced delivery message
if (this.next >= 0)
{
handleDeliverNullDispatch();
}
}
}
public boolean checkForcedConsumer(ServerMessage message)
{
if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
{
System.out.println("MessagePullHandler.checkForcedConsumer");
if (next >= 0)
{
if (timeout <= 0)
{
latch.countDown();
}
else
{
messagePullFuture = scheduledPool.schedule(new Runnable()
{
@Override
public void run()
{
if (next >= 0)
{
handleDeliverNullDispatch();
}
}
}, timeout, TimeUnit.MILLISECONDS);
}
}
return false;
}
else
{
next = -1;
if (messagePullFuture != null)
{
messagePullFuture.cancel(true);
}
latch.countDown();
return true;
}
}
}
} }

View File

@ -16,13 +16,22 @@
*/ */
package org.apache.activemq.artemis.core.protocol.openwire.amq; package org.apache.activemq.artemis.core.protocol.openwire.amq;
public class AMQConsumerBrokerExchange import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessagePull;
public abstract class AMQConsumerBrokerExchange
{ {
protected final AMQSession amqSession;
private AMQConnectionContext connectionContext; private AMQConnectionContext connectionContext;
private AMQDestination regionDestination; private AMQDestination regionDestination;
private AMQSubscription subscription; private AMQSubscription subscription;
private boolean wildcard; private boolean wildcard;
public AMQConsumerBrokerExchange(AMQSession amqSession)
{
this.amqSession = amqSession;
}
/** /**
* @return the connectionContext * @return the connectionContext
*/ */
@ -90,4 +99,10 @@ public class AMQConsumerBrokerExchange
{ {
this.wildcard = wildcard; this.wildcard = wildcard;
} }
public abstract void acknowledge(MessageAck ack) throws Exception;
public abstract void processMessagePull(MessagePull messagePull) throws Exception;
public abstract void removeConsumer() throws Exception;
} }

View File

@ -1,151 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SessionId;
public class AMQMapTransportConnectionStateRegister implements
AMQTransportConnectionStateRegister
{
private Map<ConnectionId, AMQTransportConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, AMQTransportConnectionState>();
public AMQTransportConnectionState registerConnectionState(
ConnectionId connectionId, AMQTransportConnectionState state)
{
AMQTransportConnectionState rc = connectionStates
.put(connectionId, state);
return rc;
}
public AMQTransportConnectionState unregisterConnectionState(
ConnectionId connectionId)
{
AMQTransportConnectionState rc = connectionStates.remove(connectionId);
if (rc.getReferenceCounter().get() > 1)
{
rc.decrementReference();
connectionStates.put(connectionId, rc);
}
return rc;
}
public List<AMQTransportConnectionState> listConnectionStates()
{
List<AMQTransportConnectionState> rc = new ArrayList<AMQTransportConnectionState>();
rc.addAll(connectionStates.values());
return rc;
}
public AMQTransportConnectionState lookupConnectionState(String connectionId)
{
return connectionStates.get(new ConnectionId(connectionId));
}
public AMQTransportConnectionState lookupConnectionState(ConsumerId id)
{
AMQTransportConnectionState cs = lookupConnectionState(id
.getConnectionId());
if (cs == null)
{
throw new IllegalStateException(
"Cannot lookup a consumer from a connection that had not been registered: "
+ id.getParentId().getParentId());
}
return cs;
}
public AMQTransportConnectionState lookupConnectionState(ProducerId id)
{
AMQTransportConnectionState cs = lookupConnectionState(id
.getConnectionId());
if (cs == null)
{
throw new IllegalStateException(
"Cannot lookup a producer from a connection that had not been registered: "
+ id.getParentId().getParentId());
}
return cs;
}
public AMQTransportConnectionState lookupConnectionState(SessionId id)
{
AMQTransportConnectionState cs = lookupConnectionState(id
.getConnectionId());
if (cs == null)
{
throw new IllegalStateException(
"Cannot lookup a session from a connection that had not been registered: "
+ id.getParentId());
}
return cs;
}
public AMQTransportConnectionState lookupConnectionState(
ConnectionId connectionId)
{
AMQTransportConnectionState cs = connectionStates.get(connectionId);
if (cs == null)
{
throw new IllegalStateException(
"Cannot lookup a connection that had not been registered: "
+ connectionId);
}
return cs;
}
public boolean doesHandleMultipleConnectionStates()
{
return true;
}
public boolean isEmpty()
{
return connectionStates.isEmpty();
}
public void clear()
{
connectionStates.clear();
}
public void intialize(AMQTransportConnectionStateRegister other)
{
connectionStates.clear();
connectionStates.putAll(other.mapStates());
}
public Map<ConnectionId, AMQTransportConnectionState> mapStates()
{
HashMap<ConnectionId, AMQTransportConnectionState> map = new HashMap<ConnectionId, AMQTransportConnectionState>(
connectionStates);
return map;
}
}

View File

@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
@ -69,12 +68,10 @@ public class AMQSession implements SessionCallback
private SessionInfo sessInfo; private SessionInfo sessInfo;
private ActiveMQServer server; private ActiveMQServer server;
private OpenWireConnection connection; private OpenWireConnection connection;
//native id -> consumer
private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<Long, AMQConsumer>();
//amq id -> native id
private Map<Long, Long> consumerIdMap = new HashMap<Long, Long>();
private Map<Long, AMQProducer> producers = new HashMap<Long, AMQProducer>(); private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<>();
private Map<Long, AMQProducer> producers = new HashMap<>();
private AtomicBoolean started = new AtomicBoolean(false); private AtomicBoolean started = new AtomicBoolean(false);
@ -82,15 +79,18 @@ public class AMQSession implements SessionCallback
private boolean isTx; private boolean isTx;
private final ScheduledExecutorService scheduledPool;
private OpenWireProtocolManager manager; private OpenWireProtocolManager manager;
public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo, public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo,
ActiveMQServer server, OpenWireConnection connection, OpenWireProtocolManager manager) ActiveMQServer server, OpenWireConnection connection, ScheduledExecutorService scheduledPool, OpenWireProtocolManager manager)
{ {
this.connInfo = connInfo; this.connInfo = connInfo;
this.sessInfo = sessInfo; this.sessInfo = sessInfo;
this.server = server; this.server = server;
this.connection = connection; this.connection = connection;
this.scheduledPool = scheduledPool;
this.manager = manager; this.manager = manager;
} }
@ -123,7 +123,7 @@ public class AMQSession implements SessionCallback
} }
public void createConsumer(ConsumerInfo info) throws Exception public void createConsumer(ConsumerInfo info, AMQSession amqSession) throws Exception
{ {
//check destination //check destination
ActiveMQDestination dest = info.getDestination(); ActiveMQDestination dest = info.getDestination();
@ -136,7 +136,7 @@ public class AMQSession implements SessionCallback
{ {
dests = new ActiveMQDestination[] {dest}; dests = new ActiveMQDestination[] {dest};
} }
Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
for (ActiveMQDestination d : dests) for (ActiveMQDestination d : dests)
{ {
if (d.isQueue()) if (d.isQueue())
@ -144,11 +144,13 @@ public class AMQSession implements SessionCallback
SimpleString queueName = OpenWireUtil.toCoreAddress(d); SimpleString queueName = OpenWireUtil.toCoreAddress(d);
getCoreServer().getJMSQueueCreator().create(queueName); getCoreServer().getJMSQueueCreator().create(queueName);
} }
AMQConsumer consumer = new AMQConsumer(this, d, info); AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool);
consumer.init(); consumer.init();
consumerMap.put(d, consumer);
consumers.put(consumer.getNativeId(), consumer); consumers.put(consumer.getNativeId(), consumer);
this.consumerIdMap.put(info.getConsumerId().getValue(), consumer.getNativeId());
} }
connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap);
coreSession.start(); coreSession.start();
started.set(true); started.set(true);
} }
@ -214,7 +216,8 @@ public class AMQSession implements SessionCallback
@Override @Override
public boolean hasCredits(ServerConsumer consumerID) public boolean hasCredits(ServerConsumer consumerID)
{ {
return true; AMQConsumer amqConsumer = consumers.get(consumerID.getID());
return amqConsumer.hasCredits();
} }
@Override @Override
@ -234,19 +237,12 @@ public class AMQSession implements SessionCallback
return this.server; return this.server;
} }
public void removeConsumer(ConsumerInfo info) throws Exception public void removeConsumer(long consumerId) throws Exception
{ {
long consumerId = info.getConsumerId().getValue(); boolean failed = !(this.txId != null || this.isTx);
long nativeId = this.consumerIdMap.remove(consumerId);
if (this.txId != null || this.isTx) coreSession.amqCloseConsumer(consumerId, failed);
{ consumers.remove(consumerId);
((AMQServerSession)coreSession).amqCloseConsumer(nativeId, false);
}
else
{
((AMQServerSession)coreSession).amqCloseConsumer(nativeId, true);
}
AMQConsumer consumer = consumers.remove(nativeId);
} }
public void createProducer(ProducerInfo info) throws Exception public void createProducer(ProducerInfo info) throws Exception
@ -331,16 +327,13 @@ public class AMQSession implements SessionCallback
return this.connection.getMarshaller(); return this.connection.getMarshaller();
} }
public void acknowledge(MessageAck ack) throws Exception public void acknowledge(MessageAck ack, AMQConsumer consumer) throws Exception
{ {
TransactionId tid = ack.getTransactionId(); TransactionId tid = ack.getTransactionId();
if (tid != null) if (tid != null)
{ {
this.resetSessionTx(ack.getTransactionId()); this.resetSessionTx(ack.getTransactionId());
} }
ConsumerId consumerId = ack.getConsumerId();
long nativeConsumerId = consumerIdMap.get(consumerId.getValue());
AMQConsumer consumer = consumers.get(nativeConsumerId);
consumer.acknowledge(ack); consumer.acknowledge(ack);
if (tid == null && ack.getAckType() == MessageAck.STANDARD_ACK_TYPE) if (tid == null && ack.getAckType() == MessageAck.STANDARD_ACK_TYPE)

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessagePull;
public class AMQSingleConsumerBrokerExchange extends AMQConsumerBrokerExchange
{
private AMQConsumer consumer;
public AMQSingleConsumerBrokerExchange(AMQSession amqSession, AMQConsumer consumer)
{
super(amqSession);
this.consumer = consumer;
}
public void processMessagePull(MessagePull messagePull) throws Exception
{
consumer.processMessagePull(messagePull);
}
public void removeConsumer() throws Exception
{
consumer.removeConsumer();
}
public void acknowledge(MessageAck ack) throws Exception
{
amqSession.acknowledge(ack, consumer);
}
}

View File

@ -1,184 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SessionId;
/**
* We just copy this structure from amq, but what's the purpose
* and can it be removed ?
*/
public class AMQSingleTransportConnectionStateRegister implements
AMQTransportConnectionStateRegister
{
private AMQTransportConnectionState connectionState;
private ConnectionId connectionId;
public AMQTransportConnectionState registerConnectionState(
ConnectionId connectionId, AMQTransportConnectionState state)
{
AMQTransportConnectionState rc = connectionState;
connectionState = state;
this.connectionId = connectionId;
return rc;
}
public synchronized AMQTransportConnectionState unregisterConnectionState(
ConnectionId connectionId)
{
AMQTransportConnectionState rc = null;
if (connectionId != null && connectionState != null
&& this.connectionId != null)
{
if (this.connectionId.equals(connectionId))
{
rc = connectionState;
connectionState = null;
connectionId = null;
}
}
return rc;
}
public synchronized List<AMQTransportConnectionState> listConnectionStates()
{
List<AMQTransportConnectionState> rc = new ArrayList<AMQTransportConnectionState>();
if (connectionState != null)
{
rc.add(connectionState);
}
return rc;
}
public synchronized AMQTransportConnectionState lookupConnectionState(
String connectionId)
{
AMQTransportConnectionState cs = connectionState;
if (cs == null)
{
throw new IllegalStateException(
"Cannot lookup a connectionId for a connection that had not been registered: "
+ connectionId);
}
return cs;
}
public synchronized AMQTransportConnectionState lookupConnectionState(
ConsumerId id)
{
AMQTransportConnectionState cs = connectionState;
if (cs == null)
{
throw new IllegalStateException(
"Cannot lookup a consumer from a connection that had not been registered: "
+ id.getParentId().getParentId());
}
return cs;
}
public synchronized AMQTransportConnectionState lookupConnectionState(
ProducerId id)
{
AMQTransportConnectionState cs = connectionState;
if (cs == null)
{
throw new IllegalStateException(
"Cannot lookup a producer from a connection that had not been registered: "
+ id.getParentId().getParentId());
}
return cs;
}
public synchronized AMQTransportConnectionState lookupConnectionState(
SessionId id)
{
AMQTransportConnectionState cs = connectionState;
if (cs == null)
{
throw new IllegalStateException(
"Cannot lookup a session from a connection that had not been registered: "
+ id.getParentId());
}
return cs;
}
public synchronized AMQTransportConnectionState lookupConnectionState(
ConnectionId connectionId)
{
AMQTransportConnectionState cs = connectionState;
return cs;
}
public synchronized boolean doesHandleMultipleConnectionStates()
{
return false;
}
public synchronized boolean isEmpty()
{
return connectionState == null;
}
public void intialize(AMQTransportConnectionStateRegister other)
{
if (other.isEmpty())
{
clear();
}
else
{
Map map = other.mapStates();
Iterator i = map.entrySet().iterator();
Map.Entry<ConnectionId, AMQTransportConnectionState> entry = (Entry<ConnectionId, AMQTransportConnectionState>) i
.next();
connectionId = entry.getKey();
connectionState = entry.getValue();
}
}
public Map<ConnectionId, AMQTransportConnectionState> mapStates()
{
Map<ConnectionId, AMQTransportConnectionState> map = new HashMap<ConnectionId, AMQTransportConnectionState>();
if (!isEmpty())
{
map.put(connectionId, connectionState);
}
return map;
}
public void clear()
{
connectionState = null;
connectionId = null;
}
}

View File

@ -1,85 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
public class AMQTransportConnectionState extends ConnectionState
{
private AMQConnectionContext context;
private OpenWireConnection connection;
private AtomicInteger referenceCounter = new AtomicInteger();
private final Object connectionMutex = new Object();
public AMQTransportConnectionState(ConnectionInfo info,
OpenWireConnection transportConnection)
{
super(info);
connection = transportConnection;
}
public AMQConnectionContext getContext()
{
return context;
}
public OpenWireConnection getConnection()
{
return connection;
}
public void setContext(AMQConnectionContext context)
{
this.context = context;
}
public void setConnection(OpenWireConnection connection)
{
this.connection = connection;
}
public int incrementReference()
{
return referenceCounter.incrementAndGet();
}
public int decrementReference()
{
return referenceCounter.decrementAndGet();
}
public AtomicInteger getReferenceCounter()
{
return referenceCounter;
}
public void setReferenceCounter(AtomicInteger referenceCounter)
{
this.referenceCounter = referenceCounter;
}
public Object getConnectionMutex()
{
return connectionMutex;
}
}

View File

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.List;
import java.util.Map;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SessionId;
/**
* What's the purpose of this?
*/
public interface AMQTransportConnectionStateRegister
{
AMQTransportConnectionState registerConnectionState(ConnectionId connectionId,
AMQTransportConnectionState state);
AMQTransportConnectionState unregisterConnectionState(ConnectionId connectionId);
List<AMQTransportConnectionState> listConnectionStates();
Map<ConnectionId, AMQTransportConnectionState> mapStates();
AMQTransportConnectionState lookupConnectionState(String connectionId);
AMQTransportConnectionState lookupConnectionState(ConsumerId id);
AMQTransportConnectionState lookupConnectionState(ProducerId id);
AMQTransportConnectionState lookupConnectionState(SessionId id);
AMQTransportConnectionState lookupConnectionState(ConnectionId connectionId);
boolean isEmpty();
boolean doesHandleMultipleConnectionStates();
void intialize(AMQTransportConnectionStateRegister other);
void clear();
}

View File

@ -788,6 +788,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener
} }
} }
public void promptDelivery(long consumerID)
{
ServerConsumer consumer = consumers.get(consumerID);
// this would be possible if the server consumer was closed by pings/pongs.. etc
if (consumer != null)
{
consumer.promptDelivery();
}
}
public void acknowledge(final long consumerID, final long messageID) throws Exception public void acknowledge(final long consumerID, final long messageID) throws Exception
{ {
ServerConsumer consumer = consumers.get(consumerID); ServerConsumer consumer = consumers.get(consumerID);