This closes #769

This commit is contained in:
Clebert Suconic 2016-09-15 11:17:03 -04:00
commit 646a891988
13 changed files with 258 additions and 25 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.proton.plug;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@ -23,19 +25,35 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
import org.proton.plug.handler.ExtCapability;
import org.proton.plug.sasl.AnonymousServerSASL;
import org.proton.plug.sasl.PlainSASLResult;
import static org.proton.plug.AmqpSupport.CONTAINER_ID;
import static org.proton.plug.AmqpSupport.INVALID_FIELD;
import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED;
public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback {
private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
private final ProtonProtocolManager manager;
@ -49,6 +67,8 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
private final Executor closeExecutor;
private ServerSession internalSession;
public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager,
Connection connection,
Executor closeExecutor) {
@ -85,8 +105,43 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
return supportsAnonymous;
}
@Override
public void init() throws Exception {
//This internal core session is used to represent the connection
//in core server. It is used to identify unique clientIDs.
//Note the Qpid-JMS client does create a initial session
//for each connection. However is comes in as a Begin
//After Open. This makes it unusable for this purpose
//as we need to decide the uniqueness in response to
//Open, and the checking Uniqueness and adding the unique
//client-id to server need to be atomic.
if (internalSession == null) {
SASLResult saslResult = amqpConnection.getSASLResult();
String user = null;
String passcode = null;
if (saslResult != null) {
user = saslResult.getUser();
if (saslResult instanceof PlainSASLResult) {
passcode = ((PlainSASLResult) saslResult).getPassword();
}
}
internalSession = manager.getServer().createSession(createInternalSessionName(), user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonConnectionDelegate, // RemotingConnection remotingConnection,
false,
false,
false,
false,
null, (SessionCallback) createSessionCallback(this.amqpConnection), true);
}
}
@Override
public void close() {
try {
internalSession.close(false);
}
catch (Exception e) {
log.error("error closing internal session", e);
}
connection.close();
amqpConnection.close();
}
@ -151,4 +206,28 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
public void sendSASLSupported() {
connection.write(ActiveMQBuffers.wrappedBuffer(new byte[]{'A', 'M', 'Q', 'P', 3, 1, 0, 0}));
}
@Override
public boolean validateConnection(org.apache.qpid.proton.engine.Connection connection, SASLResult saslResult) {
String remote = connection.getRemoteContainer();
if (ExtCapability.needUniqueConnection(connection)) {
if (!internalSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, remote)) {
//https://issues.apache.org/jira/browse/ARTEMIS-728
Map<Symbol, Object> connProp = new HashMap<>();
connProp.put(CONNECTION_OPEN_FAILED, "true");
connection.setProperties(connProp);
connection.getCondition().setCondition(AmqpError.INVALID_FIELD);
Map<Symbol, Symbol> info = new HashMap<>();
info.put(INVALID_FIELD, CONTAINER_ID);
connection.getCondition().setInfo(info);
return false;
}
}
return true;
}
private String createInternalSessionName() {
return "amqp:" + UUIDGenerator.getInstance().generateStringUUID();
}
}

View File

@ -17,9 +17,12 @@
package org.proton.plug;
import io.netty.buffer.ByteBuf;
import org.apache.qpid.proton.engine.Connection;
public interface AMQPConnectionCallback {
void init() throws Exception;
void close();
/**
@ -41,4 +44,6 @@ public interface AMQPConnectionCallback {
boolean isSupportsAnonymous();
void sendSASLSupported();
boolean validateConnection(Connection connection, SASLResult saslResult);
}

View File

@ -60,6 +60,7 @@ public class AmqpSupport {
// Lifetime policy symbols
public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
/**
* Search for a given Symbol in a given array of Symbol object.
*

View File

@ -188,6 +188,13 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
return null;
}
protected boolean validateConnection(Connection connection) {
return true;
}
protected void initInternal() throws Exception {
}
// This listener will perform a bunch of things here
class LocalListener extends DefaultEventHandler {
@ -213,13 +220,25 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
@Override
public void onRemoteOpen(Connection connection) throws Exception {
synchronized (getLock()) {
connection.setContext(AbstractConnectionContext.this);
connection.setContainer(containerId);
connection.setProperties(connectionProperties);
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
connection.open();
try {
initInternal();
}
catch (Exception e) {
log.error("Error init connection", e);
}
if (!validateConnection(connection)) {
connection.close();
}
else {
connection.setContext(AbstractConnectionContext.this);
connection.setContainer(containerId);
connection.setProperties(connectionProperties);
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
connection.open();
}
}
initialise();
/*
* This can be null which is in effect an empty map, also we really dont need to check this for in bound connections
* but its here in case we add support for outbound connections.

View File

@ -39,7 +39,7 @@ public class ProtonInitializable {
public void initialise() throws Exception {
if (!initialized) {
initialized = false;
initialized = true;
try {
if (afterInit != null) {
afterInit.run();

View File

@ -16,10 +16,9 @@
*/
package org.proton.plug.context.server;
import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
@ -30,6 +29,7 @@ import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.context.AbstractConnectionContext;
import org.proton.plug.context.AbstractProtonSessionContext;
import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.handler.ExtCapability;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@ -58,6 +58,16 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
return protonSession;
}
@Override
protected boolean validateConnection(Connection connection) {
return connectionCallback.validateConnection(connection, handler.getSASLResult());
}
@Override
protected void initInternal() throws Exception {
connectionCallback.init();
}
@Override
protected void remoteLinkOpened(Link link) throws Exception {
@ -84,6 +94,6 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
@Override
public Symbol[] getConnectionCapabilitiesOffered() {
return new Symbol[]{DELAYED_DELIVERY};
return ExtCapability.getCapabilities();
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.proton.plug.handler;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Connection;
import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
import static org.proton.plug.AmqpSupport.SOLE_CONNECTION_CAPABILITY;
public class ExtCapability {
public static final Symbol[] capabilities = new Symbol[] {
SOLE_CONNECTION_CAPABILITY, DELAYED_DELIVERY
};
public static Symbol[] getCapabilities() {
return capabilities;
}
public static boolean needUniqueConnection(Connection connection) {
Symbol[] extCapabilities = connection.getRemoteDesiredCapabilities();
if (extCapabilities != null) {
for (Symbol sym : extCapabilities) {
if (sym.compareTo(SOLE_CONNECTION_CAPABILITY) == 0) {
return true;
}
}
}
return false;
}
}

View File

@ -386,5 +386,4 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
}
}
}

