From 41b28f4b23cb42bb168a7b1dfb0fb6e5d6faa053 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 10 Feb 2015 10:53:02 -0500 Subject: [PATCH] ACTIVEMQ6-78 Adding tests to evaluate this task https://issues.apache.org/jira/browse/ACTIVEMQ6-78 This commit is just adding tests I used to debug the blocked calls issue There are some profiling parameters you can use that I added as a comment to the pom The reason this is a separate commit is that it would be easier to validate the results of optimizations while checking after and before any changes --- pom.xml | 7 +- .../sends/AbstractSendReceivePerfTest.java | 247 ++++++++++++++++++ .../performance/sends/ClientACKPerf.java | 130 +++++++++ .../sends/MeasureCommitPerfTest.java | 81 ++++++ .../tests/performance/sends/PreACKPerf.java | 93 +++++++ 5 files changed, 557 insertions(+), 1 deletion(-) create mode 100644 tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java create mode 100644 tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java create mode 100644 tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java create mode 100644 tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java diff --git a/pom.xml b/pom.xml index c3ae18af4d..417214582b 100644 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,12 @@ see https://intellij-support.jetbrains.com/entries/23395793 Also see: http://youtrack.jetbrains.com/issue/IDEA-125696 - --> + + + For profiling add this line and use jmc (Java Mission Control) to evaluate the results: + -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=delay=30s,duration=120s,filename=/tmp/myrecording.jfr + + --> -Djava.util.logging.manager=org.jboss.logmanager.LogManager -Dlogging.configuration=file:${activemq.basedir}/tests/config/logging.properties diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java new file mode 100644 index 0000000000..c8b2d7fb66 --- /dev/null +++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.tests.performance.sends; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.api.core.TransportConfiguration; +import org.apache.activemq.api.jms.ActiveMQJMSClient; +import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.core.settings.impl.AddressSettings; +import org.apache.activemq.tests.util.JMSTestBase; +import org.junit.Before; +import org.junit.Test; + +/** + * Client-ack time + * + * @author Clebert Suconic + */ +public abstract class AbstractSendReceivePerfTest extends JMSTestBase +{ + protected static final String Q_NAME = "test-queue-01"; + private Queue queue; + + protected AtomicBoolean running = new AtomicBoolean(true); + + + @Override + @Before + public void setUp() throws Exception + { + super.setUp(); + + jmsServer.createQueue(false, Q_NAME, null, true, Q_NAME); + queue = ActiveMQJMSClient.createQueue(Q_NAME); + + AddressSettings settings = new AddressSettings(); + settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + settings.setMaxSizeBytes(Long.MAX_VALUE); + server.getAddressSettingsRepository().clear(); + server.getAddressSettingsRepository().addMatch("#", settings); + + } + + + @Override + protected void registerConnectionFactory() throws Exception + { + List connectorConfigs = new ArrayList(); + connectorConfigs.add(new TransportConfiguration(NETTY_CONNECTOR_FACTORY)); + + createCF(connectorConfigs, "/cf"); + + cf = (ConnectionFactory) namingContext.lookup("/cf"); + } + + + private static final java.util.logging.Logger LOGGER = java.util.logging.Logger.getLogger(AbstractSendReceivePerfTest.class.getName()); + + + @Test + public void testSendReceive() throws Exception + { + long numberOfSamples = Long.getLong("HORNETQ_TEST_SAMPLES", 1000); + + + MessageReceiver receiver = new MessageReceiver(Q_NAME, numberOfSamples); + receiver.start(); + MessageSender sender = new MessageSender(Q_NAME); + sender.start(); + + receiver.join(); + sender.join(); + + assertFalse(receiver.failed); + assertFalse(sender.failed); + + } + + final Semaphore pendingCredit = new Semaphore(5000); + + /** + * to be called after a message is consumed + * so the flow control of the test kicks in. + */ + protected final void afterConsume(Message message) + { + if (message != null) + { + pendingCredit.release(); + } + } + + + protected final void beforeSend() + { + while (running.get()) + { + try + { + if (pendingCredit.tryAcquire(1, TimeUnit.SECONDS)) + { + return; + } + else + { + System.out.println("Couldn't get credits!"); + } + } + catch (Throwable e) + { + throw new RuntimeException(e.getMessage(), e); + } + } + } + + + + + private class MessageReceiver extends Thread + { + private final String qName; + private final long numberOfSamples; + + public boolean failed = false; + + public MessageReceiver(String qname, long numberOfSamples) throws Exception + { + super("Receiver " + qname); + this.qName = qname; + this.numberOfSamples = numberOfSamples; + } + + @Override + public void run() + { + try + { + LOGGER.info("Receiver: Connecting"); + Connection c = cf.createConnection(); + + consumeMessages(c, qName); + + c.close(); + } + catch (Exception e) + { + e.printStackTrace(); + failed = true; + } + finally + { + running.set(false); + } + } + } + + protected abstract void consumeMessages(Connection c, String qName) throws Exception; + + private class MessageSender extends Thread + { + protected String qName; + + public boolean failed = false; + + public MessageSender(String qname) throws Exception + { + super("Sender " + qname); + + this.qName = qname; + } + + @Override + public void run() + { + try + { + LOGGER.info("Sender: Connecting"); + Connection c = cf.createConnection(); + + sendMessages(c, qName); + + c.close(); + + } + catch (Exception e) + { + failed = true; + if (e instanceof InterruptedException) + { + LOGGER.info("Sender done."); + } + else + { + e.printStackTrace(); + } + } + } + } + + /* This will by default send non persistent messages */ + protected void sendMessages(Connection c, String qName) throws JMSException + { + Session s = null; + s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + LOGGER.info("Sender: Using AUTO-ACK session"); + + + Queue q = s.createQueue(qName); + MessageProducer producer = s.createProducer(null); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + + long sent = 0; + while (running.get()) + { + beforeSend(); + producer.send(q, s.createTextMessage("Message_" + (sent++))); + } + } +} \ No newline at end of file diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java new file mode 100644 index 0000000000..6dba4f41e5 --- /dev/null +++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.tests.performance.sends; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * @author clebertsuconic + */ +@RunWith(Parameterized.class) +public class ClientACKPerf extends AbstractSendReceivePerfTest +{ + + @Parameterized.Parameters(name = "batchSize={0}") + public static Collection data() + { + List list = Arrays.asList(new Object[][]{ + {1}, + {2000}}); + + System.out.println("Size = " + list.size()); + return list; + } + + public ClientACKPerf(int batchSize) + { + super(); + this.batchSize = batchSize; + } + + + public final int batchSize; + + @Override + protected void consumeMessages(Connection c, String qName) throws Exception + { + int mode = 0; + mode = Session.CLIENT_ACKNOWLEDGE; + + System.out.println("Receiver: Using PRE-ACK mode"); + + Session s = c.createSession(false, mode); + Queue q = s.createQueue(qName); + MessageConsumer consumer = s.createConsumer(q, null, false); + + c.start(); + + Message m = null; + + long totalTimeACKTime = 0; + + + long start = System.currentTimeMillis(); + + long nmessages = 0; + long timeout = System.currentTimeMillis() + 60 * 1000; + while (timeout > System.currentTimeMillis()) + { + m = consumer.receive(5000); + afterConsume(m); + + + if (m == null) + { + throw new Exception("Failed with m = null"); + } + + if (nmessages++ % batchSize == 0) + { + long startACK = System.nanoTime(); + m.acknowledge(); + long endACK = System.nanoTime(); + totalTimeACKTime += (endACK - startACK); + } + + + if (nmessages % 10000 == 0) + { + printMsgsSec(start, nmessages, totalTimeACKTime); + } + } + + + printMsgsSec(start, nmessages, totalTimeACKTime); + } + + + + protected void printMsgsSec(final long start, final double nmessages, final double totalTimeACKTime) + { + + long end = System.currentTimeMillis(); + double elapsed = ((double) end - (double) start) / 1000f; + + double messagesPerSecond = nmessages / elapsed; + double nAcks = nmessages / batchSize; + + System.out.println("batchSize=" + batchSize + ", numberOfMessages=" + + nmessages + ", elapsedTime=" + elapsed + " msgs/sec= " + messagesPerSecond + ",totalTimeAcking=" + String.format("%10.4f", totalTimeACKTime) + + ", avgACKTime=" + String.format("%10.4f", (totalTimeACKTime / nAcks))); + + } + + +} diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java new file mode 100644 index 0000000000..0528d0eae6 --- /dev/null +++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.tests.performance.sends; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Session; + + +/** + * @author clebertsuconic + */ + +public class MeasureCommitPerfTest extends AbstractSendReceivePerfTest +{ + @Override + protected void consumeMessages(Connection c, String qName) throws Exception + { + } + + + /* This will by default send non persistent messages */ + protected void sendMessages(Connection c, String qName) throws JMSException + { + Session s = c.createSession(true, Session.SESSION_TRANSACTED); + + + long timeout = System.currentTimeMillis() + 30 * 1000; + + long startMeasure = System.currentTimeMillis() + 5000; + long start = 0; + long committs = 0; + while (timeout > System.currentTimeMillis()) + { + + if (start == 0 && System.currentTimeMillis() > startMeasure) + { + System.out.println("heat up"); + start = System.currentTimeMillis(); + committs = 0; + } + + s.commit(); + committs++; + if (start > 0 && committs % 1000 == 0) printCommitsSecond(start, committs); + } + printCommitsSecond(start, committs); + + s.close(); + } + + + protected void printCommitsSecond(final long start, final double committs) + { + + long end = System.currentTimeMillis(); + double elapsed = ((double) end - (double) start) / 1000f; + + double commitsPerSecond = committs / elapsed; + + System.out.println("end = " + end + ", start=" + start + ", numberOfMessages=" + + committs + ", elapsed=" + elapsed + " msgs/sec= " + commitsPerSecond); + + } + + +} diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java new file mode 100644 index 0000000000..a6d2906017 --- /dev/null +++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.tests.performance.sends; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.api.jms.ActiveMQJMSConstants; + +/** + * @author clebertsuconic + */ + +public class PreACKPerf extends AbstractSendReceivePerfTest +{ + @Override + protected void consumeMessages(Connection c, String qName) throws Exception + { + int mode = 0; + mode = ActiveMQJMSConstants.PRE_ACKNOWLEDGE; + + System.out.println("Receiver: Using PRE-ACK mode"); + + Session s = c.createSession(false, mode); + Queue q = s.createQueue(qName); + MessageConsumer consumer = s.createConsumer(q, null, false); + + c.start(); + + Message m = null; + + + long start = System.currentTimeMillis(); + + long nmessages = 0; + long timeout = System.currentTimeMillis() + 30 * 1000; + while (timeout > System.currentTimeMillis()) + { + m = consumer.receive(5000); + + nmessages++; + + if (m == null) + { + throw new Exception("Failed with m = null"); + } + + if (nmessages % 10000 == 0) + { + printMsgsSec(start, nmessages); + } + + } + + long end = System.currentTimeMillis(); + + printMsgsSec(start, nmessages); + } + + + + protected void printMsgsSec(final long start, final double nmessages) + { + + long end = System.currentTimeMillis(); + double elapsed = ((double) end - (double) start) / 1000f; + + double messagesPerSecond = nmessages / elapsed; + + System.out.println("end = " + end + ", start=" + start + ", numberOfMessages=" + + nmessages + ", elapsed=" + elapsed + " msgs/sec= " + messagesPerSecond); + + } + + +}