Add Unique ClientID on Server

This commit is contained in:
Martyn Taylor 2016-09-15 14:15:54 +01:00
parent adeaa66a1b
commit 74742dcb66
11 changed files with 88 additions and 87 deletions

View File

@ -104,7 +104,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
@Override @Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) { public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) {
ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor()); ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor(), server);
long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL; long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
if (server.getConfiguration().getConnectionTTLOverride() != -1) { if (server.getConfiguration().getConnectionTTLOverride() != -1) {

View File

@ -16,26 +16,29 @@
*/ */
package org.apache.activemq.artemis.core.protocol.proton.plug; package org.apache.activemq.artemis.core.protocol.proton.plug;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection; import org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager; import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL; import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -46,13 +49,14 @@ import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL; import org.proton.plug.ServerSASL;
import org.proton.plug.handler.ExtCapability; import org.proton.plug.handler.ExtCapability;
import org.proton.plug.sasl.AnonymousServerSASL; import org.proton.plug.sasl.AnonymousServerSASL;
import org.proton.plug.sasl.PlainSASLResult;
import static org.proton.plug.AmqpSupport.CONTAINER_ID; import static org.proton.plug.AmqpSupport.CONTAINER_ID;
import static org.proton.plug.AmqpSupport.INVALID_FIELD; import static org.proton.plug.AmqpSupport.INVALID_FIELD;
import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED; import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED;
public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback { public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback, FailureListener, CloseListener {
private static final List<String> connectedContainers = Collections.synchronizedList(new ArrayList());
private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class); private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
private final ProtonProtocolManager manager; private final ProtonProtocolManager manager;
@ -67,14 +71,20 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
private final Executor closeExecutor; private final Executor closeExecutor;
private ServerSession internalSession; private String remoteContainerId;
private AtomicBoolean registeredConnectionId = new AtomicBoolean(false);
private ActiveMQServer server;
public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager, public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager,
Connection connection, Connection connection,
Executor closeExecutor) { Executor closeExecutor,
ActiveMQServer server) {
this.manager = manager; this.manager = manager;
this.connection = connection; this.connection = connection;
this.closeExecutor = closeExecutor; this.closeExecutor = closeExecutor;
this.server = server;
} }
@Override @Override
@ -105,42 +115,10 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
return supportsAnonymous; return supportsAnonymous;
} }
@Override
public void init() throws Exception {
//This internal core session is used to represent the connection
//in core server. It is used to identify unique clientIDs.
//Note the Qpid-JMS client does create a initial session
//for each connection. However is comes in as a Begin
//After Open. This makes it unusable for this purpose
//as we need to decide the uniqueness in response to
//Open, and the checking Uniqueness and adding the unique
//client-id to server need to be atomic.
if (internalSession == null) {
SASLResult saslResult = amqpConnection.getSASLResult();
String user = null;
String passcode = null;
if (saslResult != null) {
user = saslResult.getUser();
if (saslResult instanceof PlainSASLResult) {
passcode = ((PlainSASLResult) saslResult).getPassword();
}
}
internalSession = manager.getServer().createSession(createInternalSessionName(), user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonConnectionDelegate, // RemotingConnection remotingConnection,
false,
false,
false,
false,
null, (SessionCallback) createSessionCallback(this.amqpConnection), true);
}
}
@Override @Override
public void close() { public void close() {
try { if (registeredConnectionId.getAndSet(false)) {
internalSession.close(false); server.removeClientConnection(remoteContainerId);
}
catch (Exception e) {
log.error("error closing internal session", e);
} }
connection.close(); connection.close();
amqpConnection.close(); amqpConnection.close();
@ -170,6 +148,7 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
} }
public void setProtonConnectionDelegate(ActiveMQProtonRemotingConnection protonConnectionDelegate) { public void setProtonConnectionDelegate(ActiveMQProtonRemotingConnection protonConnectionDelegate) {
this.protonConnectionDelegate = protonConnectionDelegate; this.protonConnectionDelegate = protonConnectionDelegate;
} }
@ -209,25 +188,35 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
@Override @Override
public boolean validateConnection(org.apache.qpid.proton.engine.Connection connection, SASLResult saslResult) { public boolean validateConnection(org.apache.qpid.proton.engine.Connection connection, SASLResult saslResult) {
String remote = connection.getRemoteContainer(); remoteContainerId = connection.getRemoteContainer();
boolean idOK = server.addClientConnection(remoteContainerId, ExtCapability.needUniqueConnection(connection));
if (ExtCapability.needUniqueConnection(connection)) { if (!idOK) {
if (!internalSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, remote)) { //https://issues.apache.org/jira/browse/ARTEMIS-728
//https://issues.apache.org/jira/browse/ARTEMIS-728 Map<Symbol, Object> connProp = new HashMap<>();
Map<Symbol, Object> connProp = new HashMap<>(); connProp.put(CONNECTION_OPEN_FAILED, "true");
connProp.put(CONNECTION_OPEN_FAILED, "true"); connection.setProperties(connProp);
connection.setProperties(connProp); connection.getCondition().setCondition(AmqpError.INVALID_FIELD);
connection.getCondition().setCondition(AmqpError.INVALID_FIELD); Map<Symbol, Symbol> info = new HashMap<>();
Map<Symbol, Symbol> info = new HashMap<>(); info.put(INVALID_FIELD, CONTAINER_ID);
info.put(INVALID_FIELD, CONTAINER_ID); connection.getCondition().setInfo(info);
connection.getCondition().setInfo(info); return false;
return false;
}
} }
registeredConnectionId.set(true);
return true; return true;
} }
private String createInternalSessionName() { @Override
return "amqp:" + UUIDGenerator.getInstance().generateStringUUID(); public void connectionClosed() {
close();
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
close();
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
close();
} }
} }