View File

@ -28,6 +28,7 @@ import org.junit.Test;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.handler.EventHandler;
@ -71,6 +72,10 @@ public class AbstractConnectionContextTest {
private class TestConnectionCallback implements AMQPConnectionCallback {
@Override
public void init() throws Exception {
}
@Override
public void close() {
@ -110,5 +115,10 @@ public class AbstractConnectionContextTest {
public void sendSASLSupported() {
}
@Override
public boolean validateConnection(Connection connection, SASLResult saslResult) {
return true;
}
}
}

View File

@ -21,10 +21,12 @@ import java.util.concurrent.Executors;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.qpid.proton.engine.Connection;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
import org.proton.plug.context.server.ProtonServerConnectionContext;
import org.proton.plug.sasl.AnonymousServerSASL;
@ -58,6 +60,10 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
});
}
@Override
public void init() throws Exception {
}
@Override
public void close() {
mainExecutor.shutdown();
@ -78,6 +84,11 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
}
@Override
public boolean validateConnection(Connection connection, SASLResult saslResult) {
return true;
}
@Override
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
if (log.isTraceEnabled()) {
@ -125,6 +136,10 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
class ReturnSPI implements AMQPConnectionCallback {
@Override
public void init() throws Exception {
}
@Override
public void close() {
@ -145,6 +160,11 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
}
@Override
public boolean validateConnection(Connection connection, SASLResult saslResult) {
return true;
}
@Override
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {

View File

@ -22,10 +22,12 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.qpid.proton.engine.Connection;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
import org.proton.plug.sasl.AnonymousServerSASL;
import org.proton.plug.sasl.ServerSASLPlain;
@ -52,6 +54,10 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
return connection;
}
@Override
public void init() throws Exception {
}
@Override
public void close() {
@ -72,6 +78,11 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
}
@Override
public boolean validateConnection(Connection connection, SASLResult saslResult) {
return true;
}
final ReusableLatch latch = new ReusableLatch(0);
@Override

View File

@ -25,10 +25,12 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.qpid.proton.engine.Connection;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
import org.proton.plug.sasl.AnonymousServerSASL;
import org.proton.plug.sasl.ServerSASLPlain;
@ -48,6 +50,10 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
@Override
public void init() throws Exception {
}
@Override
public void close() {
executorService.shutdown();
@ -80,6 +86,11 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
}
@Override
public boolean validateConnection(Connection connection, SASLResult saslResult) {
return true;
}
@Override
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
final int bufferSize = bytes.writerIndex();

View File

@ -27,6 +27,7 @@ import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
@ -1533,6 +1534,35 @@ public class ProtonTest extends ProtonTestBase {
connection.close();
}
@Test
public void testClientID() throws Exception {
Connection testConn1 = createConnection(false);
Connection testConn2 = createConnection(false);
try {
testConn1.setClientID("client-id1");
try {
testConn1.setClientID("client-id2");
fail("didn't get expected exception");
}
catch (javax.jms.IllegalStateException e) {
//expected
}
try {
testConn2.setClientID("client-id1");
fail("didn't get expected exception");
}
catch (InvalidClientIDException e) {
//expected
}
}
finally {
testConn1.close();
testConn2.close();
}
}
private javax.jms.Queue createQueue(String address) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
@ -1543,29 +1573,19 @@ public class ProtonTest extends ProtonTestBase {
}
}
private javax.jms.Connection createConnection() throws JMSException {
private Connection createConnection() throws JMSException {
return this.createConnection(true);
}
private javax.jms.Connection createConnection(boolean isStart) throws JMSException {
Connection connection;
if (protocol == 3) {
factory = new JmsConnectionFactory(amqpConnectionUri);
connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
}
else if (protocol == 0) {
factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
}
else {
TransportConfiguration transport;
@ -1579,6 +1599,8 @@ public class ProtonTest extends ProtonTestBase {
}
connection = factory.createConnection(userName, password);
}
if (isStart) {
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {