ACTIVEMQ6-78 Adding tests to evaluate this task
https://issues.apache.org/jira/browse/ACTIVEMQ6-78 This commit is just adding tests I used to debug the blocked calls issue There are some profiling parameters you can use that I added as a comment to the pom The reason this is a separate commit is that it would be easier to validate the results of optimizations while checking after and before any changes
This commit is contained in:
parent
1d5a7a10d3
commit
41b28f4b23
5
pom.xml
5
pom.xml
|
@ -79,6 +79,11 @@
|
|||
see https://intellij-support.jetbrains.com/entries/23395793
|
||||
|
||||
Also see: http://youtrack.jetbrains.com/issue/IDEA-125696
|
||||
|
||||
|
||||
For profiling add this line and use jmc (Java Mission Control) to evaluate the results:
|
||||
-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=delay=30s,duration=120s,filename=/tmp/myrecording.jfr
|
||||
|
||||
-->
|
||||
|
||||
<activemq-surefire-argline>-Djava.util.logging.manager=org.jboss.logmanager.LogManager
|
||||
|
|
|
@ -0,0 +1,247 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.tests.performance.sends;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.activemq.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.api.jms.ActiveMQJMSClient;
|
||||
import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.tests.util.JMSTestBase;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Client-ack time
|
||||
*
|
||||
* @author Clebert Suconic
|
||||
*/
|
||||
public abstract class AbstractSendReceivePerfTest extends JMSTestBase
|
||||
{
|
||||
protected static final String Q_NAME = "test-queue-01";
|
||||
private Queue queue;
|
||||
|
||||
protected AtomicBoolean running = new AtomicBoolean(true);
|
||||
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
super.setUp();
|
||||
|
||||
jmsServer.createQueue(false, Q_NAME, null, true, Q_NAME);
|
||||
queue = ActiveMQJMSClient.createQueue(Q_NAME);
|
||||
|
||||
AddressSettings settings = new AddressSettings();
|
||||
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
|
||||
settings.setMaxSizeBytes(Long.MAX_VALUE);
|
||||
server.getAddressSettingsRepository().clear();
|
||||
server.getAddressSettingsRepository().addMatch("#", settings);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void registerConnectionFactory() throws Exception
|
||||
{
|
||||
List<TransportConfiguration> connectorConfigs = new ArrayList<TransportConfiguration>();
|
||||
connectorConfigs.add(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
|
||||
|
||||
createCF(connectorConfigs, "/cf");
|
||||
|
||||
cf = (ConnectionFactory) namingContext.lookup("/cf");
|
||||
}
|
||||
|
||||
|
||||
private static final java.util.logging.Logger LOGGER = java.util.logging.Logger.getLogger(AbstractSendReceivePerfTest.class.getName());
|
||||
|
||||
|
||||
@Test
|
||||
public void testSendReceive() throws Exception
|
||||
{
|
||||
long numberOfSamples = Long.getLong("HORNETQ_TEST_SAMPLES", 1000);
|
||||
|
||||
|
||||
MessageReceiver receiver = new MessageReceiver(Q_NAME, numberOfSamples);
|
||||
receiver.start();
|
||||
MessageSender sender = new MessageSender(Q_NAME);
|
||||
sender.start();
|
||||
|
||||
receiver.join();
|
||||
sender.join();
|
||||
|
||||
assertFalse(receiver.failed);
|
||||
assertFalse(sender.failed);
|
||||
|
||||
}
|
||||
|
||||
final Semaphore pendingCredit = new Semaphore(5000);
|
||||
|
||||
/**
|
||||
* to be called after a message is consumed
|
||||
* so the flow control of the test kicks in.
|
||||
*/
|
||||
protected final void afterConsume(Message message)
|
||||
{
|
||||
if (message != null)
|
||||
{
|
||||
pendingCredit.release();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected final void beforeSend()
|
||||
{
|
||||
while (running.get())
|
||||
{
|
||||
try
|
||||
{
|
||||
if (pendingCredit.tryAcquire(1, TimeUnit.SECONDS))
|
||||
{
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
System.out.println("Couldn't get credits!");
|
||||
}
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
private class MessageReceiver extends Thread
|
||||
{
|
||||
private final String qName;
|
||||
private final long numberOfSamples;
|
||||
|
||||
public boolean failed = false;
|
||||
|
||||
public MessageReceiver(String qname, long numberOfSamples) throws Exception
|
||||
{
|
||||
super("Receiver " + qname);
|
||||
this.qName = qname;
|
||||
this.numberOfSamples = numberOfSamples;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
LOGGER.info("Receiver: Connecting");
|
||||
Connection c = cf.createConnection();
|
||||
|
||||
consumeMessages(c, qName);
|
||||
|
||||
c.close();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
failed = true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
running.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void consumeMessages(Connection c, String qName) throws Exception;
|
||||
|
||||
private class MessageSender extends Thread
|
||||
{
|
||||
protected String qName;
|
||||
|
||||
public boolean failed = false;
|
||||
|
||||
public MessageSender(String qname) throws Exception
|
||||
{
|
||||
super("Sender " + qname);
|
||||
|
||||
this.qName = qname;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
LOGGER.info("Sender: Connecting");
|
||||
Connection c = cf.createConnection();
|
||||
|
||||
sendMessages(c, qName);
|
||||
|
||||
c.close();
|
||||
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
failed = true;
|
||||
if (e instanceof InterruptedException)
|
||||
{
|
||||
LOGGER.info("Sender done.");
|
||||
}
|
||||
else
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* This will by default send non persistent messages */
|
||||
protected void sendMessages(Connection c, String qName) throws JMSException
|
||||
{
|
||||
Session s = null;
|
||||
s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
LOGGER.info("Sender: Using AUTO-ACK session");
|
||||
|
||||
|
||||
Queue q = s.createQueue(qName);
|
||||
MessageProducer producer = s.createProducer(null);
|
||||
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
|
||||
|
||||
long sent = 0;
|
||||
while (running.get())
|
||||
{
|
||||
beforeSend();
|
||||
producer.send(q, s.createTextMessage("Message_" + (sent++)));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,130 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.tests.performance.sends;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
/**
|
||||
* @author clebertsuconic
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class ClientACKPerf extends AbstractSendReceivePerfTest
|
||||
{
|
||||
|
||||
@Parameterized.Parameters(name = "batchSize={0}")
|
||||
public static Collection<Object[]> data()
|
||||
{
|
||||
List<Object[]> list = Arrays.asList(new Object[][]{
|
||||
{1},
|
||||
{2000}});
|
||||
|
||||
System.out.println("Size = " + list.size());
|
||||
return list;
|
||||
}
|
||||
|
||||
public ClientACKPerf(int batchSize)
|
||||
{
|
||||
super();
|
||||
this.batchSize = batchSize;
|
||||
}
|
||||
|
||||
|
||||
public final int batchSize;
|
||||
|
||||
@Override
|
||||
protected void consumeMessages(Connection c, String qName) throws Exception
|
||||
{
|
||||
int mode = 0;
|
||||
mode = Session.CLIENT_ACKNOWLEDGE;
|
||||
|
||||
System.out.println("Receiver: Using PRE-ACK mode");
|
||||
|
||||
Session s = c.createSession(false, mode);
|
||||
Queue q = s.createQueue(qName);
|
||||
MessageConsumer consumer = s.createConsumer(q, null, false);
|
||||
|
||||
c.start();
|
||||
|
||||
Message m = null;
|
||||
|
||||
long totalTimeACKTime = 0;
|
||||
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
long nmessages = 0;
|
||||
long timeout = System.currentTimeMillis() + 60 * 1000;
|
||||
while (timeout > System.currentTimeMillis())
|
||||
{
|
||||
m = consumer.receive(5000);
|
||||
afterConsume(m);
|
||||
|
||||
|
||||
if (m == null)
|
||||
{
|
||||
throw new Exception("Failed with m = null");
|
||||
}
|
||||
|
||||
if (nmessages++ % batchSize == 0)
|
||||
{
|
||||
long startACK = System.nanoTime();
|
||||
m.acknowledge();
|
||||
long endACK = System.nanoTime();
|
||||
totalTimeACKTime += (endACK - startACK);
|
||||
}
|
||||
|
||||
|
||||
if (nmessages % 10000 == 0)
|
||||
{
|
||||
printMsgsSec(start, nmessages, totalTimeACKTime);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
printMsgsSec(start, nmessages, totalTimeACKTime);
|
||||
}
|
||||
|
||||
|
||||
|
||||
protected void printMsgsSec(final long start, final double nmessages, final double totalTimeACKTime)
|
||||
{
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
double elapsed = ((double) end - (double) start) / 1000f;
|
||||
|
||||
double messagesPerSecond = nmessages / elapsed;
|
||||
double nAcks = nmessages / batchSize;
|
||||
|
||||
System.out.println("batchSize=" + batchSize + ", numberOfMessages="
|
||||
+ nmessages + ", elapsedTime=" + elapsed + " msgs/sec= " + messagesPerSecond + ",totalTimeAcking=" + String.format("%10.4f", totalTimeACKTime) +
|
||||
", avgACKTime=" + String.format("%10.4f", (totalTimeACKTime / nAcks)));
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.tests.performance.sends;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
|
||||
|
||||
/**
|
||||
* @author clebertsuconic
|
||||
*/
|
||||
|
||||
public class MeasureCommitPerfTest extends AbstractSendReceivePerfTest
|
||||
{
|
||||
@Override
|
||||
protected void consumeMessages(Connection c, String qName) throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
/* This will by default send non persistent messages */
|
||||
protected void sendMessages(Connection c, String qName) throws JMSException
|
||||
{
|
||||
Session s = c.createSession(true, Session.SESSION_TRANSACTED);
|
||||
|
||||
|
||||
long timeout = System.currentTimeMillis() + 30 * 1000;
|
||||
|
||||
long startMeasure = System.currentTimeMillis() + 5000;
|
||||
long start = 0;
|
||||
long committs = 0;
|
||||
while (timeout > System.currentTimeMillis())
|
||||
{
|
||||
|
||||
if (start == 0 && System.currentTimeMillis() > startMeasure)
|
||||
{
|
||||
System.out.println("heat up");
|
||||
start = System.currentTimeMillis();
|
||||
committs = 0;
|
||||
}
|
||||
|
||||
s.commit();
|
||||
committs++;
|
||||
if (start > 0 && committs % 1000 == 0) printCommitsSecond(start, committs);
|
||||
}
|
||||
printCommitsSecond(start, committs);
|
||||
|
||||
s.close();
|
||||
}
|
||||
|
||||
|
||||
protected void printCommitsSecond(final long start, final double committs)
|
||||
{
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
double elapsed = ((double) end - (double) start) / 1000f;
|
||||
|
||||
double commitsPerSecond = committs / elapsed;
|
||||
|
||||
System.out.println("end = " + end + ", start=" + start + ", numberOfMessages="
|
||||
+ committs + ", elapsed=" + elapsed + " msgs/sec= " + commitsPerSecond);
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.tests.performance.sends;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.api.jms.ActiveMQJMSConstants;
|
||||
|
||||
/**
|
||||
* @author clebertsuconic
|
||||
*/
|
||||
|
||||
public class PreACKPerf extends AbstractSendReceivePerfTest
|
||||
{
|
||||
@Override
|
||||
protected void consumeMessages(Connection c, String qName) throws Exception
|
||||
{
|
||||
int mode = 0;
|
||||
mode = ActiveMQJMSConstants.PRE_ACKNOWLEDGE;
|
||||
|
||||
System.out.println("Receiver: Using PRE-ACK mode");
|
||||
|
||||
Session s = c.createSession(false, mode);
|
||||
Queue q = s.createQueue(qName);
|
||||
MessageConsumer consumer = s.createConsumer(q, null, false);
|
||||
|
||||
c.start();
|
||||
|
||||
Message m = null;
|
||||
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
long nmessages = 0;
|
||||
long timeout = System.currentTimeMillis() + 30 * 1000;
|
||||
while (timeout > System.currentTimeMillis())
|
||||
{
|
||||
m = consumer.receive(5000);
|
||||
|
||||
nmessages++;
|
||||
|
||||
if (m == null)
|
||||
{
|
||||
throw new Exception("Failed with m = null");
|
||||
}
|
||||
|
||||
if (nmessages % 10000 == 0)
|
||||
{
|
||||
printMsgsSec(start, nmessages);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
|
||||
printMsgsSec(start, nmessages);
|
||||
}
|
||||
|
||||
|
||||
|
||||
protected void printMsgsSec(final long start, final double nmessages)
|
||||
{
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
double elapsed = ((double) end - (double) start) / 1000f;
|
||||
|
||||
double messagesPerSecond = nmessages / elapsed;
|
||||
|
||||
System.out.println("end = " + end + ", start=" + start + ", numberOfMessages="
|
||||
+ nmessages + ", elapsed=" + elapsed + " msgs/sec= " + messagesPerSecond);
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue