From bd77a536c6f368b8ac5a01386d30143114185bed Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 1 Apr 2020 10:51:18 -0400 Subject: [PATCH] ARTEMIS-2685 Not Block Netty Thread in any way for OpenWire --- .../activemq/artemis/util/ServerUtil.java | 2 +- .../protocol/openwire/OpenWireConnection.java | 26 ++- .../smoke/replicationflow/SoakPagingTest.java | 151 +++++++++++++++--- 3 files changed, 152 insertions(+), 27 deletions(-) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/util/ServerUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/util/ServerUtil.java index a80dc38b3f..4812d68b02 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/util/ServerUtil.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/util/ServerUtil.java @@ -109,7 +109,7 @@ public class ServerUtil { System.out.println("**********************************"); System.out.println("Killing server " + server); System.out.println("**********************************"); - server.destroy(); + server.destroyForcibly(); server.waitFor(); Thread.sleep(1000); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 9654a3e9d0..4509210806 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -83,6 +83,7 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.activemq.artemis.utils.actors.Actor; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -191,6 +192,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private ConnectionEntry connectionEntry; private boolean useKeepAlive; private long maxInactivityDuration; + private Actor openWireActor; private final Set knownDestinations = new ConcurrentHashSet<>(); @@ -270,10 +272,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) { super.bufferReceived(connectionID, buffer); + try { - - recoverOperationContext(); - Command command = (Command) inWireFormat.unmarshal(buffer); // log the openwire command @@ -281,6 +281,23 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se traceBufferReceived(connectionID, command); } + if (openWireActor != null) { + openWireActor.act(command); + } else { + act(command); + } + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.debug(e); + sendException(e); + } + + } + + + private void act(Command command) { + try { + recoverOperationContext(); + boolean responseRequired = command.isResponseRequired(); int commandId = command.getCommandId(); @@ -734,6 +751,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se createInternalSession(info); + // the actor can only be used after the WireFormat has been initialized with versioning + this.openWireActor = new Actor<>(executor, this::act); + return context; } diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java index a4af52d08a..954e85b3cf 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java @@ -25,15 +25,45 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; import org.apache.activemq.artemis.utils.SpawnedVMSupport; +import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class SoakPagingTest extends SmokeTestBase { + + String protocol; + String consumerType; + boolean transaction; + final String destination; + + public SoakPagingTest(String protocol, String consumerType, boolean transaction) { + this.protocol = protocol; + this.consumerType = consumerType; + this.transaction = transaction; + + if (consumerType.equals("queue")) { + destination = "exampleQueue"; + } else { + destination = "exampleTopic"; + } + } + + @Parameterized.Parameters(name = "protocol={0}, type={1}, tx={2}") + public static Collection getParams() { + return Arrays.asList(new Object[][]{{"AMQP", "shared", false}, {"AMQP", "queue", false}, {"OPENWIRE", "topic", false}, {"OPENWIRE", "queue", false}, {"CORE", "shared", false}, {"CORE", "queue", false}, + {"AMQP", "shared", true}, {"AMQP", "queue", true}, {"OPENWIRE", "topic", true}, {"OPENWIRE", "queue", true}, {"CORE", "shared", true}, {"CORE", "queue", true}}); + } + public static final String SERVER_NAME_0 = "replicated-static0"; public static final String SERVER_NAME_1 = "replicated-static1"; @@ -48,27 +78,55 @@ public class SoakPagingTest extends SmokeTestBase { cleanupData(SERVER_NAME_1); server0 = startServer(SERVER_NAME_0, 0, 30000); - server1 = startServer(SERVER_NAME_1, 0, 30000); } - final String destination = "exampleTopic"; static final int consumer_threads = 20; static final int producer_threads = 20; static AtomicInteger j = new AtomicInteger(0); + private static ConnectionFactory createConnectionFactory(String protocol, String uri) { + if (protocol.toUpperCase().equals("OPENWIRE")) { + return new org.apache.activemq.ActiveMQConnectionFactory(uri); + } else if (protocol.toUpperCase().equals("AMQP")) { + + if (uri.startsWith("tcp://")) { + // replacing tcp:// by amqp:// + uri = "amqp" + uri.substring(3); + } + return new JmsConnectionFactory(uri); + } else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) { + return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri); + } else { + throw new IllegalStateException("Unkown:" + protocol); + } + } public static void main(String[] arg) { try { + + if (arg.length != 4) { + System.err.println("You need to pass in protocol, consumerType, Time, transaction"); + System.exit(0); + } + + String protocol = arg[0]; + String consumerType = arg[1]; + int time = Integer.parseInt(arg[2]); + boolean tx = Boolean.parseBoolean(arg[3]); + if (time == 0) { + time = 15000; + } + final String host = "localhost"; final int port = 61616; - final ConnectionFactory factory = new org.apache.qpid.jms.JmsConnectionFactory("failover:(amqp://" + host + ":" + port + ")"); + final ConnectionFactory factory = createConnectionFactory(protocol, "tcp://" + host + ":" + port); for (int i = 0; i < producer_threads; i++) { Thread t = new Thread(new Runnable() { @Override public void run() { - SoakPagingTest app = new SoakPagingTest(); + SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx); app.produce(factory); } }); @@ -81,36 +139,43 @@ public class SoakPagingTest extends SmokeTestBase { Thread t = new Thread(new Runnable() { @Override public void run() { - SoakPagingTest app = new SoakPagingTest(); + SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx); app.consume(factory, j.getAndIncrement()); } }); t.start(); } - Thread.sleep(15000); + Thread.sleep(time); - System.exit(consumed.get()); + System.exit(consumed.get() > 0 ? 1 : 0); } catch (Throwable e) { e.printStackTrace(); - System.exit(-1); + System.exit(0); } } @Test public void testPagingReplication() throws Throwable { - for (int i = 0; i < 3; i++) { - Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName()); - Assert.assertTrue(process.waitFor() > 0); + + Process queueProcess = null; + if (consumerType.equals("queue")) { + queueProcess = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "45000", "" + transaction); } - server1.destroy(); + for (int i = 0; i < 3; i++) { + Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "15000", "" + transaction); - server1 = startServer(SERVER_NAME_1, 0, 30000); + if (i == 0) { + server1 = startServer(SERVER_NAME_1, 0, 30000); + } - for (int i = 0; i < 2; i++) { - Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName()); - Assert.assertTrue(process.waitFor() > 0); + int result = process.waitFor(); + Assert.assertTrue(result > 0); + } + + if (queueProcess != null) { + Assert.assertTrue(queueProcess.waitFor() > 0); } } @@ -124,9 +189,22 @@ public class SoakPagingTest extends SmokeTestBase { Connection connection = factory.createConnection("admin", "admin"); connection.start(); - final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + final Session session; + + if (transaction) { + session = connection.createSession(true, Session.SESSION_TRANSACTED); + } else { + session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + } + + Destination address; + + if (consumerType.equals("queue")) { + address = session.createQueue(destination); + } else { + address = session.createTopic(destination); + } - Destination address = session.createTopic(destination); MessageProducer messageProducer = session.createProducer(address); int i = 0; @@ -142,8 +220,12 @@ public class SoakPagingTest extends SmokeTestBase { messageProducer.send(message); produced.incrementAndGet(); i++; - if (i % 100 == 0) + if (i % 100 == 0) { System.out.println("Published " + i + " messages"); + if (transaction) { + session.commit(); + } + } } } catch (Exception e) { e.printStackTrace(); @@ -154,11 +236,30 @@ public class SoakPagingTest extends SmokeTestBase { try { Connection connection = factory.createConnection("admin", "admin"); - final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + final Session session; + + if (transaction) { + session = connection.createSession(true, Session.SESSION_TRANSACTED); + } else { + session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + } + + Destination address; + + if (consumerType.equals("queue")) { + address = session.createQueue(destination); + } else { + address = session.createTopic(destination); + } - Topic address = session.createTopic(destination); String consumerId = "ss" + (j % 5); - MessageConsumer messageConsumer = session.createSharedConsumer(address, consumerId); + MessageConsumer messageConsumer; + + if (protocol.equals("shared")) { + messageConsumer = session.createSharedConsumer((Topic)address, consumerId); + } else { + messageConsumer = session.createConsumer(address); + } Thread.sleep(5000); connection.start(); @@ -170,8 +271,12 @@ public class SoakPagingTest extends SmokeTestBase { if (m == null) System.out.println("receive() returned null"); i++; - if (i % 100 == 0) + if (i % 100 == 0) { System.out.println("Consumed " + i + " messages"); + if (transaction) { + session.commit(); + } + } } } catch (Exception e) { e.printStackTrace();