This closes #779

This commit is contained in:
Clebert Suconic 2016-09-16 11:07:31 -04:00
commit 1f5f45ca9b
11 changed files with 88 additions and 87 deletions

View File

@ -104,7 +104,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) {
ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor());
ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor(), server);
long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
if (server.getConfiguration().getConnectionTTLOverride() != -1) {

View File

@ -16,26 +16,29 @@
*/
package org.apache.activemq.artemis.core.protocol.proton.plug;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.jboss.logging.Logger;
@ -46,13 +49,14 @@ import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
import org.proton.plug.handler.ExtCapability;
import org.proton.plug.sasl.AnonymousServerSASL;
import org.proton.plug.sasl.PlainSASLResult;
import static org.proton.plug.AmqpSupport.CONTAINER_ID;
import static org.proton.plug.AmqpSupport.INVALID_FIELD;
import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED;
public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback {
public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback, FailureListener, CloseListener {
private static final List<String> connectedContainers = Collections.synchronizedList(new ArrayList());
private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
private final ProtonProtocolManager manager;
@ -67,14 +71,20 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
private final Executor closeExecutor;
private ServerSession internalSession;
private String remoteContainerId;
private AtomicBoolean registeredConnectionId = new AtomicBoolean(false);
private ActiveMQServer server;
public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager,
Connection connection,
Executor closeExecutor) {
Executor closeExecutor,
ActiveMQServer server) {
this.manager = manager;
this.connection = connection;
this.closeExecutor = closeExecutor;
this.server = server;
}
@Override
@ -105,42 +115,10 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
return supportsAnonymous;
}
@Override
public void init() throws Exception {
//This internal core session is used to represent the connection
//in core server. It is used to identify unique clientIDs.
//Note the Qpid-JMS client does create a initial session
//for each connection. However is comes in as a Begin
//After Open. This makes it unusable for this purpose
//as we need to decide the uniqueness in response to
//Open, and the checking Uniqueness and adding the unique
//client-id to server need to be atomic.
if (internalSession == null) {
SASLResult saslResult = amqpConnection.getSASLResult();
String user = null;
String passcode = null;
if (saslResult != null) {
user = saslResult.getUser();
if (saslResult instanceof PlainSASLResult) {
passcode = ((PlainSASLResult) saslResult).getPassword();
}
}
internalSession = manager.getServer().createSession(createInternalSessionName(), user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonConnectionDelegate, // RemotingConnection remotingConnection,
false,
false,
false,
false,
null, (SessionCallback) createSessionCallback(this.amqpConnection), true);
}
}
@Override
public void close() {
try {
internalSession.close(false);
}
catch (Exception e) {
log.error("error closing internal session", e);
if (registeredConnectionId.getAndSet(false)) {
server.removeClientConnection(remoteContainerId);
}
connection.close();
amqpConnection.close();
@ -170,6 +148,7 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
}
public void setProtonConnectionDelegate(ActiveMQProtonRemotingConnection protonConnectionDelegate) {
this.protonConnectionDelegate = protonConnectionDelegate;
}
@ -209,25 +188,35 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
@Override
public boolean validateConnection(org.apache.qpid.proton.engine.Connection connection, SASLResult saslResult) {
String remote = connection.getRemoteContainer();
if (ExtCapability.needUniqueConnection(connection)) {
if (!internalSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, remote)) {
//https://issues.apache.org/jira/browse/ARTEMIS-728
Map<Symbol, Object> connProp = new HashMap<>();
connProp.put(CONNECTION_OPEN_FAILED, "true");
connection.setProperties(connProp);
connection.getCondition().setCondition(AmqpError.INVALID_FIELD);
Map<Symbol, Symbol> info = new HashMap<>();
info.put(INVALID_FIELD, CONTAINER_ID);
connection.getCondition().setInfo(info);
return false;
}
remoteContainerId = connection.getRemoteContainer();
boolean idOK = server.addClientConnection(remoteContainerId, ExtCapability.needUniqueConnection(connection));
if (!idOK) {
//https://issues.apache.org/jira/browse/ARTEMIS-728
Map<Symbol, Object> connProp = new HashMap<>();
connProp.put(CONNECTION_OPEN_FAILED, "true");
connection.setProperties(connProp);
connection.getCondition().setCondition(AmqpError.INVALID_FIELD);
Map<Symbol, Symbol> info = new HashMap<>();
info.put(INVALID_FIELD, CONTAINER_ID);
connection.getCondition().setInfo(info);
return false;
}
registeredConnectionId.set(true);
return true;
}
private String createInternalSessionName() {
return "amqp:" + UUIDGenerator.getInstance().generateStringUUID();
@Override
public void connectionClosed() {
close();
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
close();
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
close();
}
}

View File

@ -21,8 +21,6 @@ import org.apache.qpid.proton.engine.Connection;
public interface AMQPConnectionCallback {
void init() throws Exception;
void close();
/**

View File

@ -63,11 +63,6 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
return connectionCallback.validateConnection(connection, handler.getSASLResult());
}
@Override
protected void initInternal() throws Exception {
connectionCallback.init();
}
@Override
protected void remoteLinkOpened(Link link) throws Exception {

View File

@ -72,10 +72,6 @@ public class AbstractConnectionContextTest {
private class TestConnectionCallback implements AMQPConnectionCallback {
@Override
public void init() throws Exception {
}
@Override
public void close() {

View File

@ -37,6 +37,8 @@ import org.proton.plug.util.ByteUtil;
public class ProtonINVMSPI implements AMQPConnectionCallback {
private static final Logger log = Logger.getLogger(ProtonINVMSPI.class);
AMQPConnectionContext returningConnection;
ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
@ -60,10 +62,6 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
});
}
@Override
public void init() throws Exception {
}
@Override
public void close() {
mainExecutor.shutdown();
@ -136,10 +134,6 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
class ReturnSPI implements AMQPConnectionCallback {
@Override
public void init() throws Exception {
}
@Override
public void close() {

View File

@ -54,10 +54,6 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
return connection;
}
@Override
public void init() throws Exception {
}
@Override
public void close() {

View File

@ -50,10 +50,6 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
@Override
public void init() throws Exception {
}
@Override
public void close() {
executorService.shutdown();

View File

@ -397,4 +397,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
void setMBeanServer(MBeanServer mBeanServer);
void addExternalComponent(ActiveMQComponent externalComponent);
boolean addClientConnection(String clientId, boolean unique);
void removeClientConnection(String clientId);
}

View File

@ -43,6 +43,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Pair;
@ -309,6 +310,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private Date startDate;
private final List<ActiveMQComponent> externalComponents = new ArrayList<>();
private final Map<String, AtomicInteger> connectedClientIds = new ConcurrentHashMap();
// Constructors
// ---------------------------------------------------------------------------------
@ -2396,6 +2400,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return new Date().getTime() - startDate.getTime();
}
public boolean addClientConnection(String clientId, boolean unique) {
final AtomicInteger i = connectedClientIds.putIfAbsent(clientId, new AtomicInteger(1));
if (i != null) {
if (unique && i.get() != 0) {
return false;
}
else if (i.incrementAndGet() > 0) {
connectedClientIds.put(clientId, i);
}
}
return true;
}
public void removeClientConnection(String clientId) {
AtomicInteger i = connectedClientIds.get(clientId);
if (i != null && i.decrementAndGet() == 0) {
connectedClientIds.remove(clientId);
}
}
private final class ActivationThread extends Thread {
final Runnable runnable;

View File

@ -1561,6 +1561,16 @@ public class ProtonTest extends ProtonTestBase {
testConn2.close();
}
try {
testConn1 = createConnection(false);
testConn2 = createConnection(false);
testConn1.setClientID("client-id1");
testConn2.setClientID("client-id2");
}
finally {
testConn1.close();
testConn2.close();
}
}
private javax.jms.Queue createQueue(String address) throws Exception {