ARTEMIS-814: Support specifying connection properties

This commit is contained in:
Ulf Lilleengen 2016-11-07 12:30:39 +01:00
parent 3ead28f587
commit 00340c86e0
4 changed files with 13 additions and 6 deletions

View File

@ -24,7 +24,9 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.amqp.Symbol;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
@ -35,11 +37,13 @@ public class AMQPClientConnectionFactory {
private final ActiveMQServer server;
private final String containerId;
private final Map<Symbol, Object> connectionProperties;
private final int ttl;
public AMQPClientConnectionFactory(ActiveMQServer server, String containerId, int ttl) {
public AMQPClientConnectionFactory(ActiveMQServer server, String containerId, Map<Symbol, Object> connectionProperties, int ttl) {
this.server = server;
this.containerId = containerId;
this.connectionProperties = connectionProperties;
this.ttl = ttl;
}
@ -55,7 +59,7 @@ public class AMQPClientConnectionFactory {
connectionCallback.setProtonConnectionDelegate(delegate);
amqpConnection.open();
amqpConnection.open(connectionProperties);
return delegate;
}
}

View File

@ -205,8 +205,8 @@ public class AMQPConnectionContext extends ProtonInitializable {
return ExtCapability.getCapabilities();
}
public void open() {
handler.open(containerId);
public void open(Map<Symbol, Object> connectionProperties) {
handler.open(containerId, connectionProperties);
}
public String getContainer() {

View File

@ -30,6 +30,7 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
@ -358,9 +359,10 @@ public class ProtonHandler extends ProtonInitializable {
}
public void open(String containerId) {
public void open(String containerId, Map<Symbol, Object> connectionProperties) {
this.transport.open();
this.connection.setContainer(containerId);
this.connection.setProperties(connectionProperties);
this.connection.open();
flush();
}

View File

@ -28,6 +28,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.Map;
@ -977,7 +978,7 @@ public class ProtonTest extends ProtonTestBase {
final Map<String, Object> config = new LinkedHashMap<>();
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
config.put(TransportConstants.PORT_PROP_NAME, "5673");
ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, server.getConfiguration().getName(), 5000), Optional.empty());
ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.empty());
ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
connector.start();