View File

@ -21,8 +21,6 @@ import org.apache.qpid.proton.engine.Connection;
public interface AMQPConnectionCallback { public interface AMQPConnectionCallback {
void init() throws Exception;
void close(); void close();
/** /**

View File

@ -63,11 +63,6 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
return connectionCallback.validateConnection(connection, handler.getSASLResult()); return connectionCallback.validateConnection(connection, handler.getSASLResult());
} }
@Override
protected void initInternal() throws Exception {
connectionCallback.init();
}
@Override @Override
protected void remoteLinkOpened(Link link) throws Exception { protected void remoteLinkOpened(Link link) throws Exception {

View File

@ -72,10 +72,6 @@ public class AbstractConnectionContextTest {
private class TestConnectionCallback implements AMQPConnectionCallback { private class TestConnectionCallback implements AMQPConnectionCallback {
@Override
public void init() throws Exception {
}
@Override @Override
public void close() { public void close() {

View File

@ -37,6 +37,8 @@ import org.proton.plug.util.ByteUtil;
public class ProtonINVMSPI implements AMQPConnectionCallback { public class ProtonINVMSPI implements AMQPConnectionCallback {
private static final Logger log = Logger.getLogger(ProtonINVMSPI.class); private static final Logger log = Logger.getLogger(ProtonINVMSPI.class);
AMQPConnectionContext returningConnection; AMQPConnectionContext returningConnection;
ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null); ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
@ -60,10 +62,6 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
}); });
} }
@Override
public void init() throws Exception {
}
@Override @Override
public void close() { public void close() {
mainExecutor.shutdown(); mainExecutor.shutdown();
@ -136,10 +134,6 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
class ReturnSPI implements AMQPConnectionCallback { class ReturnSPI implements AMQPConnectionCallback {
@Override
public void init() throws Exception {
}
@Override @Override
public void close() { public void close() {

View File

@ -54,10 +54,6 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
return connection; return connection;
} }
@Override
public void init() throws Exception {
}
@Override @Override
public void close() { public void close() {

View File

@ -50,10 +50,6 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
@Override
public void init() throws Exception {
}
@Override @Override
public void close() { public void close() {
executorService.shutdown(); executorService.shutdown();

View File

@ -397,4 +397,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
void setMBeanServer(MBeanServer mBeanServer); void setMBeanServer(MBeanServer mBeanServer);
void addExternalComponent(ActiveMQComponent externalComponent); void addExternalComponent(ActiveMQComponent externalComponent);
boolean addClientConnection(String clientId, boolean unique);
void removeClientConnection(String clientId);
} }

View File

@ -43,6 +43,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
@ -309,6 +310,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private Date startDate; private Date startDate;
private final List<ActiveMQComponent> externalComponents = new ArrayList<>(); private final List<ActiveMQComponent> externalComponents = new ArrayList<>();
private final Map<String, AtomicInteger> connectedClientIds = new ConcurrentHashMap();
// Constructors // Constructors
// --------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------
@ -2396,6 +2400,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return new Date().getTime() - startDate.getTime(); return new Date().getTime() - startDate.getTime();
} }
public boolean addClientConnection(String clientId, boolean unique) {
final AtomicInteger i = connectedClientIds.putIfAbsent(clientId, new AtomicInteger(1));
if (i != null) {
if (unique && i.get() != 0) {
return false;
}
else if (i.incrementAndGet() > 0) {
connectedClientIds.put(clientId, i);
}
}
return true;
}
public void removeClientConnection(String clientId) {
AtomicInteger i = connectedClientIds.get(clientId);
if (i != null && i.decrementAndGet() == 0) {
connectedClientIds.remove(clientId);
}
}
private final class ActivationThread extends Thread { private final class ActivationThread extends Thread {
final Runnable runnable; final Runnable runnable;

View File

@ -1561,6 +1561,16 @@ public class ProtonTest extends ProtonTestBase {
testConn2.close(); testConn2.close();
} }
try {
testConn1 = createConnection(false);
testConn2 = createConnection(false);
testConn1.setClientID("client-id1");
testConn2.setClientID("client-id2");
}
finally {
testConn1.close();
testConn2.close();
}
} }
private javax.jms.Queue createQueue(String address) throws Exception { private javax.jms.Queue createQueue(String address) throws Exception {