ARTEMIS-814: Add support for outgoing AMQP connections
This commit is contained in:
parent
6e5b917cc5
commit
e65fd5d674
|
@ -0,0 +1,107 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.protocol.amqp.broker;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
|
||||||
|
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Manages the lifecycle of a proton client connection.
|
||||||
|
*/
|
||||||
|
public class ProtonClientConnectionLifeCycleListener implements BaseConnectionLifeCycleListener<ProtonProtocolManager>, BufferHandler {
|
||||||
|
private final Map<Object, ConnectionEntry> connectionMap = new ConcurrentHashMap<>();
|
||||||
|
private final ActiveMQServer server;
|
||||||
|
private final int ttl;
|
||||||
|
private static final Logger log = Logger.getLogger(ProtonClientConnectionLifeCycleListener.class);
|
||||||
|
|
||||||
|
public ProtonClientConnectionLifeCycleListener(ActiveMQServer server, int ttl) {
|
||||||
|
this.server = server;
|
||||||
|
this.ttl = ttl;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) {
|
||||||
|
AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server);
|
||||||
|
|
||||||
|
String id = server.getConfiguration().getName();
|
||||||
|
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, id, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
|
||||||
|
Executor executor = server.getExecutorFactory().getExecutor();
|
||||||
|
amqpConnection.open();
|
||||||
|
|
||||||
|
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
|
||||||
|
|
||||||
|
connectionCallback.setProtonConnectionDelegate(delegate);
|
||||||
|
|
||||||
|
ConnectionEntry connectionEntry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(), ttl);
|
||||||
|
connectionMap.put(connection.getID(), connectionEntry);
|
||||||
|
log.info("Connection " + connection.getRemoteAddress() + " created");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionDestroyed(Object connectionID) {
|
||||||
|
ConnectionEntry connection = connectionMap.remove(connectionID);
|
||||||
|
if (connection != null) {
|
||||||
|
log.info("Connection " + connection.connection.getRemoteAddress() + " destroyed");
|
||||||
|
connection.connection.disconnect(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionException(Object connectionID, ActiveMQException me) {
|
||||||
|
ConnectionEntry connection = connectionMap.get(connectionID);
|
||||||
|
if (connection != null) {
|
||||||
|
log.info("Connection " + connection.connection.getRemoteAddress() + " exception: " + me.getMessage());
|
||||||
|
connection.connection.fail(me);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionReadyForWrites(Object connectionID, boolean ready) {
|
||||||
|
ConnectionEntry connection = connectionMap.get(connectionID);
|
||||||
|
if (connection != null) {
|
||||||
|
log.info("Connection " + connection.connection.getRemoteAddress() + " ready");
|
||||||
|
connection.connection.getTransportConnection().fireReady(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() {
|
||||||
|
for (ConnectionEntry entry : connectionMap.values()) {
|
||||||
|
entry.connection.disconnect(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
|
||||||
|
ConnectionEntry entry = connectionMap.get(connectionID);
|
||||||
|
if (entry != null) {
|
||||||
|
entry.connection.bufferReceived(connectionID, buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,117 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.protocol.amqp.broker;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles proton protocol management for clients, mapping the {@link ProtonProtocolManager} to the {@link org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager} API.
|
||||||
|
*
|
||||||
|
* TODO: Find a common base for ProtocolManager and ClientProtocolManager.
|
||||||
|
*/
|
||||||
|
public class ProtonClientProtocolManager extends ProtonProtocolManager implements ClientProtocolManager {
|
||||||
|
|
||||||
|
public ProtonClientProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
|
||||||
|
super(factory, server);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, TopologyResponseHandler topologyResponseHandler) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemotingConnection getCurrentConnection() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Lock lockSessionCreation() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean waitOnLatch(long milliseconds) throws InterruptedException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAlive() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addChannelHandlers(ChannelPipeline pipeline) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sendSubscribeTopology(boolean isServer) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void ping(long connectionTTL) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SessionContext createSessionContext(String name, String username, String password, boolean xa, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean cleanupBeforeFailover(ActiveMQException cause) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean checkForFailover(String liveNodeID) throws ActiveMQException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setSessionFactory(ClientSessionFactory factory) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ClientSessionFactory getSessionFactory() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
}
|
|
@ -205,6 +205,10 @@ public class AMQPConnectionContext extends ProtonInitializable {
|
||||||
return ExtCapability.getCapabilities();
|
return ExtCapability.getCapabilities();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void open() {
|
||||||
|
handler.open(containerId);
|
||||||
|
}
|
||||||
|
|
||||||
// This listener will perform a bunch of things here
|
// This listener will perform a bunch of things here
|
||||||
class LocalListener implements EventHandler {
|
class LocalListener implements EventHandler {
|
||||||
|
|
||||||
|
|
|
@ -357,4 +357,11 @@ public class ProtonHandler extends ProtonInitializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void open(String containerId) {
|
||||||
|
this.transport.open();
|
||||||
|
this.connection.setContainer(containerId);
|
||||||
|
this.connection.open();
|
||||||
|
flush();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Enumeration;
|
import java.util.Enumeration;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
@ -62,9 +63,15 @@ import javax.jms.TopicSubscriber;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||||
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
|
||||||
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonClientConnectionLifeCycleListener;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonClientProtocolManager;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
|
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
|
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
|
@ -949,6 +956,54 @@ public class ProtonTest extends ProtonTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOutboundConnection() throws Throwable {
|
||||||
|
final ActiveMQServer remote = createAMQPServer(5673);
|
||||||
|
remote.start();
|
||||||
|
try {
|
||||||
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return remote.isActive();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
remote.stop();
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<String, Object> config = new LinkedHashMap<>();
|
||||||
|
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
|
||||||
|
config.put(TransportConstants.PORT_PROP_NAME, "5673");
|
||||||
|
ProtonClientConnectionLifeCycleListener lifeCycleListener = new ProtonClientConnectionLifeCycleListener(server, 5000);
|
||||||
|
ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
|
||||||
|
NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
|
||||||
|
connector.start();
|
||||||
|
connector.createConnection();
|
||||||
|
|
||||||
|
try {
|
||||||
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return remote.getConnectionCount() > 0;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertEquals(1, remote.getConnectionCount());
|
||||||
|
|
||||||
|
lifeCycleListener.stop();
|
||||||
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return remote.getConnectionCount() == 0;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertEquals(0, remote.getConnectionCount());
|
||||||
|
} finally {
|
||||||
|
lifeCycleListener.stop();
|
||||||
|
remote.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
// Uncomment testLoopBrowser to validate the hunging on the test
|
// Uncomment testLoopBrowser to validate the hunging on the test
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -42,23 +43,27 @@ public class ProtonTestBase extends ActiveMQTestBase {
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
|
|
||||||
server = this.createServer(true, true);
|
server = this.createAMQPServer(5672);
|
||||||
|
server.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ActiveMQServer createAMQPServer(int port) throws Exception {
|
||||||
|
final ActiveMQServer amqpServer = this.createServer(true, true);
|
||||||
HashMap<String, Object> params = new HashMap<>();
|
HashMap<String, Object> params = new HashMap<>();
|
||||||
params.put(TransportConstants.PORT_PROP_NAME, "5672");
|
params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
|
||||||
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
|
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
|
||||||
HashMap<String, Object> amqpParams = new HashMap<>();
|
HashMap<String, Object> amqpParams = new HashMap<>();
|
||||||
configureAmqp(amqpParams);
|
configureAmqp(amqpParams);
|
||||||
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams);
|
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams);
|
||||||
|
|
||||||
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
|
amqpServer.getConfiguration().setAcceptorConfigurations(Collections.singleton(transportConfiguration));
|
||||||
server.getConfiguration().setName(brokerName);
|
amqpServer.getConfiguration().setName(brokerName);
|
||||||
|
|
||||||
// Default Page
|
// Default Page
|
||||||
AddressSettings addressSettings = new AddressSettings();
|
AddressSettings addressSettings = new AddressSettings();
|
||||||
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
|
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
|
||||||
server.getConfiguration().getAddressesSettings().put("#", addressSettings);
|
amqpServer.getConfiguration().getAddressesSettings().put("#", addressSettings);
|
||||||
|
return amqpServer;
|
||||||
server.start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void configureAmqp(Map<String, Object> params) {
|
protected void configureAmqp(Map<String, Object> params) {
|
||||||
|
|
Loading…
Reference in New Issue