diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 674055b04a..e465ba701d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -40,6 +40,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.BrokerInfo; @@ -77,14 +80,10 @@ import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMapTransportConnectionStateRegister; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleTransportConnectionStateRegister; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransportConnectionState; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransportConnectionStateRegister; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -129,8 +128,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor private OpenWireFormat wireFormat; - private AMQTransportConnectionStateRegister connectionStateRegister = new AMQSingleTransportConnectionStateRegister(); - private boolean faultTolerantConnection; private AMQConnectionContext context; @@ -175,12 +172,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor private final Map consumerExchanges = new HashMap(); private final Map producerExchanges = new HashMap(); - private AMQTransportConnectionState state; + private ConnectionState state; private final Set tempQueues = new ConcurrentHashSet(); - protected final Map brokerConnectionStates; - private DataInputWrapper dataInput = new DataInputWrapper(); private Map txMap = new ConcurrentHashMap(); @@ -194,7 +189,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor this.transportConnection = connection; this.acceptorUsed = new AMQConnectorImpl(acceptorUsed); this.wireFormat = wf; - brokerConnectionStates = protocolManager.getConnectionStates(); this.creationTime = System.currentTimeMillis(); } @@ -299,7 +293,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor context.setDontSendReponse(false); response = null; } - context = null; } if (response != null && !protocolManager.isStopping()) @@ -621,38 +614,16 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor info.setClientMaster(true); } - // Make sure 2 concurrent connections by the same ID only generate 1 - // TransportConnectionState object. - synchronized (brokerConnectionStates) - { - state = (AMQTransportConnectionState) brokerConnectionStates.get(info - .getConnectionId()); - if (state == null) - { - state = new AMQTransportConnectionState(info, this); - brokerConnectionStates.put(info.getConnectionId(), state); - } - state.incrementReference(); - } - // If there are 2 concurrent connections for the same connection id, - // then last one in wins, we need to sync here - // to figure out the winner. - synchronized (state.getConnectionMutex()) - { - if (state.getConnection() != this) - { - state.getConnection().disconnect(true); - state.setConnection(this); - state.reset(info); - } - } + state = new ConnectionState(info); + + context = new AMQConnectionContext(); + + state.reset(info); - registerConnectionState(info.getConnectionId(), state); this.faultTolerantConnection = info.isFaultTolerant(); // Setup the context. String clientId = info.getClientId(); - context = new AMQConnectionContext(); context.setBroker(protocolManager); context.setClientId(clientId); context.setClientMaster(info.isClientMaster()); @@ -671,8 +642,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor context.setReconnect(info.isFailoverReconnect()); this.manageable = info.isManageable(); context.setConnectionState(state); - state.setContext(context); - state.setConnection(this); if (info.getClientIp() == null) { info.setClientIp(getRemoteAddress()); @@ -684,12 +653,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor } catch (Exception e) { - synchronized (brokerConnectionStates) - { - brokerConnectionStates.remove(info.getConnectionId()); - } - unregisterConnectionState(info.getConnectionId()); - if (e instanceof SecurityException) { // close this down - in case the peer of this transport doesn't play @@ -926,28 +889,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor return this.messageAuthorizationPolicy; } - protected synchronized AMQTransportConnectionState unregisterConnectionState( - ConnectionId connectionId) - { - return connectionStateRegister.unregisterConnectionState(connectionId); - } - - protected synchronized AMQTransportConnectionState registerConnectionState( - ConnectionId connectionId, AMQTransportConnectionState state) - { - AMQTransportConnectionState cs = null; - if (!connectionStateRegister.isEmpty() - && !connectionStateRegister.doesHandleMultipleConnectionStates()) - { - // swap implementations - AMQTransportConnectionStateRegister newRegister = new AMQMapTransportConnectionStateRegister(); - newRegister.intialize(connectionStateRegister); - connectionStateRegister = newRegister; - } - cs = connectionStateRegister.registerConnectionState(connectionId, state); - return cs; - } - public void delayedStop(final int waitTime, final String reason, Throwable cause) { @@ -997,16 +938,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor } if (stopping.compareAndSet(false, true)) { - // Let all the connection contexts know we are shutting down - // so that in progress operations can notice and unblock. - List connectionStates = listConnectionStates(); - for (AMQTransportConnectionState cs : connectionStates) + if (context != null) { - AMQConnectionContext connectionContext = cs.getContext(); - if (connectionContext != null) - { - connectionContext.getStopping().set(true); - } + context.getStopping().set(true); } try { @@ -1040,11 +974,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor } } - protected synchronized List listConnectionStates() - { - return connectionStateRegister.listConnectionStates(); - } - protected void doStop() throws Exception { this.acceptorUsed.onStopped(this); @@ -1095,19 +1024,14 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor // from the broker. if (!protocolManager.isStopped()) { - List connectionStates = listConnectionStates(); - connectionStates = listConnectionStates(); - for (AMQTransportConnectionState cs : connectionStates) + context.getStopping().set(true); + try { - cs.getContext().getStopping().set(true); - try - { - processRemoveConnection(cs.getInfo().getConnectionId(), 0L); - } - catch (Throwable ignore) - { - ignore.printStackTrace(); - } + processRemoveConnection(state.getInfo().getConnectionId(), 0L); + } + catch (Throwable ignore) + { + ignore.printStackTrace(); } } } @@ -1134,16 +1058,21 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor return resp; } - AMQConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) + public void addConsumerBrokerExchange(ConsumerId id, AMQSession amqSession, Map consumerMap) { AMQConsumerBrokerExchange result = consumerExchanges.get(id); if (result == null) { + if (consumerMap.size() == 1) + { + result = new AMQSingleConsumerBrokerExchange(amqSession, consumerMap.values().iterator().next()); + } + else + { + result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerMap); + } synchronized (consumerExchanges) { - result = new AMQConsumerBrokerExchange(); - AMQTransportConnectionState state = lookupConnectionState(id); - context = state.getContext(); result.setConnectionContext(context); SessionState ss = state.getSessionState(id.getParentId()); if (ss != null) @@ -1165,63 +1094,36 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor consumerExchanges.put(id, result); } } - return result; } - protected synchronized AMQTransportConnectionState lookupConnectionState( - ConsumerId id) - { - return connectionStateRegister.lookupConnectionState(id); - } - - protected synchronized AMQTransportConnectionState lookupConnectionState( - ProducerId id) - { - return connectionStateRegister.lookupConnectionState(id); - } - - public int getConsumerCount(ConnectionId connectionId) + public int getConsumerCount() { int result = 0; - AMQTransportConnectionState cs = lookupConnectionState(connectionId); - if (cs != null) + for (SessionId sessionId : state.getSessionIds()) { - for (SessionId sessionId : cs.getSessionIds()) + SessionState sessionState = state.getSessionState(sessionId); + if (sessionState != null) { - SessionState sessionState = cs.getSessionState(sessionId); - if (sessionState != null) - { - result += sessionState.getConsumerIds().size(); - } + result += sessionState.getConsumerIds().size(); } } return result; } - public int getProducerCount(ConnectionId connectionId) + public int getProducerCount() { int result = 0; - AMQTransportConnectionState cs = lookupConnectionState(connectionId); - if (cs != null) + for (SessionId sessionId : state.getSessionIds()) { - for (SessionId sessionId : cs.getSessionIds()) + SessionState sessionState = state.getSessionState(sessionId); + if (sessionState != null) { - SessionState sessionState = cs.getSessionState(sessionId); - if (sessionState != null) - { - result += sessionState.getProducerIds().size(); - } + result += sessionState.getProducerIds().size(); } } return result; } - public synchronized AMQTransportConnectionState lookupConnectionState( - ConnectionId connectionId) - { - return connectionStateRegister.lookupConnectionState(connectionId); - } - @Override public Response processAddDestination(DestinationInfo dest) throws Exception { @@ -1273,20 +1175,18 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor @Override public Response processAddSession(SessionInfo info) throws Exception { - ConnectionId connectionId = info.getSessionId().getParentId(); - AMQTransportConnectionState cs = lookupConnectionState(connectionId); // Avoid replaying dup commands - if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) + if (!state.getSessionIds().contains(info.getSessionId())) { protocolManager.addSession(this, info); try { - cs.addSession(info); + state.addSession(info); } catch (IllegalStateException e) { e.printStackTrace(); - protocolManager.removeSession(cs.getContext(), info); + protocolManager.removeSession(context, info); } } return null; @@ -1422,10 +1322,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor //in that case don't send the response //this will force the client to wait until //the response is got. - if (context == null) - { - this.context = new AMQConnectionContext(); - } context.setDontSendReponse(true); } else @@ -1463,9 +1359,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor synchronized (producerExchanges) { result = new AMQProducerBrokerExchange(); - AMQTransportConnectionState state = lookupConnectionState(id); - context = state.getContext(); result.setConnectionContext(context); + //todo implement reconnect https://issues.apache.org/jira/browse/ARTEMIS-194 if (context.isReconnect() || (context.isNetworkConnection() && this.acceptorUsed .isAuditNetworkProducers())) @@ -1490,20 +1385,14 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor producerExchanges.put(id, result); } } - else - { - context = result.getConnectionContext(); - } return result; } @Override public Response processMessageAck(MessageAck ack) throws Exception { - ConsumerId consumerId = ack.getConsumerId(); - SessionId sessionId = consumerId.getParentId(); - AMQSession session = protocolManager.getSession(sessionId); - session.acknowledge(ack); + AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId()); + consumerBrokerExchange.acknowledge(ack); return null; } @@ -1523,7 +1412,13 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor @Override public Response processMessagePull(MessagePull arg0) throws Exception { - throw new IllegalStateException("not implemented! "); + AMQConsumerBrokerExchange amqConsumerBrokerExchange = consumerExchanges.get(arg0.getConsumerId()); + if (amqConsumerBrokerExchange == null) + { + throw new IllegalStateException("Consumer does not exist"); + } + amqConsumerBrokerExchange.processMessagePull(arg0); + return null; } @Override @@ -1542,8 +1437,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor @Override public Response processRecoverTransactions(TransactionInfo info) throws Exception { - AMQTransportConnectionState cs = lookupConnectionState(info.getConnectionId()); - Set sIds = cs.getSessionIds(); + Set sIds = state.getSessionIds(); TransactionId[] recovered = protocolManager.recoverTransactions(sIds); return new DataArrayResponse(recovered); } @@ -1552,48 +1446,31 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception { - AMQTransportConnectionState cs = lookupConnectionState(id); - if (cs != null) + // Don't allow things to be added to the connection state while we + // are shutting down. + state.shutdown(); + // Cascade the connection stop to the sessions. + for (SessionId sessionId : state.getSessionIds()) { - // Don't allow things to be added to the connection state while we - // are shutting down. - cs.shutdown(); - // Cascade the connection stop to the sessions. - for (SessionId sessionId : cs.getSessionIds()) - { - try - { - processRemoveSession(sessionId, lastDeliveredSequenceId); - } - catch (Throwable e) - { - // LOG - } - } - try { - protocolManager.removeConnection(cs.getContext(), cs.getInfo(), - null); + processRemoveSession(sessionId, lastDeliveredSequenceId); } catch (Throwable e) { - // log - } - AMQTransportConnectionState state = unregisterConnectionState(id); - if (state != null) - { - synchronized (brokerConnectionStates) - { - // If we are the last reference, we should remove the state - // from the broker. - if (state.decrementReference() == 0) - { - brokerConnectionStates.remove(id); - } - } + // LOG } } + + try + { + protocolManager.removeConnection(context, state.getInfo(), + null); + } + catch (Throwable e) + { + // log + } return null; } @@ -1602,15 +1479,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor long lastDeliveredSequenceId) throws Exception { SessionId sessionId = id.getParentId(); - ConnectionId connectionId = sessionId.getParentId(); - AMQTransportConnectionState cs = lookupConnectionState(connectionId); - if (cs == null) - { - throw new IllegalStateException( - "Cannot remove a consumer from a connection that had not been registered: " - + connectionId); - } - SessionState ss = cs.getSessionState(sessionId); + SessionState ss = state.getSessionState(sessionId); if (ss == null) { throw new IllegalStateException( @@ -1625,8 +1494,13 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor } ConsumerInfo info = consumerState.getInfo(); info.setLastDeliveredSequenceId(lastDeliveredSequenceId); - protocolManager.removeConsumer(cs.getContext(), consumerState.getInfo()); + + AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id); + + consumerBrokerExchange.removeConsumer(); + removeConsumerBrokerExchange(id); + return null; } @@ -1661,15 +1535,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { - ConnectionId connectionId = id.getParentId(); - AMQTransportConnectionState cs = lookupConnectionState(connectionId); - if (cs == null) - { - throw new IllegalStateException( - "Cannot remove session from connection that had not been registered: " - + connectionId); - } - SessionState session = cs.getSessionState(id); + SessionState session = state.getSessionState(id); if (session == null) { throw new IllegalStateException( @@ -1701,8 +1567,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor // LOG.warn("Failed to remove producer: {}", producerId, e); } } - cs.removeSession(id); - protocolManager.removeSession(cs.getContext(), session.getInfo()); + state.removeSession(id); + protocolManager.removeSession(context, session.getInfo()); return null; } @@ -1790,7 +1656,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor public AMQConnectionContext getConext() { - return this.state.getContext(); + return this.context; } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 8e7d31cd4a..1a4386d914 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; import io.netty.channel.ChannelPipeline; import org.apache.activemq.advisory.AdvisorySupport; @@ -55,6 +56,7 @@ import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; @@ -69,7 +71,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdap import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransportConnectionState; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -114,8 +115,8 @@ public class OpenWireProtocolManager implements ProtocolManager, No protected final ProducerId advisoryProducerId = new ProducerId(); // from broker - protected final Map brokerConnectionStates = Collections - .synchronizedMap(new HashMap()); + protected final Map brokerConnectionStates = Collections + .synchronizedMap(new HashMap()); private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); @@ -131,6 +132,8 @@ public class OpenWireProtocolManager implements ProtocolManager, No private Map sessionIdMap = new ConcurrentHashMap(); + private final ScheduledExecutorService scheduledPool; + public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; @@ -141,6 +144,7 @@ public class OpenWireProtocolManager implements ProtocolManager, No brokerState = new BrokerState(); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); ManagementService service = server.getManagementService(); + scheduledPool = server.getScheduledPool(); if (service != null) { service.addNotificationListener(this); @@ -239,7 +243,7 @@ public class OpenWireProtocolManager implements ProtocolManager, No } public void handleCommand(OpenWireConnection openWireConnection, - Object command) + Object command) throws Exception { Command amqCmd = (Command) command; byte type = amqCmd.getDataStructureType(); @@ -252,6 +256,10 @@ public class OpenWireProtocolManager implements ProtocolManager, No * failover and load balancing. These features are not yet implemented for Artemis OpenWire. Instead we * simply drop the packet. See: ACTIVEMQ6-108 */ break; + case CommandTypes.MESSAGE_PULL: + MessagePull messagePull = (MessagePull) amqCmd; + openWireConnection.processMessagePull(messagePull); + break; case CommandTypes.CONSUMER_CONTROL: break; default: @@ -306,11 +314,6 @@ public class OpenWireProtocolManager implements ProtocolManager, No } } - public Map getConnectionStates() - { - return this.brokerConnectionStates; - } - public void addConnection(AMQConnectionContext context, ConnectionInfo info) throws Exception { String username = info.getUserName(); @@ -483,8 +486,7 @@ public class OpenWireProtocolManager implements ProtocolManager, No { SessionId sessionId = info.getProducerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); - AMQTransportConnectionState cs = theConn - .lookupConnectionState(connectionId); + ConnectionState cs = theConn.getState(); if (cs == null) { throw new IllegalStateException( @@ -505,7 +507,7 @@ public class OpenWireProtocolManager implements ProtocolManager, No if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { - if (theConn.getProducerCount(connectionId) >= theConn + if (theConn.getProducerCount() >= theConn .getMaximumProducersAllowedPerConnection()) { throw new IllegalStateException( @@ -541,8 +543,7 @@ public class OpenWireProtocolManager implements ProtocolManager, No // Todo: add a destination interceptors holder here (amq supports this) SessionId sessionId = info.getConsumerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); - AMQTransportConnectionState cs = theConn - .lookupConnectionState(connectionId); + ConnectionState cs = theConn.getState(); if (cs == null) { throw new IllegalStateException( @@ -564,7 +565,7 @@ public class OpenWireProtocolManager implements ProtocolManager, No if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { - if (theConn.getConsumerCount(connectionId) >= theConn + if (theConn.getConsumerCount() >= theConn .getMaximumConsumersAllowedPerConnection()) { throw new IllegalStateException( @@ -580,17 +581,9 @@ public class OpenWireProtocolManager implements ProtocolManager, No throw new IllegalStateException("Session not exist! : " + sessionId); } - amqSession.createConsumer(info); + amqSession.createConsumer(info, amqSession); - try - { - ss.addConsumer(info); - theConn.addConsumerBrokerExchange(info.getConsumerId()); - } - catch (IllegalStateException e) - { - amqSession.removeConsumer(info); - } + ss.addConsumer(info); } } @@ -614,7 +607,7 @@ public class OpenWireProtocolManager implements ProtocolManager, No boolean internal) { AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss, - server, theConn, this); + server, theConn, scheduledPool, this); amqSession.initialize(); amqSession.setInternal(internal); sessions.put(ss.getSessionId(), amqSession); @@ -644,13 +637,6 @@ public class OpenWireProtocolManager implements ProtocolManager, No } } - public void removeConsumer(AMQConnectionContext context, ConsumerInfo info) throws Exception - { - SessionId sessionId = info.getConsumerId().getParentId(); - AMQSession session = sessions.get(sessionId); - session.removeConsumer(info); - } - public void removeProducer(ProducerId id) { SessionId sessionId = id.getParentId(); @@ -677,7 +663,7 @@ public class OpenWireProtocolManager implements ProtocolManager, No { SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName()); - ConnectionState state = connection.brokerConnectionStates.get(info.getConnectionId()); + ConnectionState state = connection.getState(); ConnectionInfo connInfo = state.getInfo(); if (connInfo != null) { @@ -849,12 +835,11 @@ public class OpenWireProtocolManager implements ProtocolManager, No { ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination); ConnectionId connId = sessionId.getParentId(); - AMQTransportConnectionState cc = (AMQTransportConnectionState)this.brokerConnectionStates.get(connId); - OpenWireConnection conn = cc.getConnection(); + OpenWireConnection cc = this.brokerConnectionStates.get(connId); ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString()); - fireAdvisory(conn.getConext(), topic, advisoryMessage, consumer.getId()); + fireAdvisory(cc.getConext(), topic, advisoryMessage, consumer.getId()); } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java new file mode 100644 index 0000000000..7fe36856c3 --- /dev/null +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.openwire.amq; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessagePull; + +import java.util.Map; + +public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchange +{ + + private final Map consumerMap; + + public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, Map consumerMap) + { + super(amqSession); + this.consumerMap = consumerMap; + } + + public void processMessagePull(MessagePull messagePull) throws Exception + { + AMQConsumer amqConsumer = consumerMap.get(messagePull.getDestination()); + if (amqConsumer != null) + { + amqConsumer.processMessagePull(messagePull); + } + } + + public void acknowledge(MessageAck ack) throws Exception + { + AMQConsumer amqConsumer = consumerMap.get(ack.getDestination()); + if (amqConsumer != null) + { + amqSession.acknowledge(ack, amqConsumer); + } + } + + public void removeConsumer() throws Exception + { + for (AMQConsumer amqConsumer : consumerMap.values()) + { + amqConsumer.removeConsumer(); + } + } +} diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 1292aee78f..59f6d26b60 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -21,13 +21,19 @@ import java.util.Iterator; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.TransactionId; import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.artemis.api.core.SimpleString; @@ -42,20 +48,28 @@ public class AMQConsumer implements BrowserListener private AMQSession session; private org.apache.activemq.command.ActiveMQDestination actualDest; private ConsumerInfo info; + private final ScheduledExecutorService scheduledPool; private long nativeId = -1; private SimpleString subQueueName = null; private final int prefetchSize; - private AtomicInteger currentSize; + private AtomicInteger windowAvailable; private final java.util.Queue deliveringRefs = new ConcurrentLinkedQueue(); + private long messagePullSequence = 0; + private MessagePullHandler messagePullHandler; - public AMQConsumer(AMQSession amqSession, org.apache.activemq.command.ActiveMQDestination d, ConsumerInfo info) + public AMQConsumer(AMQSession amqSession, org.apache.activemq.command.ActiveMQDestination d, ConsumerInfo info, ScheduledExecutorService scheduledPool) { this.session = amqSession; this.actualDest = d; this.info = info; + this.scheduledPool = scheduledPool; this.prefetchSize = info.getPrefetchSize(); - this.currentSize = new AtomicInteger(0); + this.windowAvailable = new AtomicInteger(prefetchSize); + if (prefetchSize == 0) + { + messagePullHandler = new MessagePullHandler(); + } } public void init() throws Exception @@ -130,12 +144,12 @@ public class AMQConsumer implements BrowserListener coreSession.createQueue(address, subQueueName, selector, true, false); } - coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, Integer.MAX_VALUE); + coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1); } else { SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName()); - coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, Integer.MAX_VALUE); + coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1); } if (info.isBrowser()) @@ -163,23 +177,14 @@ public class AMQConsumer implements BrowserListener public void acquireCredit(int n) throws Exception { - this.currentSize.addAndGet(-n); - if (currentSize.get() < prefetchSize) + boolean promptDelivery = windowAvailable.get() == 0; + if (windowAvailable.get() < prefetchSize) { - AtomicInteger credits = session.getCoreSession().getConsumerCredits(nativeId); - credits.set(0); - session.getCoreSession().receiveConsumerCredits(nativeId, Integer.MAX_VALUE); + this.windowAvailable.addAndGet(n); } - } - - public void checkCreditOnDelivery() throws Exception - { - this.currentSize.incrementAndGet(); - - if (currentSize.get() == prefetchSize) + if (promptDelivery) { - //stop because reach prefetchSize - session.getCoreSession().receiveConsumerCredits(nativeId, 0); + session.getCoreSession().promptDelivery(nativeId); } } @@ -188,12 +193,16 @@ public class AMQConsumer implements BrowserListener MessageDispatch dispatch; try { + if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) + { + return 0; + } //decrement deliveryCount as AMQ client tends to add 1. dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this); int size = dispatch.getMessage().getSize(); this.deliveringRefs.add(new MessageInfo(dispatch.getMessage().getMessageId(), message.getMessageID(), size)); session.deliverMessage(dispatch); - checkCreditOnDelivery(); + windowAvailable.decrementAndGet(); return size; } catch (IOException e) @@ -206,6 +215,16 @@ public class AMQConsumer implements BrowserListener } } + public void handleDeliverNullDispatch() + { + MessageDispatch md = new MessageDispatch(); + md.setConsumerId(getId()); + md.setDestination(actualDest); + session.deliverMessage(md); + windowAvailable.decrementAndGet(); + } + + public void acknowledge(MessageAck ack) throws Exception { MessageId first = ack.getFirstMessageId(); @@ -400,4 +419,90 @@ public class AMQConsumer implements BrowserListener { return info; } + + public boolean hasCredits() + { + return windowAvailable.get() > 0; + } + + public void processMessagePull(MessagePull messagePull) throws Exception + { + windowAvailable.incrementAndGet(); + + if (messagePullHandler != null) + { + messagePullHandler.nextSequence(messagePullSequence++, messagePull.getTimeout()); + } + } + + public void removeConsumer() throws Exception + { + session.removeConsumer(nativeId); + } + + private class MessagePullHandler + { + private long next = -1; + private long timeout; + private CountDownLatch latch = new CountDownLatch(1); + private ScheduledFuture messagePullFuture; + + public void nextSequence(long next, long timeout) throws Exception + { + this.next = next; + this.timeout = timeout; + latch = new CountDownLatch(1); + session.getCoreSession().forceConsumerDelivery(nativeId, messagePullSequence); + //if we are 0 timeout or less we need to wait to get either the forced message or a real message. + if (timeout <= 0) + { + latch.await(10, TimeUnit.SECONDS); + //this means we have received no message just the forced delivery message + if (this.next >= 0) + { + handleDeliverNullDispatch(); + } + } + } + + public boolean checkForcedConsumer(ServerMessage message) + { + if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) + { + System.out.println("MessagePullHandler.checkForcedConsumer"); + if (next >= 0) + { + if (timeout <= 0) + { + latch.countDown(); + } + else + { + messagePullFuture = scheduledPool.schedule(new Runnable() + { + @Override + public void run() + { + if (next >= 0) + { + handleDeliverNullDispatch(); + } + } + }, timeout, TimeUnit.MILLISECONDS); + } + } + return false; + } + else + { + next = -1; + if (messagePullFuture != null) + { + messagePullFuture.cancel(true); + } + latch.countDown(); + return true; + } + } + } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java index fd629641e4..168f557c15 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java @@ -16,13 +16,22 @@ */ package org.apache.activemq.artemis.core.protocol.openwire.amq; -public class AMQConsumerBrokerExchange +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessagePull; + +public abstract class AMQConsumerBrokerExchange { + protected final AMQSession amqSession; private AMQConnectionContext connectionContext; private AMQDestination regionDestination; private AMQSubscription subscription; private boolean wildcard; + public AMQConsumerBrokerExchange(AMQSession amqSession) + { + this.amqSession = amqSession; + } + /** * @return the connectionContext */ @@ -90,4 +99,10 @@ public class AMQConsumerBrokerExchange { this.wildcard = wildcard; } + + public abstract void acknowledge(MessageAck ack) throws Exception; + + public abstract void processMessagePull(MessagePull messagePull) throws Exception; + + public abstract void removeConsumer() throws Exception; } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java deleted file mode 100644 index 4fc32463bb..0000000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.SessionId; - -public class AMQMapTransportConnectionStateRegister implements - AMQTransportConnectionStateRegister -{ - - private Map connectionStates = new ConcurrentHashMap(); - - public AMQTransportConnectionState registerConnectionState( - ConnectionId connectionId, AMQTransportConnectionState state) - { - AMQTransportConnectionState rc = connectionStates - .put(connectionId, state); - return rc; - } - - public AMQTransportConnectionState unregisterConnectionState( - ConnectionId connectionId) - { - AMQTransportConnectionState rc = connectionStates.remove(connectionId); - if (rc.getReferenceCounter().get() > 1) - { - rc.decrementReference(); - connectionStates.put(connectionId, rc); - } - return rc; - } - - public List listConnectionStates() - { - - List rc = new ArrayList(); - rc.addAll(connectionStates.values()); - return rc; - } - - public AMQTransportConnectionState lookupConnectionState(String connectionId) - { - return connectionStates.get(new ConnectionId(connectionId)); - } - - public AMQTransportConnectionState lookupConnectionState(ConsumerId id) - { - AMQTransportConnectionState cs = lookupConnectionState(id - .getConnectionId()); - if (cs == null) - { - throw new IllegalStateException( - "Cannot lookup a consumer from a connection that had not been registered: " - + id.getParentId().getParentId()); - } - return cs; - } - - public AMQTransportConnectionState lookupConnectionState(ProducerId id) - { - AMQTransportConnectionState cs = lookupConnectionState(id - .getConnectionId()); - if (cs == null) - { - throw new IllegalStateException( - "Cannot lookup a producer from a connection that had not been registered: " - + id.getParentId().getParentId()); - } - return cs; - } - - public AMQTransportConnectionState lookupConnectionState(SessionId id) - { - AMQTransportConnectionState cs = lookupConnectionState(id - .getConnectionId()); - if (cs == null) - { - throw new IllegalStateException( - "Cannot lookup a session from a connection that had not been registered: " - + id.getParentId()); - } - return cs; - } - - public AMQTransportConnectionState lookupConnectionState( - ConnectionId connectionId) - { - AMQTransportConnectionState cs = connectionStates.get(connectionId); - if (cs == null) - { - throw new IllegalStateException( - "Cannot lookup a connection that had not been registered: " - + connectionId); - } - return cs; - } - - public boolean doesHandleMultipleConnectionStates() - { - return true; - } - - public boolean isEmpty() - { - return connectionStates.isEmpty(); - } - - public void clear() - { - connectionStates.clear(); - - } - - public void intialize(AMQTransportConnectionStateRegister other) - { - connectionStates.clear(); - connectionStates.putAll(other.mapStates()); - - } - - public Map mapStates() - { - HashMap map = new HashMap( - connectionStates); - return map; - } - -} diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index ef64b6c230..2c24903339 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.Message; @@ -69,12 +68,10 @@ public class AMQSession implements SessionCallback private SessionInfo sessInfo; private ActiveMQServer server; private OpenWireConnection connection; - //native id -> consumer - private Map consumers = new ConcurrentHashMap(); - //amq id -> native id - private Map consumerIdMap = new HashMap(); - private Map producers = new HashMap(); + private Map consumers = new ConcurrentHashMap<>(); + + private Map producers = new HashMap<>(); private AtomicBoolean started = new AtomicBoolean(false); @@ -82,15 +79,18 @@ public class AMQSession implements SessionCallback private boolean isTx; + private final ScheduledExecutorService scheduledPool; + private OpenWireProtocolManager manager; public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo, - ActiveMQServer server, OpenWireConnection connection, OpenWireProtocolManager manager) + ActiveMQServer server, OpenWireConnection connection, ScheduledExecutorService scheduledPool, OpenWireProtocolManager manager) { this.connInfo = connInfo; this.sessInfo = sessInfo; this.server = server; this.connection = connection; + this.scheduledPool = scheduledPool; this.manager = manager; } @@ -123,7 +123,7 @@ public class AMQSession implements SessionCallback } - public void createConsumer(ConsumerInfo info) throws Exception + public void createConsumer(ConsumerInfo info, AMQSession amqSession) throws Exception { //check destination ActiveMQDestination dest = info.getDestination(); @@ -136,7 +136,7 @@ public class AMQSession implements SessionCallback { dests = new ActiveMQDestination[] {dest}; } - + Map consumerMap = new HashMap<>(); for (ActiveMQDestination d : dests) { if (d.isQueue()) @@ -144,11 +144,13 @@ public class AMQSession implements SessionCallback SimpleString queueName = OpenWireUtil.toCoreAddress(d); getCoreServer().getJMSQueueCreator().create(queueName); } - AMQConsumer consumer = new AMQConsumer(this, d, info); + AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool); consumer.init(); + consumerMap.put(d, consumer); consumers.put(consumer.getNativeId(), consumer); - this.consumerIdMap.put(info.getConsumerId().getValue(), consumer.getNativeId()); } + connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap); + coreSession.start(); started.set(true); } @@ -214,7 +216,8 @@ public class AMQSession implements SessionCallback @Override public boolean hasCredits(ServerConsumer consumerID) { - return true; + AMQConsumer amqConsumer = consumers.get(consumerID.getID()); + return amqConsumer.hasCredits(); } @Override @@ -234,19 +237,12 @@ public class AMQSession implements SessionCallback return this.server; } - public void removeConsumer(ConsumerInfo info) throws Exception + public void removeConsumer(long consumerId) throws Exception { - long consumerId = info.getConsumerId().getValue(); - long nativeId = this.consumerIdMap.remove(consumerId); - if (this.txId != null || this.isTx) - { - ((AMQServerSession)coreSession).amqCloseConsumer(nativeId, false); - } - else - { - ((AMQServerSession)coreSession).amqCloseConsumer(nativeId, true); - } - AMQConsumer consumer = consumers.remove(nativeId); + boolean failed = !(this.txId != null || this.isTx); + + coreSession.amqCloseConsumer(consumerId, failed); + consumers.remove(consumerId); } public void createProducer(ProducerInfo info) throws Exception @@ -331,16 +327,13 @@ public class AMQSession implements SessionCallback return this.connection.getMarshaller(); } - public void acknowledge(MessageAck ack) throws Exception + public void acknowledge(MessageAck ack, AMQConsumer consumer) throws Exception { TransactionId tid = ack.getTransactionId(); if (tid != null) { this.resetSessionTx(ack.getTransactionId()); } - ConsumerId consumerId = ack.getConsumerId(); - long nativeConsumerId = consumerIdMap.get(consumerId.getValue()); - AMQConsumer consumer = consumers.get(nativeConsumerId); consumer.acknowledge(ack); if (tid == null && ack.getAckType() == MessageAck.STANDARD_ACK_TYPE) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java new file mode 100644 index 0000000000..9a1d4f8179 --- /dev/null +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.openwire.amq; + +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessagePull; + +public class AMQSingleConsumerBrokerExchange extends AMQConsumerBrokerExchange +{ + private AMQConsumer consumer; + + public AMQSingleConsumerBrokerExchange(AMQSession amqSession, AMQConsumer consumer) + { + super(amqSession); + this.consumer = consumer; + } + + public void processMessagePull(MessagePull messagePull) throws Exception + { + consumer.processMessagePull(messagePull); + } + + public void removeConsumer() throws Exception + { + consumer.removeConsumer(); + } + + public void acknowledge(MessageAck ack) throws Exception + { + amqSession.acknowledge(ack, consumer); + } +} diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java deleted file mode 100644 index 93d1591a72..0000000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.SessionId; - -/** - * We just copy this structure from amq, but what's the purpose - * and can it be removed ? - */ -public class AMQSingleTransportConnectionStateRegister implements - AMQTransportConnectionStateRegister -{ - - private AMQTransportConnectionState connectionState; - private ConnectionId connectionId; - - public AMQTransportConnectionState registerConnectionState( - ConnectionId connectionId, AMQTransportConnectionState state) - { - AMQTransportConnectionState rc = connectionState; - connectionState = state; - this.connectionId = connectionId; - return rc; - } - - public synchronized AMQTransportConnectionState unregisterConnectionState( - ConnectionId connectionId) - { - AMQTransportConnectionState rc = null; - - if (connectionId != null && connectionState != null - && this.connectionId != null) - { - if (this.connectionId.equals(connectionId)) - { - rc = connectionState; - connectionState = null; - connectionId = null; - } - } - return rc; - } - - public synchronized List listConnectionStates() - { - List rc = new ArrayList(); - if (connectionState != null) - { - rc.add(connectionState); - } - return rc; - } - - public synchronized AMQTransportConnectionState lookupConnectionState( - String connectionId) - { - AMQTransportConnectionState cs = connectionState; - if (cs == null) - { - throw new IllegalStateException( - "Cannot lookup a connectionId for a connection that had not been registered: " - + connectionId); - } - return cs; - } - - public synchronized AMQTransportConnectionState lookupConnectionState( - ConsumerId id) - { - AMQTransportConnectionState cs = connectionState; - if (cs == null) - { - throw new IllegalStateException( - "Cannot lookup a consumer from a connection that had not been registered: " - + id.getParentId().getParentId()); - } - return cs; - } - - public synchronized AMQTransportConnectionState lookupConnectionState( - ProducerId id) - { - AMQTransportConnectionState cs = connectionState; - if (cs == null) - { - throw new IllegalStateException( - "Cannot lookup a producer from a connection that had not been registered: " - + id.getParentId().getParentId()); - } - return cs; - } - - public synchronized AMQTransportConnectionState lookupConnectionState( - SessionId id) - { - AMQTransportConnectionState cs = connectionState; - if (cs == null) - { - throw new IllegalStateException( - "Cannot lookup a session from a connection that had not been registered: " - + id.getParentId()); - } - return cs; - } - - public synchronized AMQTransportConnectionState lookupConnectionState( - ConnectionId connectionId) - { - AMQTransportConnectionState cs = connectionState; - return cs; - } - - public synchronized boolean doesHandleMultipleConnectionStates() - { - return false; - } - - public synchronized boolean isEmpty() - { - return connectionState == null; - } - - public void intialize(AMQTransportConnectionStateRegister other) - { - - if (other.isEmpty()) - { - clear(); - } - else - { - Map map = other.mapStates(); - Iterator i = map.entrySet().iterator(); - Map.Entry entry = (Entry) i - .next(); - connectionId = entry.getKey(); - connectionState = entry.getValue(); - } - - } - - public Map mapStates() - { - Map map = new HashMap(); - if (!isEmpty()) - { - map.put(connectionId, connectionState); - } - return map; - } - - public void clear() - { - connectionState = null; - connectionId = null; - - } - -} diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionState.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionState.java deleted file mode 100644 index a8e5973807..0000000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionState.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.state.ConnectionState; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; - -public class AMQTransportConnectionState extends ConnectionState -{ - - private AMQConnectionContext context; - private OpenWireConnection connection; - private AtomicInteger referenceCounter = new AtomicInteger(); - private final Object connectionMutex = new Object(); - - public AMQTransportConnectionState(ConnectionInfo info, - OpenWireConnection transportConnection) - { - super(info); - connection = transportConnection; - } - - public AMQConnectionContext getContext() - { - return context; - } - - public OpenWireConnection getConnection() - { - return connection; - } - - public void setContext(AMQConnectionContext context) - { - this.context = context; - } - - public void setConnection(OpenWireConnection connection) - { - this.connection = connection; - } - - public int incrementReference() - { - return referenceCounter.incrementAndGet(); - } - - public int decrementReference() - { - return referenceCounter.decrementAndGet(); - } - - public AtomicInteger getReferenceCounter() - { - return referenceCounter; - } - - public void setReferenceCounter(AtomicInteger referenceCounter) - { - this.referenceCounter = referenceCounter; - } - - public Object getConnectionMutex() - { - return connectionMutex; - } - -} diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java deleted file mode 100644 index 642f05bf21..0000000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import java.util.List; -import java.util.Map; - -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.SessionId; - -/** - * What's the purpose of this? - */ -public interface AMQTransportConnectionStateRegister -{ - AMQTransportConnectionState registerConnectionState(ConnectionId connectionId, - AMQTransportConnectionState state); - - AMQTransportConnectionState unregisterConnectionState(ConnectionId connectionId); - - List listConnectionStates(); - - Map mapStates(); - - AMQTransportConnectionState lookupConnectionState(String connectionId); - - AMQTransportConnectionState lookupConnectionState(ConsumerId id); - - AMQTransportConnectionState lookupConnectionState(ProducerId id); - - AMQTransportConnectionState lookupConnectionState(SessionId id); - - AMQTransportConnectionState lookupConnectionState(ConnectionId connectionId); - - boolean isEmpty(); - - boolean doesHandleMultipleConnectionStates(); - - void intialize(AMQTransportConnectionStateRegister other); - - void clear(); - -} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 49a39261bb..118b7dffb6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -788,6 +788,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener } } + + public void promptDelivery(long consumerID) + { + ServerConsumer consumer = consumers.get(consumerID); + + // this would be possible if the server consumer was closed by pings/pongs.. etc + if (consumer != null) + { + consumer.promptDelivery(); + } + } + public void acknowledge(final long consumerID, final long messageID) throws Exception { ServerConsumer consumer = consumers.get(consumerID);