ARTEMIS-186, ARTEMIS-188 and some refactoring
refactoring and simplifying some of the connection state code and removing stuff we dont need. Also removed some of maps used and removed the need for lots of lookups https://issues.apache.org/jira/browse/ARTEMIS-186 Added message pull support for zero prefetch consumers https://issues.apache.org/jira/browse/ARTEMIS-188 Added consumer flow control
This commit is contained in:
parent
5c7720dba3
commit
81756739bb
|
@ -40,6 +40,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.BrokerInfo;
|
||||
|
@ -77,14 +80,10 @@ import org.apache.activemq.command.WireFormatInfo;
|
|||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMapTransportConnectionStateRegister;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleTransportConnectionStateRegister;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransportConnectionState;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransportConnectionStateRegister;
|
||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
|
@ -129,8 +128,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
|
||||
private OpenWireFormat wireFormat;
|
||||
|
||||
private AMQTransportConnectionStateRegister connectionStateRegister = new AMQSingleTransportConnectionStateRegister();
|
||||
|
||||
private boolean faultTolerantConnection;
|
||||
|
||||
private AMQConnectionContext context;
|
||||
|
@ -175,12 +172,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, AMQConsumerBrokerExchange>();
|
||||
private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, AMQProducerBrokerExchange>();
|
||||
|
||||
private AMQTransportConnectionState state;
|
||||
private ConnectionState state;
|
||||
|
||||
private final Set<String> tempQueues = new ConcurrentHashSet<String>();
|
||||
|
||||
protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
|
||||
|
||||
private DataInputWrapper dataInput = new DataInputWrapper();
|
||||
|
||||
private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId, TransactionInfo>();
|
||||
|
@ -194,7 +189,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
this.transportConnection = connection;
|
||||
this.acceptorUsed = new AMQConnectorImpl(acceptorUsed);
|
||||
this.wireFormat = wf;
|
||||
brokerConnectionStates = protocolManager.getConnectionStates();
|
||||
this.creationTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
|
@ -299,7 +293,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
context.setDontSendReponse(false);
|
||||
response = null;
|
||||
}
|
||||
context = null;
|
||||
}
|
||||
|
||||
if (response != null && !protocolManager.isStopping())
|
||||
|
@ -621,38 +614,16 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
info.setClientMaster(true);
|
||||
}
|
||||
|
||||
// Make sure 2 concurrent connections by the same ID only generate 1
|
||||
// TransportConnectionState object.
|
||||
synchronized (brokerConnectionStates)
|
||||
{
|
||||
state = (AMQTransportConnectionState) brokerConnectionStates.get(info
|
||||
.getConnectionId());
|
||||
if (state == null)
|
||||
{
|
||||
state = new AMQTransportConnectionState(info, this);
|
||||
brokerConnectionStates.put(info.getConnectionId(), state);
|
||||
}
|
||||
state.incrementReference();
|
||||
}
|
||||
// If there are 2 concurrent connections for the same connection id,
|
||||
// then last one in wins, we need to sync here
|
||||
// to figure out the winner.
|
||||
synchronized (state.getConnectionMutex())
|
||||
{
|
||||
if (state.getConnection() != this)
|
||||
{
|
||||
state.getConnection().disconnect(true);
|
||||
state.setConnection(this);
|
||||
state.reset(info);
|
||||
}
|
||||
}
|
||||
state = new ConnectionState(info);
|
||||
|
||||
context = new AMQConnectionContext();
|
||||
|
||||
state.reset(info);
|
||||
|
||||
registerConnectionState(info.getConnectionId(), state);
|
||||
|
||||
this.faultTolerantConnection = info.isFaultTolerant();
|
||||
// Setup the context.
|
||||
String clientId = info.getClientId();
|
||||
context = new AMQConnectionContext();
|
||||
context.setBroker(protocolManager);
|
||||
context.setClientId(clientId);
|
||||
context.setClientMaster(info.isClientMaster());
|
||||
|
@ -671,8 +642,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
context.setReconnect(info.isFailoverReconnect());
|
||||
this.manageable = info.isManageable();
|
||||
context.setConnectionState(state);
|
||||
state.setContext(context);
|
||||
state.setConnection(this);
|
||||
if (info.getClientIp() == null)
|
||||
{
|
||||
info.setClientIp(getRemoteAddress());
|
||||
|
@ -684,12 +653,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
synchronized (brokerConnectionStates)
|
||||
{
|
||||
brokerConnectionStates.remove(info.getConnectionId());
|
||||
}
|
||||
unregisterConnectionState(info.getConnectionId());
|
||||
|
||||
if (e instanceof SecurityException)
|
||||
{
|
||||
// close this down - in case the peer of this transport doesn't play
|
||||
|
@ -926,28 +889,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
return this.messageAuthorizationPolicy;
|
||||
}
|
||||
|
||||
protected synchronized AMQTransportConnectionState unregisterConnectionState(
|
||||
ConnectionId connectionId)
|
||||
{
|
||||
return connectionStateRegister.unregisterConnectionState(connectionId);
|
||||
}
|
||||
|
||||
protected synchronized AMQTransportConnectionState registerConnectionState(
|
||||
ConnectionId connectionId, AMQTransportConnectionState state)
|
||||
{
|
||||
AMQTransportConnectionState cs = null;
|
||||
if (!connectionStateRegister.isEmpty()
|
||||
&& !connectionStateRegister.doesHandleMultipleConnectionStates())
|
||||
{
|
||||
// swap implementations
|
||||
AMQTransportConnectionStateRegister newRegister = new AMQMapTransportConnectionStateRegister();
|
||||
newRegister.intialize(connectionStateRegister);
|
||||
connectionStateRegister = newRegister;
|
||||
}
|
||||
cs = connectionStateRegister.registerConnectionState(connectionId, state);
|
||||
return cs;
|
||||
}
|
||||
|
||||
public void delayedStop(final int waitTime, final String reason,
|
||||
Throwable cause)
|
||||
{
|
||||
|
@ -997,16 +938,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
}
|
||||
if (stopping.compareAndSet(false, true))
|
||||
{
|
||||
// Let all the connection contexts know we are shutting down
|
||||
// so that in progress operations can notice and unblock.
|
||||
List<AMQTransportConnectionState> connectionStates = listConnectionStates();
|
||||
for (AMQTransportConnectionState cs : connectionStates)
|
||||
if (context != null)
|
||||
{
|
||||
AMQConnectionContext connectionContext = cs.getContext();
|
||||
if (connectionContext != null)
|
||||
{
|
||||
connectionContext.getStopping().set(true);
|
||||
}
|
||||
context.getStopping().set(true);
|
||||
}
|
||||
try
|
||||
{
|
||||
|
@ -1040,11 +974,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
}
|
||||
}
|
||||
|
||||
protected synchronized List<AMQTransportConnectionState> listConnectionStates()
|
||||
{
|
||||
return connectionStateRegister.listConnectionStates();
|
||||
}
|
||||
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
this.acceptorUsed.onStopped(this);
|
||||
|
@ -1095,19 +1024,14 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
// from the broker.
|
||||
if (!protocolManager.isStopped())
|
||||
{
|
||||
List<AMQTransportConnectionState> connectionStates = listConnectionStates();
|
||||
connectionStates = listConnectionStates();
|
||||
for (AMQTransportConnectionState cs : connectionStates)
|
||||
context.getStopping().set(true);
|
||||
try
|
||||
{
|
||||
cs.getContext().getStopping().set(true);
|
||||
try
|
||||
{
|
||||
processRemoveConnection(cs.getInfo().getConnectionId(), 0L);
|
||||
}
|
||||
catch (Throwable ignore)
|
||||
{
|
||||
ignore.printStackTrace();
|
||||
}
|
||||
processRemoveConnection(state.getInfo().getConnectionId(), 0L);
|
||||
}
|
||||
catch (Throwable ignore)
|
||||
{
|
||||
ignore.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1134,16 +1058,21 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
return resp;
|
||||
}
|
||||
|
||||
AMQConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id)
|
||||
public void addConsumerBrokerExchange(ConsumerId id, AMQSession amqSession, Map<ActiveMQDestination, AMQConsumer> consumerMap)
|
||||
{
|
||||
AMQConsumerBrokerExchange result = consumerExchanges.get(id);
|
||||
if (result == null)
|
||||
{
|
||||
if (consumerMap.size() == 1)
|
||||
{
|
||||
result = new AMQSingleConsumerBrokerExchange(amqSession, consumerMap.values().iterator().next());
|
||||
}
|
||||
else
|
||||
{
|
||||
result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerMap);
|
||||
}
|
||||
synchronized (consumerExchanges)
|
||||
{
|
||||
result = new AMQConsumerBrokerExchange();
|
||||
AMQTransportConnectionState state = lookupConnectionState(id);
|
||||
context = state.getContext();
|
||||
result.setConnectionContext(context);
|
||||
SessionState ss = state.getSessionState(id.getParentId());
|
||||
if (ss != null)
|
||||
|
@ -1165,63 +1094,36 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
consumerExchanges.put(id, result);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
protected synchronized AMQTransportConnectionState lookupConnectionState(
|
||||
ConsumerId id)
|
||||
{
|
||||
return connectionStateRegister.lookupConnectionState(id);
|
||||
}
|
||||
|
||||
protected synchronized AMQTransportConnectionState lookupConnectionState(
|
||||
ProducerId id)
|
||||
{
|
||||
return connectionStateRegister.lookupConnectionState(id);
|
||||
}
|
||||
|
||||
public int getConsumerCount(ConnectionId connectionId)
|
||||
public int getConsumerCount()
|
||||
{
|
||||
int result = 0;
|
||||
AMQTransportConnectionState cs = lookupConnectionState(connectionId);
|
||||
if (cs != null)
|
||||
for (SessionId sessionId : state.getSessionIds())
|
||||
{
|
||||
for (SessionId sessionId : cs.getSessionIds())
|
||||
SessionState sessionState = state.getSessionState(sessionId);
|
||||
if (sessionState != null)
|
||||
{
|
||||
SessionState sessionState = cs.getSessionState(sessionId);
|
||||
if (sessionState != null)
|
||||
{
|
||||
result += sessionState.getConsumerIds().size();
|
||||
}
|
||||
result += sessionState.getConsumerIds().size();
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public int getProducerCount(ConnectionId connectionId)
|
||||
public int getProducerCount()
|
||||
{
|
||||
int result = 0;
|
||||
AMQTransportConnectionState cs = lookupConnectionState(connectionId);
|
||||
if (cs != null)
|
||||
for (SessionId sessionId : state.getSessionIds())
|
||||
{
|
||||
for (SessionId sessionId : cs.getSessionIds())
|
||||
SessionState sessionState = state.getSessionState(sessionId);
|
||||
if (sessionState != null)
|
||||
{
|
||||
SessionState sessionState = cs.getSessionState(sessionId);
|
||||
if (sessionState != null)
|
||||
{
|
||||
result += sessionState.getProducerIds().size();
|
||||
}
|
||||
result += sessionState.getProducerIds().size();
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public synchronized AMQTransportConnectionState lookupConnectionState(
|
||||
ConnectionId connectionId)
|
||||
{
|
||||
return connectionStateRegister.lookupConnectionState(connectionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response processAddDestination(DestinationInfo dest) throws Exception
|
||||
{
|
||||
|
@ -1273,20 +1175,18 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
@Override
|
||||
public Response processAddSession(SessionInfo info) throws Exception
|
||||
{
|
||||
ConnectionId connectionId = info.getSessionId().getParentId();
|
||||
AMQTransportConnectionState cs = lookupConnectionState(connectionId);
|
||||
// Avoid replaying dup commands
|
||||
if (cs != null && !cs.getSessionIds().contains(info.getSessionId()))
|
||||
if (!state.getSessionIds().contains(info.getSessionId()))
|
||||
{
|
||||
protocolManager.addSession(this, info);
|
||||
try
|
||||
{
|
||||
cs.addSession(info);
|
||||
state.addSession(info);
|
||||
}
|
||||
catch (IllegalStateException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
protocolManager.removeSession(cs.getContext(), info);
|
||||
protocolManager.removeSession(context, info);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
@ -1422,10 +1322,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
//in that case don't send the response
|
||||
//this will force the client to wait until
|
||||
//the response is got.
|
||||
if (context == null)
|
||||
{
|
||||
this.context = new AMQConnectionContext();
|
||||
}
|
||||
context.setDontSendReponse(true);
|
||||
}
|
||||
else
|
||||
|
@ -1463,9 +1359,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
synchronized (producerExchanges)
|
||||
{
|
||||
result = new AMQProducerBrokerExchange();
|
||||
AMQTransportConnectionState state = lookupConnectionState(id);
|
||||
context = state.getContext();
|
||||
result.setConnectionContext(context);
|
||||
//todo implement reconnect https://issues.apache.org/jira/browse/ARTEMIS-194
|
||||
if (context.isReconnect()
|
||||
|| (context.isNetworkConnection() && this.acceptorUsed
|
||||
.isAuditNetworkProducers()))
|
||||
|
@ -1490,20 +1385,14 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
producerExchanges.put(id, result);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
context = result.getConnectionContext();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response processMessageAck(MessageAck ack) throws Exception
|
||||
{
|
||||
ConsumerId consumerId = ack.getConsumerId();
|
||||
SessionId sessionId = consumerId.getParentId();
|
||||
AMQSession session = protocolManager.getSession(sessionId);
|
||||
session.acknowledge(ack);
|
||||
AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
|
||||
consumerBrokerExchange.acknowledge(ack);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -1523,7 +1412,13 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
@Override
|
||||
public Response processMessagePull(MessagePull arg0) throws Exception
|
||||
{
|
||||
throw new IllegalStateException("not implemented! ");
|
||||
AMQConsumerBrokerExchange amqConsumerBrokerExchange = consumerExchanges.get(arg0.getConsumerId());
|
||||
if (amqConsumerBrokerExchange == null)
|
||||
{
|
||||
throw new IllegalStateException("Consumer does not exist");
|
||||
}
|
||||
amqConsumerBrokerExchange.processMessagePull(arg0);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1542,8 +1437,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
@Override
|
||||
public Response processRecoverTransactions(TransactionInfo info) throws Exception
|
||||
{
|
||||
AMQTransportConnectionState cs = lookupConnectionState(info.getConnectionId());
|
||||
Set<SessionId> sIds = cs.getSessionIds();
|
||||
Set<SessionId> sIds = state.getSessionIds();
|
||||
TransactionId[] recovered = protocolManager.recoverTransactions(sIds);
|
||||
return new DataArrayResponse(recovered);
|
||||
}
|
||||
|
@ -1552,48 +1446,31 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
public Response processRemoveConnection(ConnectionId id,
|
||||
long lastDeliveredSequenceId) throws Exception
|
||||
{
|
||||
AMQTransportConnectionState cs = lookupConnectionState(id);
|
||||
if (cs != null)
|
||||
// Don't allow things to be added to the connection state while we
|
||||
// are shutting down.
|
||||
state.shutdown();
|
||||
// Cascade the connection stop to the sessions.
|
||||
for (SessionId sessionId : state.getSessionIds())
|
||||
{
|
||||
// Don't allow things to be added to the connection state while we
|
||||
// are shutting down.
|
||||
cs.shutdown();
|
||||
// Cascade the connection stop to the sessions.
|
||||
for (SessionId sessionId : cs.getSessionIds())
|
||||
{
|
||||
try
|
||||
{
|
||||
processRemoveSession(sessionId, lastDeliveredSequenceId);
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
// LOG
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
protocolManager.removeConnection(cs.getContext(), cs.getInfo(),
|
||||
null);
|
||||
processRemoveSession(sessionId, lastDeliveredSequenceId);
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
// log
|
||||
}
|
||||
AMQTransportConnectionState state = unregisterConnectionState(id);
|
||||
if (state != null)
|
||||
{
|
||||
synchronized (brokerConnectionStates)
|
||||
{
|
||||
// If we are the last reference, we should remove the state
|
||||
// from the broker.
|
||||
if (state.decrementReference() == 0)
|
||||
{
|
||||
brokerConnectionStates.remove(id);
|
||||
}
|
||||
}
|
||||
// LOG
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
protocolManager.removeConnection(context, state.getInfo(),
|
||||
null);
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
// log
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -1602,15 +1479,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
long lastDeliveredSequenceId) throws Exception
|
||||
{
|
||||
SessionId sessionId = id.getParentId();
|
||||
ConnectionId connectionId = sessionId.getParentId();
|
||||
AMQTransportConnectionState cs = lookupConnectionState(connectionId);
|
||||
if (cs == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
"Cannot remove a consumer from a connection that had not been registered: "
|
||||
+ connectionId);
|
||||
}
|
||||
SessionState ss = cs.getSessionState(sessionId);
|
||||
SessionState ss = state.getSessionState(sessionId);
|
||||
if (ss == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
|
@ -1625,8 +1494,13 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
}
|
||||
ConsumerInfo info = consumerState.getInfo();
|
||||
info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
|
||||
protocolManager.removeConsumer(cs.getContext(), consumerState.getInfo());
|
||||
|
||||
AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id);
|
||||
|
||||
consumerBrokerExchange.removeConsumer();
|
||||
|
||||
removeConsumerBrokerExchange(id);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -1661,15 +1535,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
public Response processRemoveSession(SessionId id,
|
||||
long lastDeliveredSequenceId) throws Exception
|
||||
{
|
||||
ConnectionId connectionId = id.getParentId();
|
||||
AMQTransportConnectionState cs = lookupConnectionState(connectionId);
|
||||
if (cs == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
"Cannot remove session from connection that had not been registered: "
|
||||
+ connectionId);
|
||||
}
|
||||
SessionState session = cs.getSessionState(id);
|
||||
SessionState session = state.getSessionState(id);
|
||||
if (session == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
|
@ -1701,8 +1567,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
// LOG.warn("Failed to remove producer: {}", producerId, e);
|
||||
}
|
||||
}
|
||||
cs.removeSession(id);
|
||||
protocolManager.removeSession(cs.getContext(), session.getInfo());
|
||||
state.removeSession(id);
|
||||
protocolManager.removeSession(context, session.getInfo());
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -1790,7 +1656,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
|
||||
public AMQConnectionContext getConext()
|
||||
{
|
||||
return this.state.getContext();
|
||||
return this.context;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
|
@ -55,6 +56,7 @@ import org.apache.activemq.command.ConsumerInfo;
|
|||
import org.apache.activemq.command.DestinationInfo;
|
||||
import org.apache.activemq.command.MessageDispatch;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.command.MessagePull;
|
||||
import org.apache.activemq.command.ProducerId;
|
||||
import org.apache.activemq.command.ProducerInfo;
|
||||
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||
|
@ -69,7 +71,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdap
|
|||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransportConnectionState;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
|
||||
import org.apache.activemq.artemis.core.security.CheckType;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
|
@ -114,8 +115,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
protected final ProducerId advisoryProducerId = new ProducerId();
|
||||
|
||||
// from broker
|
||||
protected final Map<ConnectionId, ConnectionState> brokerConnectionStates = Collections
|
||||
.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
|
||||
protected final Map<ConnectionId, OpenWireConnection> brokerConnectionStates = Collections
|
||||
.synchronizedMap(new HashMap<ConnectionId, OpenWireConnection>());
|
||||
|
||||
private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<OpenWireConnection>();
|
||||
|
||||
|
@ -131,6 +132,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
|
||||
private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<String, SessionId>();
|
||||
|
||||
private final ScheduledExecutorService scheduledPool;
|
||||
|
||||
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server)
|
||||
{
|
||||
this.factory = factory;
|
||||
|
@ -141,6 +144,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
brokerState = new BrokerState();
|
||||
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
|
||||
ManagementService service = server.getManagementService();
|
||||
scheduledPool = server.getScheduledPool();
|
||||
if (service != null)
|
||||
{
|
||||
service.addNotificationListener(this);
|
||||
|
@ -239,7 +243,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
}
|
||||
|
||||
public void handleCommand(OpenWireConnection openWireConnection,
|
||||
Object command)
|
||||
Object command) throws Exception
|
||||
{
|
||||
Command amqCmd = (Command) command;
|
||||
byte type = amqCmd.getDataStructureType();
|
||||
|
@ -252,6 +256,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
* failover and load balancing. These features are not yet implemented for Artemis OpenWire. Instead we
|
||||
* simply drop the packet. See: ACTIVEMQ6-108 */
|
||||
break;
|
||||
case CommandTypes.MESSAGE_PULL:
|
||||
MessagePull messagePull = (MessagePull) amqCmd;
|
||||
openWireConnection.processMessagePull(messagePull);
|
||||
break;
|
||||
case CommandTypes.CONSUMER_CONTROL:
|
||||
break;
|
||||
default:
|
||||
|
@ -306,11 +314,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
}
|
||||
}
|
||||
|
||||
public Map<ConnectionId, ConnectionState> getConnectionStates()
|
||||
{
|
||||
return this.brokerConnectionStates;
|
||||
}
|
||||
|
||||
public void addConnection(AMQConnectionContext context, ConnectionInfo info) throws Exception
|
||||
{
|
||||
String username = info.getUserName();
|
||||
|
@ -483,8 +486,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
{
|
||||
SessionId sessionId = info.getProducerId().getParentId();
|
||||
ConnectionId connectionId = sessionId.getParentId();
|
||||
AMQTransportConnectionState cs = theConn
|
||||
.lookupConnectionState(connectionId);
|
||||
ConnectionState cs = theConn.getState();
|
||||
if (cs == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
|
@ -505,7 +507,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
if (destination != null
|
||||
&& !AdvisorySupport.isAdvisoryTopic(destination))
|
||||
{
|
||||
if (theConn.getProducerCount(connectionId) >= theConn
|
||||
if (theConn.getProducerCount() >= theConn
|
||||
.getMaximumProducersAllowedPerConnection())
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
|
@ -541,8 +543,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
// Todo: add a destination interceptors holder here (amq supports this)
|
||||
SessionId sessionId = info.getConsumerId().getParentId();
|
||||
ConnectionId connectionId = sessionId.getParentId();
|
||||
AMQTransportConnectionState cs = theConn
|
||||
.lookupConnectionState(connectionId);
|
||||
ConnectionState cs = theConn.getState();
|
||||
if (cs == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
|
@ -564,7 +565,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
if (destination != null
|
||||
&& !AdvisorySupport.isAdvisoryTopic(destination))
|
||||
{
|
||||
if (theConn.getConsumerCount(connectionId) >= theConn
|
||||
if (theConn.getConsumerCount() >= theConn
|
||||
.getMaximumConsumersAllowedPerConnection())
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
|
@ -580,17 +581,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
throw new IllegalStateException("Session not exist! : " + sessionId);
|
||||
}
|
||||
|
||||
amqSession.createConsumer(info);
|
||||
amqSession.createConsumer(info, amqSession);
|
||||
|
||||
try
|
||||
{
|
||||
ss.addConsumer(info);
|
||||
theConn.addConsumerBrokerExchange(info.getConsumerId());
|
||||
}
|
||||
catch (IllegalStateException e)
|
||||
{
|
||||
amqSession.removeConsumer(info);
|
||||
}
|
||||
ss.addConsumer(info);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -614,7 +607,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
boolean internal)
|
||||
{
|
||||
AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss,
|
||||
server, theConn, this);
|
||||
server, theConn, scheduledPool, this);
|
||||
amqSession.initialize();
|
||||
amqSession.setInternal(internal);
|
||||
sessions.put(ss.getSessionId(), amqSession);
|
||||
|
@ -644,13 +637,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
}
|
||||
}
|
||||
|
||||
public void removeConsumer(AMQConnectionContext context, ConsumerInfo info) throws Exception
|
||||
{
|
||||
SessionId sessionId = info.getConsumerId().getParentId();
|
||||
AMQSession session = sessions.get(sessionId);
|
||||
session.removeConsumer(info);
|
||||
}
|
||||
|
||||
public void removeProducer(ProducerId id)
|
||||
{
|
||||
SessionId sessionId = id.getParentId();
|
||||
|
@ -677,7 +663,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
{
|
||||
SimpleString qName = new SimpleString("jms.queue."
|
||||
+ dest.getPhysicalName());
|
||||
ConnectionState state = connection.brokerConnectionStates.get(info.getConnectionId());
|
||||
ConnectionState state = connection.getState();
|
||||
ConnectionInfo connInfo = state.getInfo();
|
||||
if (connInfo != null)
|
||||
{
|
||||
|
@ -849,12 +835,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
{
|
||||
ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
|
||||
ConnectionId connId = sessionId.getParentId();
|
||||
AMQTransportConnectionState cc = (AMQTransportConnectionState)this.brokerConnectionStates.get(connId);
|
||||
OpenWireConnection conn = cc.getConnection();
|
||||
OpenWireConnection cc = this.brokerConnectionStates.get(connId);
|
||||
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString());
|
||||
|
||||
fireAdvisory(conn.getConext(), topic, advisoryMessage, consumer.getId());
|
||||
fireAdvisory(cc.getConext(), topic, advisoryMessage, consumer.getId());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire.amq;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.MessagePull;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchange
|
||||
{
|
||||
|
||||
private final Map<ActiveMQDestination, AMQConsumer> consumerMap;
|
||||
|
||||
public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, Map<ActiveMQDestination, AMQConsumer> consumerMap)
|
||||
{
|
||||
super(amqSession);
|
||||
this.consumerMap = consumerMap;
|
||||
}
|
||||
|
||||
public void processMessagePull(MessagePull messagePull) throws Exception
|
||||
{
|
||||
AMQConsumer amqConsumer = consumerMap.get(messagePull.getDestination());
|
||||
if (amqConsumer != null)
|
||||
{
|
||||
amqConsumer.processMessagePull(messagePull);
|
||||
}
|
||||
}
|
||||
|
||||
public void acknowledge(MessageAck ack) throws Exception
|
||||
{
|
||||
AMQConsumer amqConsumer = consumerMap.get(ack.getDestination());
|
||||
if (amqConsumer != null)
|
||||
{
|
||||
amqSession.acknowledge(ack, amqConsumer);
|
||||
}
|
||||
}
|
||||
|
||||
public void removeConsumer() throws Exception
|
||||
{
|
||||
for (AMQConsumer amqConsumer : consumerMap.values())
|
||||
{
|
||||
amqConsumer.removeConsumer();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,13 +21,19 @@ import java.util.Iterator;
|
|||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.MessageDispatch;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.command.MessagePull;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -42,20 +48,28 @@ public class AMQConsumer implements BrowserListener
|
|||
private AMQSession session;
|
||||
private org.apache.activemq.command.ActiveMQDestination actualDest;
|
||||
private ConsumerInfo info;
|
||||
private final ScheduledExecutorService scheduledPool;
|
||||
private long nativeId = -1;
|
||||
private SimpleString subQueueName = null;
|
||||
|
||||
private final int prefetchSize;
|
||||
private AtomicInteger currentSize;
|
||||
private AtomicInteger windowAvailable;
|
||||
private final java.util.Queue<MessageInfo> deliveringRefs = new ConcurrentLinkedQueue<MessageInfo>();
|
||||
private long messagePullSequence = 0;
|
||||
private MessagePullHandler messagePullHandler;
|
||||
|
||||
public AMQConsumer(AMQSession amqSession, org.apache.activemq.command.ActiveMQDestination d, ConsumerInfo info)
|
||||
public AMQConsumer(AMQSession amqSession, org.apache.activemq.command.ActiveMQDestination d, ConsumerInfo info, ScheduledExecutorService scheduledPool)
|
||||
{
|
||||
this.session = amqSession;
|
||||
this.actualDest = d;
|
||||
this.info = info;
|
||||
this.scheduledPool = scheduledPool;
|
||||
this.prefetchSize = info.getPrefetchSize();
|
||||
this.currentSize = new AtomicInteger(0);
|
||||
this.windowAvailable = new AtomicInteger(prefetchSize);
|
||||
if (prefetchSize == 0)
|
||||
{
|
||||
messagePullHandler = new MessagePullHandler();
|
||||
}
|
||||
}
|
||||
|
||||
public void init() throws Exception
|
||||
|
@ -130,12 +144,12 @@ public class AMQConsumer implements BrowserListener
|
|||
coreSession.createQueue(address, subQueueName, selector, true, false);
|
||||
}
|
||||
|
||||
coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, Integer.MAX_VALUE);
|
||||
coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1);
|
||||
}
|
||||
else
|
||||
{
|
||||
SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());
|
||||
coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, Integer.MAX_VALUE);
|
||||
coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
|
||||
}
|
||||
|
||||
if (info.isBrowser())
|
||||
|
@ -163,23 +177,14 @@ public class AMQConsumer implements BrowserListener
|
|||
|
||||
public void acquireCredit(int n) throws Exception
|
||||
{
|
||||
this.currentSize.addAndGet(-n);
|
||||
if (currentSize.get() < prefetchSize)
|
||||
boolean promptDelivery = windowAvailable.get() == 0;
|
||||
if (windowAvailable.get() < prefetchSize)
|
||||
{
|
||||
AtomicInteger credits = session.getCoreSession().getConsumerCredits(nativeId);
|
||||
credits.set(0);
|
||||
session.getCoreSession().receiveConsumerCredits(nativeId, Integer.MAX_VALUE);
|
||||
this.windowAvailable.addAndGet(n);
|
||||
}
|
||||
}
|
||||
|
||||
public void checkCreditOnDelivery() throws Exception
|
||||
{
|
||||
this.currentSize.incrementAndGet();
|
||||
|
||||
if (currentSize.get() == prefetchSize)
|
||||
if (promptDelivery)
|
||||
{
|
||||
//stop because reach prefetchSize
|
||||
session.getCoreSession().receiveConsumerCredits(nativeId, 0);
|
||||
session.getCoreSession().promptDelivery(nativeId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -188,12 +193,16 @@ public class AMQConsumer implements BrowserListener
|
|||
MessageDispatch dispatch;
|
||||
try
|
||||
{
|
||||
if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
//decrement deliveryCount as AMQ client tends to add 1.
|
||||
dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this);
|
||||
int size = dispatch.getMessage().getSize();
|
||||
this.deliveringRefs.add(new MessageInfo(dispatch.getMessage().getMessageId(), message.getMessageID(), size));
|
||||
session.deliverMessage(dispatch);
|
||||
checkCreditOnDelivery();
|
||||
windowAvailable.decrementAndGet();
|
||||
return size;
|
||||
}
|
||||
catch (IOException e)
|
||||
|
@ -206,6 +215,16 @@ public class AMQConsumer implements BrowserListener
|
|||
}
|
||||
}
|
||||
|
||||
public void handleDeliverNullDispatch()
|
||||
{
|
||||
MessageDispatch md = new MessageDispatch();
|
||||
md.setConsumerId(getId());
|
||||
md.setDestination(actualDest);
|
||||
session.deliverMessage(md);
|
||||
windowAvailable.decrementAndGet();
|
||||
}
|
||||
|
||||
|
||||
public void acknowledge(MessageAck ack) throws Exception
|
||||
{
|
||||
MessageId first = ack.getFirstMessageId();
|
||||
|
@ -400,4 +419,90 @@ public class AMQConsumer implements BrowserListener
|
|||
{
|
||||
return info;
|
||||
}
|
||||
|
||||
public boolean hasCredits()
|
||||
{
|
||||
return windowAvailable.get() > 0;
|
||||
}
|
||||
|
||||
public void processMessagePull(MessagePull messagePull) throws Exception
|
||||
{
|
||||
windowAvailable.incrementAndGet();
|
||||
|
||||
if (messagePullHandler != null)
|
||||
{
|
||||
messagePullHandler.nextSequence(messagePullSequence++, messagePull.getTimeout());
|
||||
}
|
||||
}
|
||||
|
||||
public void removeConsumer() throws Exception
|
||||
{
|
||||
session.removeConsumer(nativeId);
|
||||
}
|
||||
|
||||
private class MessagePullHandler
|
||||
{
|
||||
private long next = -1;
|
||||
private long timeout;
|
||||
private CountDownLatch latch = new CountDownLatch(1);
|
||||
private ScheduledFuture<?> messagePullFuture;
|
||||
|
||||
public void nextSequence(long next, long timeout) throws Exception
|
||||
{
|
||||
this.next = next;
|
||||
this.timeout = timeout;
|
||||
latch = new CountDownLatch(1);
|
||||
session.getCoreSession().forceConsumerDelivery(nativeId, messagePullSequence);
|
||||
//if we are 0 timeout or less we need to wait to get either the forced message or a real message.
|
||||
if (timeout <= 0)
|
||||
{
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
//this means we have received no message just the forced delivery message
|
||||
if (this.next >= 0)
|
||||
{
|
||||
handleDeliverNullDispatch();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean checkForcedConsumer(ServerMessage message)
|
||||
{
|
||||
if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
|
||||
{
|
||||
System.out.println("MessagePullHandler.checkForcedConsumer");
|
||||
if (next >= 0)
|
||||
{
|
||||
if (timeout <= 0)
|
||||
{
|
||||
latch.countDown();
|
||||
}
|
||||
else
|
||||
{
|
||||
messagePullFuture = scheduledPool.schedule(new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
if (next >= 0)
|
||||
{
|
||||
handleDeliverNullDispatch();
|
||||
}
|
||||
}
|
||||
}, timeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
next = -1;
|
||||
if (messagePullFuture != null)
|
||||
{
|
||||
messagePullFuture.cancel(true);
|
||||
}
|
||||
latch.countDown();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,13 +16,22 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire.amq;
|
||||
|
||||
public class AMQConsumerBrokerExchange
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.MessagePull;
|
||||
|
||||
public abstract class AMQConsumerBrokerExchange
|
||||
{
|
||||
protected final AMQSession amqSession;
|
||||
private AMQConnectionContext connectionContext;
|
||||
private AMQDestination regionDestination;
|
||||
private AMQSubscription subscription;
|
||||
private boolean wildcard;
|
||||
|
||||
public AMQConsumerBrokerExchange(AMQSession amqSession)
|
||||
{
|
||||
this.amqSession = amqSession;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the connectionContext
|
||||
*/
|
||||
|
@ -90,4 +99,10 @@ public class AMQConsumerBrokerExchange
|
|||
{
|
||||
this.wildcard = wildcard;
|
||||
}
|
||||
|
||||
public abstract void acknowledge(MessageAck ack) throws Exception;
|
||||
|
||||
public abstract void processMessagePull(MessagePull messagePull) throws Exception;
|
||||
|
||||
public abstract void removeConsumer() throws Exception;
|
||||
}
|
||||
|
|
|
@ -1,151 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire.amq;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.activemq.command.ConnectionId;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ProducerId;
|
||||
import org.apache.activemq.command.SessionId;
|
||||
|
||||
public class AMQMapTransportConnectionStateRegister implements
|
||||
AMQTransportConnectionStateRegister
|
||||
{
|
||||
|
||||
private Map<ConnectionId, AMQTransportConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, AMQTransportConnectionState>();
|
||||
|
||||
public AMQTransportConnectionState registerConnectionState(
|
||||
ConnectionId connectionId, AMQTransportConnectionState state)
|
||||
{
|
||||
AMQTransportConnectionState rc = connectionStates
|
||||
.put(connectionId, state);
|
||||
return rc;
|
||||
}
|
||||
|
||||
public AMQTransportConnectionState unregisterConnectionState(
|
||||
ConnectionId connectionId)
|
||||
{
|
||||
AMQTransportConnectionState rc = connectionStates.remove(connectionId);
|
||||
if (rc.getReferenceCounter().get() > 1)
|
||||
{
|
||||
rc.decrementReference();
|
||||
connectionStates.put(connectionId, rc);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
public List<AMQTransportConnectionState> listConnectionStates()
|
||||
{
|
||||
|
||||
List<AMQTransportConnectionState> rc = new ArrayList<AMQTransportConnectionState>();
|
||||
rc.addAll(connectionStates.values());
|
||||
return rc;
|
||||
}
|
||||
|
||||
public AMQTransportConnectionState lookupConnectionState(String connectionId)
|
||||
{
|
||||
return connectionStates.get(new ConnectionId(connectionId));
|
||||
}
|
||||
|
||||
public AMQTransportConnectionState lookupConnectionState(ConsumerId id)
|
||||
{
|
||||
AMQTransportConnectionState cs = lookupConnectionState(id
|
||||
.getConnectionId());
|
||||
if (cs == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
"Cannot lookup a consumer from a connection that had not been registered: "
|
||||
+ id.getParentId().getParentId());
|
||||
}
|
||||
return cs;
|
||||
}
|
||||
|
||||
public AMQTransportConnectionState lookupConnectionState(ProducerId id)
|
||||
{
|
||||
AMQTransportConnectionState cs = lookupConnectionState(id
|
||||
.getConnectionId());
|
||||
if (cs == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
"Cannot lookup a producer from a connection that had not been registered: "
|
||||
+ id.getParentId().getParentId());
|
||||
}
|
||||
return cs;
|
||||
}
|
||||
|
||||
public AMQTransportConnectionState lookupConnectionState(SessionId id)
|
||||
{
|
||||
AMQTransportConnectionState cs = lookupConnectionState(id
|
||||
.getConnectionId());
|
||||
if (cs == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
"Cannot lookup a session from a connection that had not been registered: "
|
||||
+ id.getParentId());
|
||||
}
|
||||
return cs;
|
||||
}
|
||||
|
||||
public AMQTransportConnectionState lookupConnectionState(
|
||||
ConnectionId connectionId)
|
||||
{
|
||||
AMQTransportConnectionState cs = connectionStates.get(connectionId);
|
||||
if (cs == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
"Cannot lookup a connection that had not been registered: "
|
||||
+ connectionId);
|
||||
}
|
||||
return cs;
|
||||
}
|
||||
|
||||
public boolean doesHandleMultipleConnectionStates()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isEmpty()
|
||||
{
|
||||
return connectionStates.isEmpty();
|
||||
}
|
||||
|
||||
public void clear()
|
||||
{
|
||||
connectionStates.clear();
|
||||
|
||||
}
|
||||
|
||||
public void intialize(AMQTransportConnectionStateRegister other)
|
||||
{
|
||||
connectionStates.clear();
|
||||
connectionStates.putAll(other.mapStates());
|
||||
|
||||
}
|
||||
|
||||
public Map<ConnectionId, AMQTransportConnectionState> mapStates()
|
||||
{
|
||||
HashMap<ConnectionId, AMQTransportConnectionState> map = new HashMap<ConnectionId, AMQTransportConnectionState>(
|
||||
connectionStates);
|
||||
return map;
|
||||
}
|
||||
|
||||
}
|
|
@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.ExceptionResponse;
|
||||
import org.apache.activemq.command.Message;
|
||||
|
@ -69,12 +68,10 @@ public class AMQSession implements SessionCallback
|
|||
private SessionInfo sessInfo;
|
||||
private ActiveMQServer server;
|
||||
private OpenWireConnection connection;
|
||||
//native id -> consumer
|
||||
private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<Long, AMQConsumer>();
|
||||
//amq id -> native id
|
||||
private Map<Long, Long> consumerIdMap = new HashMap<Long, Long>();
|
||||
|
||||
private Map<Long, AMQProducer> producers = new HashMap<Long, AMQProducer>();
|
||||
private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<>();
|
||||
|
||||
private Map<Long, AMQProducer> producers = new HashMap<>();
|
||||
|
||||
private AtomicBoolean started = new AtomicBoolean(false);
|
||||
|
||||
|
@ -82,15 +79,18 @@ public class AMQSession implements SessionCallback
|
|||
|
||||
private boolean isTx;
|
||||
|
||||
private final ScheduledExecutorService scheduledPool;
|
||||
|
||||
private OpenWireProtocolManager manager;
|
||||
|
||||
public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo,
|
||||
ActiveMQServer server, OpenWireConnection connection, OpenWireProtocolManager manager)
|
||||
ActiveMQServer server, OpenWireConnection connection, ScheduledExecutorService scheduledPool, OpenWireProtocolManager manager)
|
||||
{
|
||||
this.connInfo = connInfo;
|
||||
this.sessInfo = sessInfo;
|
||||
this.server = server;
|
||||
this.connection = connection;
|
||||
this.scheduledPool = scheduledPool;
|
||||
this.manager = manager;
|
||||
}
|
||||
|
||||
|
@ -123,7 +123,7 @@ public class AMQSession implements SessionCallback
|
|||
|
||||
}
|
||||
|
||||
public void createConsumer(ConsumerInfo info) throws Exception
|
||||
public void createConsumer(ConsumerInfo info, AMQSession amqSession) throws Exception
|
||||
{
|
||||
//check destination
|
||||
ActiveMQDestination dest = info.getDestination();
|
||||
|
@ -136,7 +136,7 @@ public class AMQSession implements SessionCallback
|
|||
{
|
||||
dests = new ActiveMQDestination[] {dest};
|
||||
}
|
||||
|
||||
Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
|
||||
for (ActiveMQDestination d : dests)
|
||||
{
|
||||
if (d.isQueue())
|
||||
|
@ -144,11 +144,13 @@ public class AMQSession implements SessionCallback
|
|||
SimpleString queueName = OpenWireUtil.toCoreAddress(d);
|
||||
getCoreServer().getJMSQueueCreator().create(queueName);
|
||||
}
|
||||
AMQConsumer consumer = new AMQConsumer(this, d, info);
|
||||
AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool);
|
||||
consumer.init();
|
||||
consumerMap.put(d, consumer);
|
||||
consumers.put(consumer.getNativeId(), consumer);
|
||||
this.consumerIdMap.put(info.getConsumerId().getValue(), consumer.getNativeId());
|
||||
}
|
||||
connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap);
|
||||
|
||||
coreSession.start();
|
||||
started.set(true);
|
||||
}
|
||||
|
@ -214,7 +216,8 @@ public class AMQSession implements SessionCallback
|
|||
@Override
|
||||
public boolean hasCredits(ServerConsumer consumerID)
|
||||
{
|
||||
return true;
|
||||
AMQConsumer amqConsumer = consumers.get(consumerID.getID());
|
||||
return amqConsumer.hasCredits();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -234,19 +237,12 @@ public class AMQSession implements SessionCallback
|
|||
return this.server;
|
||||
}
|
||||
|
||||
public void removeConsumer(ConsumerInfo info) throws Exception
|
||||
public void removeConsumer(long consumerId) throws Exception
|
||||
{
|
||||
long consumerId = info.getConsumerId().getValue();
|
||||
long nativeId = this.consumerIdMap.remove(consumerId);
|
||||
if (this.txId != null || this.isTx)
|
||||
{
|
||||
((AMQServerSession)coreSession).amqCloseConsumer(nativeId, false);
|
||||
}
|
||||
else
|
||||
{
|
||||
((AMQServerSession)coreSession).amqCloseConsumer(nativeId, true);
|
||||
}
|
||||
AMQConsumer consumer = consumers.remove(nativeId);
|
||||
boolean failed = !(this.txId != null || this.isTx);
|
||||
|
||||
coreSession.amqCloseConsumer(consumerId, failed);
|
||||
consumers.remove(consumerId);
|
||||
}
|
||||
|
||||
public void createProducer(ProducerInfo info) throws Exception
|
||||
|
@ -331,16 +327,13 @@ public class AMQSession implements SessionCallback
|
|||
return this.connection.getMarshaller();
|
||||
}
|
||||
|
||||
public void acknowledge(MessageAck ack) throws Exception
|
||||
public void acknowledge(MessageAck ack, AMQConsumer consumer) throws Exception
|
||||
{
|
||||
TransactionId tid = ack.getTransactionId();
|
||||
if (tid != null)
|
||||
{
|
||||
this.resetSessionTx(ack.getTransactionId());
|
||||
}
|
||||
ConsumerId consumerId = ack.getConsumerId();
|
||||
long nativeConsumerId = consumerIdMap.get(consumerId.getValue());
|
||||
AMQConsumer consumer = consumers.get(nativeConsumerId);
|
||||
consumer.acknowledge(ack);
|
||||
|
||||
if (tid == null && ack.getAckType() == MessageAck.STANDARD_ACK_TYPE)
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire.amq;
|
||||
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.MessagePull;
|
||||
|
||||
public class AMQSingleConsumerBrokerExchange extends AMQConsumerBrokerExchange
|
||||
{
|
||||
private AMQConsumer consumer;
|
||||
|
||||
public AMQSingleConsumerBrokerExchange(AMQSession amqSession, AMQConsumer consumer)
|
||||
{
|
||||
super(amqSession);
|
||||
this.consumer = consumer;
|
||||
}
|
||||
|
||||
public void processMessagePull(MessagePull messagePull) throws Exception
|
||||
{
|
||||
consumer.processMessagePull(messagePull);
|
||||
}
|
||||
|
||||
public void removeConsumer() throws Exception
|
||||
{
|
||||
consumer.removeConsumer();
|
||||
}
|
||||
|
||||
public void acknowledge(MessageAck ack) throws Exception
|
||||
{
|
||||
amqSession.acknowledge(ack, consumer);
|
||||
}
|
||||
}
|
|
@ -1,184 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire.amq;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.activemq.command.ConnectionId;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ProducerId;
|
||||
import org.apache.activemq.command.SessionId;
|
||||
|
||||
/**
|
||||
* We just copy this structure from amq, but what's the purpose
|
||||
* and can it be removed ?
|
||||
*/
|
||||
public class AMQSingleTransportConnectionStateRegister implements
|
||||
AMQTransportConnectionStateRegister
|
||||
{
|
||||
|
||||
private AMQTransportConnectionState connectionState;
|
||||
private ConnectionId connectionId;
|
||||
|
||||
public AMQTransportConnectionState registerConnectionState(
|
||||
ConnectionId connectionId, AMQTransportConnectionState state)
|
||||
{
|
||||
AMQTransportConnectionState rc = connectionState;
|
||||
connectionState = state;
|
||||
this.connectionId = connectionId;
|
||||
return rc;
|
||||
}
|
||||
|
||||
public synchronized AMQTransportConnectionState unregisterConnectionState(
|
||||
ConnectionId connectionId)
|
||||
{
|
||||
AMQTransportConnectionState rc = null;
|
||||
|
||||
if (connectionId != null && connectionState != null
|
||||
&& this.connectionId != null)
|
||||
{
|
||||
if (this.connectionId.equals(connectionId))
|
||||
{
|
||||
rc = connectionState;
|
||||
connectionState = null;
|
||||
connectionId = null;
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
public synchronized List<AMQTransportConnectionState> listConnectionStates()
|
||||
{
|
||||
List<AMQTransportConnectionState> rc = new ArrayList<AMQTransportConnectionState>();
|
||||
if (connectionState != null)
|
||||
{
|
||||
rc.add(connectionState);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
public synchronized AMQTransportConnectionState lookupConnectionState(
|
||||
String connectionId)
|
||||
{
|
||||
AMQTransportConnectionState cs = connectionState;
|
||||
if (cs == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
"Cannot lookup a connectionId for a connection that had not been registered: "
|
||||
+ connectionId);
|
||||
}
|
||||
return cs;
|
||||
}
|
||||
|
||||
public synchronized AMQTransportConnectionState lookupConnectionState(
|
||||
ConsumerId id)
|
||||
{
|
||||
AMQTransportConnectionState cs = connectionState;
|
||||
if (cs == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
"Cannot lookup a consumer from a connection that had not been registered: "
|
||||
+ id.getParentId().getParentId());
|
||||
}
|
||||
return cs;
|
||||
}
|
||||
|
||||
public synchronized AMQTransportConnectionState lookupConnectionState(
|
||||
ProducerId id)
|
||||
{
|
||||
AMQTransportConnectionState cs = connectionState;
|
||||
if (cs == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
"Cannot lookup a producer from a connection that had not been registered: "
|
||||
+ id.getParentId().getParentId());
|
||||
}
|
||||
return cs;
|
||||
}
|
||||
|
||||
public synchronized AMQTransportConnectionState lookupConnectionState(
|
||||
SessionId id)
|
||||
{
|
||||
AMQTransportConnectionState cs = connectionState;
|
||||
if (cs == null)
|
||||
{
|
||||
throw new IllegalStateException(
|
||||
"Cannot lookup a session from a connection that had not been registered: "
|
||||
+ id.getParentId());
|
||||
}
|
||||
return cs;
|
||||
}
|
||||
|
||||
public synchronized AMQTransportConnectionState lookupConnectionState(
|
||||
ConnectionId connectionId)
|
||||
{
|
||||
AMQTransportConnectionState cs = connectionState;
|
||||
return cs;
|
||||
}
|
||||
|
||||
public synchronized boolean doesHandleMultipleConnectionStates()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
public synchronized boolean isEmpty()
|
||||
{
|
||||
return connectionState == null;
|
||||
}
|
||||
|
||||
public void intialize(AMQTransportConnectionStateRegister other)
|
||||
{
|
||||
|
||||
if (other.isEmpty())
|
||||
{
|
||||
clear();
|
||||
}
|
||||
else
|
||||
{
|
||||
Map map = other.mapStates();
|
||||
Iterator i = map.entrySet().iterator();
|
||||
Map.Entry<ConnectionId, AMQTransportConnectionState> entry = (Entry<ConnectionId, AMQTransportConnectionState>) i
|
||||
.next();
|
||||
connectionId = entry.getKey();
|
||||
connectionState = entry.getValue();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public Map<ConnectionId, AMQTransportConnectionState> mapStates()
|
||||
{
|
||||
Map<ConnectionId, AMQTransportConnectionState> map = new HashMap<ConnectionId, AMQTransportConnectionState>();
|
||||
if (!isEmpty())
|
||||
{
|
||||
map.put(connectionId, connectionState);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
public void clear()
|
||||
{
|
||||
connectionState = null;
|
||||
connectionId = null;
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -1,85 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire.amq;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.state.ConnectionState;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
|
||||
|
||||
public class AMQTransportConnectionState extends ConnectionState
|
||||
{
|
||||
|
||||
private AMQConnectionContext context;
|
||||
private OpenWireConnection connection;
|
||||
private AtomicInteger referenceCounter = new AtomicInteger();
|
||||
private final Object connectionMutex = new Object();
|
||||
|
||||
public AMQTransportConnectionState(ConnectionInfo info,
|
||||
OpenWireConnection transportConnection)
|
||||
{
|
||||
super(info);
|
||||
connection = transportConnection;
|
||||
}
|
||||
|
||||
public AMQConnectionContext getContext()
|
||||
{
|
||||
return context;
|
||||
}
|
||||
|
||||
public OpenWireConnection getConnection()
|
||||
{
|
||||
return connection;
|
||||
}
|
||||
|
||||
public void setContext(AMQConnectionContext context)
|
||||
{
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
public void setConnection(OpenWireConnection connection)
|
||||
{
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
public int incrementReference()
|
||||
{
|
||||
return referenceCounter.incrementAndGet();
|
||||
}
|
||||
|
||||
public int decrementReference()
|
||||
{
|
||||
return referenceCounter.decrementAndGet();
|
||||
}
|
||||
|
||||
public AtomicInteger getReferenceCounter()
|
||||
{
|
||||
return referenceCounter;
|
||||
}
|
||||
|
||||
public void setReferenceCounter(AtomicInteger referenceCounter)
|
||||
{
|
||||
this.referenceCounter = referenceCounter;
|
||||
}
|
||||
|
||||
public Object getConnectionMutex()
|
||||
{
|
||||
return connectionMutex;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,59 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire.amq;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.command.ConnectionId;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ProducerId;
|
||||
import org.apache.activemq.command.SessionId;
|
||||
|
||||
/**
|
||||
* What's the purpose of this?
|
||||
*/
|
||||
public interface AMQTransportConnectionStateRegister
|
||||
{
|
||||
AMQTransportConnectionState registerConnectionState(ConnectionId connectionId,
|
||||
AMQTransportConnectionState state);
|
||||
|
||||
AMQTransportConnectionState unregisterConnectionState(ConnectionId connectionId);
|
||||
|
||||
List<AMQTransportConnectionState> listConnectionStates();
|
||||
|
||||
Map<ConnectionId, AMQTransportConnectionState> mapStates();
|
||||
|
||||
AMQTransportConnectionState lookupConnectionState(String connectionId);
|
||||
|
||||
AMQTransportConnectionState lookupConnectionState(ConsumerId id);
|
||||
|
||||
AMQTransportConnectionState lookupConnectionState(ProducerId id);
|
||||
|
||||
AMQTransportConnectionState lookupConnectionState(SessionId id);
|
||||
|
||||
AMQTransportConnectionState lookupConnectionState(ConnectionId connectionId);
|
||||
|
||||
boolean isEmpty();
|
||||
|
||||
boolean doesHandleMultipleConnectionStates();
|
||||
|
||||
void intialize(AMQTransportConnectionStateRegister other);
|
||||
|
||||
void clear();
|
||||
|
||||
}
|
|
@ -788,6 +788,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public void promptDelivery(long consumerID)
|
||||
{
|
||||
ServerConsumer consumer = consumers.get(consumerID);
|
||||
|
||||
// this would be possible if the server consumer was closed by pings/pongs.. etc
|
||||
if (consumer != null)
|
||||
{
|
||||
consumer.promptDelivery();
|
||||
}
|
||||
}
|
||||
|
||||
public void acknowledge(final long consumerID, final long messageID) throws Exception
|
||||
{
|
||||
ServerConsumer consumer = consumers.get(consumerID);
|
||||
|
|
Loading…
Reference in New Issue