This commit is contained in:
Clebert Suconic 2020-10-29 21:55:38 -04:00
commit caddd4fb2f
6 changed files with 41 additions and 28 deletions

View File

@ -243,6 +243,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
} }
ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory); ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
server.getRemotingService().addConnectionEntry(connection, entry);
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection; protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler())); connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler()));
@ -304,7 +305,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
} }
private static void uninstallMirrorController(AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) { private static void uninstallMirrorController(AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) {
// TODO implement this as part of https://issues.apache.org/jira/browse/ARTEMIS-2965
} }
/** The reason this method is static is the following: /** The reason this method is static is the following:
@ -498,6 +499,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
@Override @Override
public void connectionDestroyed(Object connectionID) { public void connectionDestroyed(Object connectionID) {
server.getRemotingService().removeConnection(connectionID);
redoConnection(); redoConnection();
} }

View File

@ -70,7 +70,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
for (AMQPBrokerConnectConfiguration config : amqpConnectionsConfig) { for (AMQPBrokerConnectConfiguration config : amqpConnectionsConfig) {
NettyConnectorFactory factory = new NettyConnectorFactory().setServerConnector(true); NettyConnectorFactory factory = new NettyConnectorFactory().setServerConnector(true);
protonProtocolManager = (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, null, null, null); protonProtocolManager = (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, config.getTransportConfigurations().get(0).getExtraParams(), null, null);
NettyConnector bridgesConnector = (NettyConnector)factory.createConnector(config.getTransportConfigurations().get(0).getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager)); NettyConnector bridgesConnector = (NettyConnector)factory.createConnector(config.getTransportConfigurations().get(0).getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager));
bridgesConnector.start(); bridgesConnector.start();

View File

@ -25,9 +25,11 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
public interface RemotingService { public interface RemotingService {
@ -123,4 +125,6 @@ public interface RemotingService {
void destroyAcceptor(String name) throws Exception; void destroyAcceptor(String name) throws Exception;
void loadProtocolServices(List<ActiveMQComponent> protocolServices); void loadProtocolServices(List<ActiveMQComponent> protocolServices);
void addConnectionEntry(Connection connection, ConnectionEntry entry);
} }

View File

@ -562,11 +562,16 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
logger.trace("Connection created " + connection); logger.trace("Connection created " + connection);
} }
connections.put(connection.getID(), entry); addConnectionEntry(connection, entry);
connectionCountLatch.countUp(); connectionCountLatch.countUp();
totalConnectionCount.incrementAndGet(); totalConnectionCount.incrementAndGet();
} }
@Override
public void addConnectionEntry(Connection connection, ConnectionEntry entry) {
connections.put(connection.getID(), entry);
}
@Override @Override
public void connectionDestroyed(final Object connectionID) { public void connectionDestroyed(final Object connectionID) {

View File

@ -54,17 +54,6 @@ public class AMQPBridgeTest extends AmqpClientTestSupport {
return createServer(AMQP_PORT, false); return createServer(AMQP_PORT, false);
} }
@Test
public void testsSimpleConnect() throws Exception {
server.start();
server_2 = createServer(AMQP_PORT_2, false);
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
server_2.getConfiguration().addAMQPConnection(amqpConnection);
server_2.start();
}
@Test @Test
public void testSimpleTransferPush() throws Exception { public void testSimpleTransferPush() throws Exception {
internalTransferPush("TEST", false); internalTransferPush("TEST", false);

View File

@ -67,7 +67,9 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
@Override @Override
protected ActiveMQServer createServer() throws Exception { protected ActiveMQServer createServer() throws Exception {
return createServer(AMQP_PORT, false); ActiveMQServer server = createServer(AMQP_PORT, false);
server.getConfiguration().setNetworkCheckPeriod(100);
return server;
} }
@Before @Before
@ -81,25 +83,35 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
qpidProcess.kill(); qpidProcess.kill();
} }
@Test(timeout = 60_000)
public void testWithMatchingDifferentNamesOnQueueKill() throws Exception {
internalMultipleQueues(true, true, true);
}
@Test(timeout = 60_000) @Test(timeout = 60_000)
public void testWithMatchingDifferentNamesOnQueue() throws Exception { public void testWithMatchingDifferentNamesOnQueue() throws Exception {
internalMultipleQueues(true, true); internalMultipleQueues(true, true, false);
} }
@Test(timeout = 60_000) @Test(timeout = 60_000)
public void testWithMatching() throws Exception { public void testWithMatching() throws Exception {
internalMultipleQueues(true, false); internalMultipleQueues(true, false, false);
} }
@Test(timeout = 60_000) @Test(timeout = 60_000)
public void testwithQueueName() throws Exception { public void testwithQueueName() throws Exception {
internalMultipleQueues(false, true); internalMultipleQueues(false, false, false);
} }
private void internalMultipleQueues(boolean useMatching, boolean distinctNaming) throws Exception { @Test(timeout = 60_000)
public void testwithQueueNameDistinctName() throws Exception {
internalMultipleQueues(false, true, false);
}
private void internalMultipleQueues(boolean useMatching, boolean distinctNaming, boolean kill) throws Exception {
final int numberOfMessages = 100; final int numberOfMessages = 100;
final int numberOfQueues = 10; final int numberOfQueues = 10;
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24622").setRetryInterval(10).setReconnectAttempts(-1); AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24622?amqpIdleTimeout=1000").setRetryInterval(10).setReconnectAttempts(-1);
if (useMatching) { if (useMatching) {
amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress("queue.#").setType(AMQPBrokerConnectionAddressType.PEER)); amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress("queue.#").setType(AMQPBrokerConnectionAddressType.PEER));
} else { } else {
@ -118,7 +130,7 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
ConnectionFactory factoryProducer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622"); ConnectionFactory factoryProducer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622");
Connection connection = null; Connection connection = null;
connection = createConnectionDumbRetry(factoryProducer, connection); connection = createConnectionDumbRetry(factoryProducer);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("queue.test" + dest); Queue queue = session.createQueue("queue.test" + dest);
@ -135,10 +147,14 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
if (kill) {
stopQpidRouter();
startQpidRouter();
}
for (int dest = 0; dest < numberOfQueues; dest++) { for (int dest = 0; dest < numberOfQueues; dest++) {
ConnectionFactory factoryConsumer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622"); ConnectionFactory factoryConsumer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622");
Connection connectionConsumer = factoryConsumer.createConnection(); Connection connectionConsumer = createConnectionDumbRetry(factoryConsumer);
Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queueConsumer = sessionConsumer.createQueue("queue.test" + dest); Queue queueConsumer = sessionConsumer.createQueue("queue.test" + dest);
MessageConsumer consumer = sessionConsumer.createConsumer(queueConsumer); MessageConsumer consumer = sessionConsumer.createConsumer(queueConsumer);
@ -167,7 +183,6 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue(createQueueName(dest, distinctNaming)); org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue(createQueueName(dest, distinctNaming));
Wait.assertEquals(0, testQueueOnServer::getMessageCount); Wait.assertEquals(0, testQueueOnServer::getMessageCount);
} }
} }
private String createQueueName(int i, boolean useDistinctName) { private String createQueueName(int i, boolean useDistinctName) {
@ -178,18 +193,16 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
} }
} }
private Connection createConnectionDumbRetry(ConnectionFactory factoryProducer, private Connection createConnectionDumbRetry(ConnectionFactory factoryProducer) throws InterruptedException {
Connection connection) throws InterruptedException {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
try { try {
// Some retry // Some retry
connection = factoryProducer.createConnection(); return factoryProducer.createConnection();
break;
} catch (Exception e) { } catch (Exception e) {
Thread.sleep(10); Thread.sleep(10);
} }
} }
return connection; return null;
} }
} }