diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java index 3567307888..a2563a17dc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java @@ -104,7 +104,7 @@ public class ProtonProtocolManager implements ProtocolManager, Noti @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) { - ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor()); + ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor(), server); long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL; if (server.getConfiguration().getConnectionTTLOverride() != -1) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java index 707b312892..ea66b01f59 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java @@ -16,26 +16,29 @@ */ package org.apache.activemq.artemis.core.protocol.proton.plug; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.client.ActiveMQClient; -import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection; import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager; import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL; -import org.apache.activemq.artemis.core.server.ServerSession; -import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; +import org.apache.activemq.artemis.core.remoting.CloseListener; +import org.apache.activemq.artemis.core.remoting.FailureListener; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.ReusableLatch; -import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.jboss.logging.Logger; @@ -46,13 +49,14 @@ import org.proton.plug.SASLResult; import org.proton.plug.ServerSASL; import org.proton.plug.handler.ExtCapability; import org.proton.plug.sasl.AnonymousServerSASL; -import org.proton.plug.sasl.PlainSASLResult; import static org.proton.plug.AmqpSupport.CONTAINER_ID; import static org.proton.plug.AmqpSupport.INVALID_FIELD; import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED; -public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback { +public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback, FailureListener, CloseListener { + private static final List connectedContainers = Collections.synchronizedList(new ArrayList()); + private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class); private final ProtonProtocolManager manager; @@ -67,14 +71,20 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback private final Executor closeExecutor; - private ServerSession internalSession; + private String remoteContainerId; + + private AtomicBoolean registeredConnectionId = new AtomicBoolean(false); + + private ActiveMQServer server; public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager, Connection connection, - Executor closeExecutor) { + Executor closeExecutor, + ActiveMQServer server) { this.manager = manager; this.connection = connection; this.closeExecutor = closeExecutor; + this.server = server; } @Override @@ -105,42 +115,10 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback return supportsAnonymous; } - @Override - public void init() throws Exception { - //This internal core session is used to represent the connection - //in core server. It is used to identify unique clientIDs. - //Note the Qpid-JMS client does create a initial session - //for each connection. However is comes in as a Begin - //After Open. This makes it unusable for this purpose - //as we need to decide the uniqueness in response to - //Open, and the checking Uniqueness and adding the unique - //client-id to server need to be atomic. - if (internalSession == null) { - SASLResult saslResult = amqpConnection.getSASLResult(); - String user = null; - String passcode = null; - if (saslResult != null) { - user = saslResult.getUser(); - if (saslResult instanceof PlainSASLResult) { - passcode = ((PlainSASLResult) saslResult).getPassword(); - } - } - internalSession = manager.getServer().createSession(createInternalSessionName(), user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonConnectionDelegate, // RemotingConnection remotingConnection, - false, - false, - false, - false, - null, (SessionCallback) createSessionCallback(this.amqpConnection), true); - } - } - @Override public void close() { - try { - internalSession.close(false); - } - catch (Exception e) { - log.error("error closing internal session", e); + if (registeredConnectionId.getAndSet(false)) { + server.removeClientConnection(remoteContainerId); } connection.close(); amqpConnection.close(); @@ -170,6 +148,7 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback } public void setProtonConnectionDelegate(ActiveMQProtonRemotingConnection protonConnectionDelegate) { + this.protonConnectionDelegate = protonConnectionDelegate; } @@ -209,25 +188,35 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback @Override public boolean validateConnection(org.apache.qpid.proton.engine.Connection connection, SASLResult saslResult) { - String remote = connection.getRemoteContainer(); - - if (ExtCapability.needUniqueConnection(connection)) { - if (!internalSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, remote)) { - //https://issues.apache.org/jira/browse/ARTEMIS-728 - Map connProp = new HashMap<>(); - connProp.put(CONNECTION_OPEN_FAILED, "true"); - connection.setProperties(connProp); - connection.getCondition().setCondition(AmqpError.INVALID_FIELD); - Map info = new HashMap<>(); - info.put(INVALID_FIELD, CONTAINER_ID); - connection.getCondition().setInfo(info); - return false; - } + remoteContainerId = connection.getRemoteContainer(); + boolean idOK = server.addClientConnection(remoteContainerId, ExtCapability.needUniqueConnection(connection)); + if (!idOK) { + //https://issues.apache.org/jira/browse/ARTEMIS-728 + Map connProp = new HashMap<>(); + connProp.put(CONNECTION_OPEN_FAILED, "true"); + connection.setProperties(connProp); + connection.getCondition().setCondition(AmqpError.INVALID_FIELD); + Map info = new HashMap<>(); + info.put(INVALID_FIELD, CONTAINER_ID); + connection.getCondition().setInfo(info); + return false; } + registeredConnectionId.set(true); return true; } - private String createInternalSessionName() { - return "amqp:" + UUIDGenerator.getInstance().generateStringUUID(); + @Override + public void connectionClosed() { + close(); + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver) { + close(); + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) { + close(); } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java index df14b0f929..15a3246fec 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java @@ -21,8 +21,6 @@ import org.apache.qpid.proton.engine.Connection; public interface AMQPConnectionCallback { - void init() throws Exception; - void close(); /** diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java index efaaed4ec0..3386732b71 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java @@ -63,11 +63,6 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp return connectionCallback.validateConnection(connection, handler.getSASLResult()); } - @Override - protected void initInternal() throws Exception { - connectionCallback.init(); - } - @Override protected void remoteLinkOpened(Link link) throws Exception { diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java index 91af8f5106..da7b61734f 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java @@ -72,10 +72,6 @@ public class AbstractConnectionContextTest { private class TestConnectionCallback implements AMQPConnectionCallback { - @Override - public void init() throws Exception { - } - @Override public void close() { diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java index bf83f8a71b..5de6e9d76d 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java @@ -37,6 +37,8 @@ import org.proton.plug.util.ByteUtil; public class ProtonINVMSPI implements AMQPConnectionCallback { private static final Logger log = Logger.getLogger(ProtonINVMSPI.class); + + AMQPConnectionContext returningConnection; ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null); @@ -60,10 +62,6 @@ public class ProtonINVMSPI implements AMQPConnectionCallback { }); } - @Override - public void init() throws Exception { - } - @Override public void close() { mainExecutor.shutdown(); @@ -136,10 +134,6 @@ public class ProtonINVMSPI implements AMQPConnectionCallback { class ReturnSPI implements AMQPConnectionCallback { - @Override - public void init() throws Exception { - } - @Override public void close() { diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java index fbdee59f03..be1571cc66 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java @@ -54,10 +54,6 @@ public class AMQPClientSPI implements AMQPConnectionCallback { return connection; } - @Override - public void init() throws Exception { - } - @Override public void close() { diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java index 055b29deff..1b9c9195ae 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java @@ -50,10 +50,6 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback { ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); - @Override - public void init() throws Exception { - } - @Override public void close() { executorService.shutdown(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 0842c0d824..ac653359f8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -397,4 +397,8 @@ public interface ActiveMQServer extends ActiveMQComponent { void setMBeanServer(MBeanServer mBeanServer); void addExternalComponent(ActiveMQComponent externalComponent); + + boolean addClientConnection(String clientId, boolean unique); + + void removeClientConnection(String clientId); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index f5b9f26c49..7e32f43284 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -43,6 +43,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.Pair; @@ -309,6 +310,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { private Date startDate; private final List externalComponents = new ArrayList<>(); + + private final Map connectedClientIds = new ConcurrentHashMap(); + // Constructors // --------------------------------------------------------------------------------- @@ -2396,6 +2400,25 @@ public class ActiveMQServerImpl implements ActiveMQServer { return new Date().getTime() - startDate.getTime(); } + public boolean addClientConnection(String clientId, boolean unique) { + final AtomicInteger i = connectedClientIds.putIfAbsent(clientId, new AtomicInteger(1)); + if (i != null) { + if (unique && i.get() != 0) { + return false; + } + else if (i.incrementAndGet() > 0) { + connectedClientIds.put(clientId, i); + } + } + return true; + } + + public void removeClientConnection(String clientId) { + AtomicInteger i = connectedClientIds.get(clientId); + if (i != null && i.decrementAndGet() == 0) { + connectedClientIds.remove(clientId); + } + } private final class ActivationThread extends Thread { final Runnable runnable; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index 245c6b9275..193b46b9be 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -1561,6 +1561,16 @@ public class ProtonTest extends ProtonTestBase { testConn2.close(); } + try { + testConn1 = createConnection(false); + testConn2 = createConnection(false); + testConn1.setClientID("client-id1"); + testConn2.setClientID("client-id2"); + } + finally { + testConn1.close(); + testConn2.close(); + } } private javax.jms.Queue createQueue(String address) throws Exception {