ARTEMIS-814: Refactor client connection and allow adding custom event handlers
This commit is contained in:
parent
28645b1b83
commit
52a462d155
|
@ -0,0 +1,61 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.protocol.amqp.client;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connection factory for outgoing AMQP connections.
|
||||||
|
*/
|
||||||
|
public class AMQPClientConnectionFactory {
|
||||||
|
|
||||||
|
private final ActiveMQServer server;
|
||||||
|
private final String containerId;
|
||||||
|
private final int ttl;
|
||||||
|
|
||||||
|
public AMQPClientConnectionFactory(ActiveMQServer server, String containerId, int ttl) {
|
||||||
|
this.server = server;
|
||||||
|
this.containerId = containerId;
|
||||||
|
this.ttl = ttl;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler) {
|
||||||
|
AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server);
|
||||||
|
|
||||||
|
Executor executor = server.getExecutorFactory().getExecutor();
|
||||||
|
|
||||||
|
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, executor, server.getScheduledPool());
|
||||||
|
eventHandler.ifPresent(amqpConnection::addEventHandler);
|
||||||
|
|
||||||
|
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
|
||||||
|
|
||||||
|
connectionCallback.setProtonConnectionDelegate(delegate);
|
||||||
|
|
||||||
|
amqpConnection.open();
|
||||||
|
return delegate;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,110 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.protocol.amqp.client;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|
||||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
|
|
||||||
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
|
|
||||||
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
|
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
|
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
|
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
|
||||||
import org.jboss.logging.Logger;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Manages the lifecycle of a proton client connection.
|
|
||||||
*/
|
|
||||||
public class ProtonClientConnectionLifeCycleListener implements BaseConnectionLifeCycleListener<ProtonProtocolManager>, BufferHandler {
|
|
||||||
private final Map<Object, ConnectionEntry> connectionMap = new ConcurrentHashMap<>();
|
|
||||||
private final ActiveMQServer server;
|
|
||||||
private final int ttl;
|
|
||||||
private static final Logger log = Logger.getLogger(ProtonClientConnectionLifeCycleListener.class);
|
|
||||||
|
|
||||||
public ProtonClientConnectionLifeCycleListener(ActiveMQServer server, int ttl) {
|
|
||||||
this.server = server;
|
|
||||||
this.ttl = ttl;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) {
|
|
||||||
AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server);
|
|
||||||
|
|
||||||
String id = server.getConfiguration().getName();
|
|
||||||
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, id, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
|
|
||||||
Executor executor = server.getExecutorFactory().getExecutor();
|
|
||||||
amqpConnection.open();
|
|
||||||
|
|
||||||
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
|
|
||||||
|
|
||||||
connectionCallback.setProtonConnectionDelegate(delegate);
|
|
||||||
|
|
||||||
ConnectionEntry connectionEntry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(), ttl);
|
|
||||||
connectionMap.put(connection.getID(), connectionEntry);
|
|
||||||
log.info("Connection " + connection.getRemoteAddress() + " created");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void connectionDestroyed(Object connectionID) {
|
|
||||||
ConnectionEntry connection = connectionMap.remove(connectionID);
|
|
||||||
if (connection != null) {
|
|
||||||
log.info("Connection " + connection.connection.getRemoteAddress() + " destroyed");
|
|
||||||
connection.connection.disconnect(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void connectionException(Object connectionID, ActiveMQException me) {
|
|
||||||
ConnectionEntry connection = connectionMap.get(connectionID);
|
|
||||||
if (connection != null) {
|
|
||||||
log.info("Connection " + connection.connection.getRemoteAddress() + " exception: " + me.getMessage());
|
|
||||||
connection.connection.fail(me);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void connectionReadyForWrites(Object connectionID, boolean ready) {
|
|
||||||
ConnectionEntry connection = connectionMap.get(connectionID);
|
|
||||||
if (connection != null) {
|
|
||||||
log.info("Connection " + connection.connection.getRemoteAddress() + " ready");
|
|
||||||
connection.connection.getTransportConnection().fireReady(true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stop() {
|
|
||||||
for (ConnectionEntry entry : connectionMap.values()) {
|
|
||||||
entry.connection.disconnect(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
|
|
||||||
ConnectionEntry entry = connectionMap.get(connectionID);
|
|
||||||
if (entry != null) {
|
|
||||||
entry.connection.bufferReceived(connectionID, buffer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,97 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.protocol.amqp.client;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
|
||||||
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Manages the lifecycle of a proton client connection.
|
||||||
|
*/
|
||||||
|
public class ProtonClientConnectionManager implements BaseConnectionLifeCycleListener<ProtonProtocolManager>, BufferHandler {
|
||||||
|
private final Map<Object, ActiveMQProtonRemotingConnection> connectionMap = new ConcurrentHashMap<>();
|
||||||
|
private static final Logger log = Logger.getLogger(ProtonClientConnectionManager.class);
|
||||||
|
private final AMQPClientConnectionFactory connectionFactory;
|
||||||
|
private final Optional<EventHandler> eventHandler;
|
||||||
|
|
||||||
|
public ProtonClientConnectionManager(AMQPClientConnectionFactory connectionFactory, Optional<EventHandler> eventHandler) {
|
||||||
|
this.connectionFactory = connectionFactory;
|
||||||
|
this.eventHandler = eventHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) {
|
||||||
|
ActiveMQProtonRemotingConnection amqpConnection = connectionFactory.createConnection(protocolManager, connection, eventHandler);
|
||||||
|
connectionMap.put(connection.getID(), amqpConnection);
|
||||||
|
|
||||||
|
log.info("Connection " + amqpConnection.getRemoteAddress() + " created");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionDestroyed(Object connectionID) {
|
||||||
|
RemotingConnection connection = connectionMap.remove(connectionID);
|
||||||
|
if (connection != null) {
|
||||||
|
log.info("Connection " + connection.getRemoteAddress() + " destroyed");
|
||||||
|
connection.disconnect(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionException(Object connectionID, ActiveMQException me) {
|
||||||
|
RemotingConnection connection = connectionMap.get(connectionID);
|
||||||
|
if (connection != null) {
|
||||||
|
log.info("Connection " + connection.getRemoteAddress() + " exception: " + me.getMessage());
|
||||||
|
connection.fail(me);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionReadyForWrites(Object connectionID, boolean ready) {
|
||||||
|
RemotingConnection connection = connectionMap.get(connectionID);
|
||||||
|
if (connection != null) {
|
||||||
|
log.info("Connection " + connection.getRemoteAddress() + " ready");
|
||||||
|
connection.getTransportConnection().fireReady(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() {
|
||||||
|
for (RemotingConnection connection : connectionMap.values()) {
|
||||||
|
connection.disconnect(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
|
||||||
|
RemotingConnection connection = connectionMap.get(connectionID);
|
||||||
|
if (connection != null) {
|
||||||
|
connection.bufferReceived(connectionID, buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -209,6 +209,14 @@ public class AMQPConnectionContext extends ProtonInitializable {
|
||||||
handler.open(containerId);
|
handler.open(containerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getContainer() {
|
||||||
|
return containerId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addEventHandler(EventHandler eventHandler) {
|
||||||
|
handler.addEventHandler(eventHandler);
|
||||||
|
}
|
||||||
|
|
||||||
// This listener will perform a bunch of things here
|
// This listener will perform a bunch of things here
|
||||||
class LocalListener implements EventHandler {
|
class LocalListener implements EventHandler {
|
||||||
|
|
||||||
|
|
|
@ -166,7 +166,9 @@ public class AMQPSessionContext extends ProtonInitializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addSender(Sender sender) throws Exception {
|
public void addSender(Sender sender) throws Exception {
|
||||||
ProtonServerSenderContext protonSender = new ProtonServerSenderContext(connection, sender, this, sessionSPI);
|
// TODO: Remove this check when we have support for global link names
|
||||||
|
boolean outgoing = (sender.getContext() != null && sender.getContext().equals(true));
|
||||||
|
ProtonServerSenderContext protonSender = outgoing ? new ProtonClientSenderContext(connection, sender, this, sessionSPI) : new ProtonServerSenderContext(connection, sender, this, sessionSPI);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
protonSender.initialise();
|
protonSender.initialise();
|
||||||
|
@ -205,5 +207,4 @@ public class AMQPSessionContext extends ProtonInitializable {
|
||||||
receiver.close();
|
receiver.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.protocol.amqp.proton;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
|
||||||
|
import org.apache.qpid.proton.engine.Sender;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TODO: Merge {@link ProtonServerSenderContext} and {@link ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
|
||||||
|
*/
|
||||||
|
public class ProtonClientSenderContext extends ProtonServerSenderContext {
|
||||||
|
public ProtonClientSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext amqpSessionContext, AMQPSessionCallback sessionSPI) {
|
||||||
|
super(connection, sender, amqpSessionContext, sessionSPI);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getClientId() {
|
||||||
|
return connection.getContainer();
|
||||||
|
}
|
||||||
|
}
|
|
@ -56,6 +56,9 @@ import org.jboss.logging.Logger;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.PooledByteBufAllocator;
|
import io.netty.buffer.PooledByteBufAllocator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TODO: Merge {@link ProtonServerSenderContext} and {@link ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
|
||||||
|
*/
|
||||||
public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
|
public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
|
||||||
|
|
||||||
private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
|
private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
|
||||||
|
@ -167,7 +170,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
if (source == null) {
|
if (source == null) {
|
||||||
// Attempt to recover a previous subscription happens when a link reattach happens on a
|
// Attempt to recover a previous subscription happens when a link reattach happens on a
|
||||||
// subscription queue
|
// subscription queue
|
||||||
String clientId = connection.getRemoteContainer();
|
String clientId = getClientId();
|
||||||
String pubId = sender.getName();
|
String pubId = sender.getName();
|
||||||
queue = createQueueName(clientId, pubId);
|
queue = createQueueName(clientId, pubId);
|
||||||
QueueQueryResult result = sessionSPI.queueQuery(queue, false);
|
QueueQueryResult result = sessionSPI.queueQuery(queue, false);
|
||||||
|
@ -232,7 +235,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
// if we are a subscription and durable create a durable queue using the container
|
// if we are a subscription and durable create a durable queue using the container
|
||||||
// id and link name
|
// id and link name
|
||||||
if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
|
if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
|
||||||
String clientId = connection.getRemoteContainer();
|
String clientId = getClientId();
|
||||||
String pubId = sender.getName();
|
String pubId = sender.getName();
|
||||||
queue = createQueueName(clientId, pubId);
|
queue = createQueueName(clientId, pubId);
|
||||||
QueueQueryResult result = sessionSPI.queueQuery(queue, false);
|
QueueQueryResult result = sessionSPI.queueQuery(queue, false);
|
||||||
|
@ -295,6 +298,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected String getClientId() {
|
||||||
|
return connection.getRemoteContainer();
|
||||||
|
}
|
||||||
|
|
||||||
private boolean isPubSub(Source source) {
|
private boolean isPubSub(Source source) {
|
||||||
String pubSubPrefix = sessionSPI.getPubSubPrefix();
|
String pubSubPrefix = sessionSPI.getPubSubPrefix();
|
||||||
return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix);
|
return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix);
|
||||||
|
@ -337,7 +344,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
if (result.isExists() && source.getDynamic()) {
|
if (result.isExists() && source.getDynamic()) {
|
||||||
sessionSPI.deleteQueue(queueName);
|
sessionSPI.deleteQueue(queueName);
|
||||||
} else {
|
} else {
|
||||||
String clientId = connection.getRemoteContainer();
|
String clientId = getClientId();
|
||||||
String pubId = sender.getName();
|
String pubId = sender.getName();
|
||||||
String queue = createQueueName(clientId, pubId);
|
String queue = createQueueName(clientId, pubId);
|
||||||
result = sessionSPI.queueQuery(queue, false);
|
result = sessionSPI.queueQuery(queue, false);
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.util.Collection;
|
||||||
import java.util.Enumeration;
|
import java.util.Enumeration;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -69,7 +70,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionLifeCycleListener;
|
import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
|
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
|
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
|
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
|
||||||
|
@ -975,7 +977,7 @@ public class ProtonTest extends ProtonTestBase {
|
||||||
final Map<String, Object> config = new LinkedHashMap<>();
|
final Map<String, Object> config = new LinkedHashMap<>();
|
||||||
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
|
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
|
||||||
config.put(TransportConstants.PORT_PROP_NAME, "5673");
|
config.put(TransportConstants.PORT_PROP_NAME, "5673");
|
||||||
ProtonClientConnectionLifeCycleListener lifeCycleListener = new ProtonClientConnectionLifeCycleListener(server, 5000);
|
ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, server.getConfiguration().getName(), 5000), Optional.empty());
|
||||||
ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
|
ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
|
||||||
NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
|
NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
|
||||||
connector.start();
|
connector.start();
|
||||||
|
|
Loading…
Reference in New Issue