From c22cd6563b6956a164ff087d7a59ac33676b0071 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 5 Feb 2009 13:07:32 +0000 Subject: [PATCH] update simple enqueue rate verifier for kaha store git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@741096 13f79535-47bb-0310-9956-ffa450edef68 --- .../bugs/VerifySteadyEnqueueRate.java | 157 ++++++++---------- 1 file changed, 71 insertions(+), 86 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java b/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java index 5914362016..90f920efc8 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java @@ -19,54 +19,40 @@ package org.apache.activemq.bugs; import java.io.File; import java.text.DateFormat; import java.util.Date; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.ObjectMessage; -import javax.jms.Session; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.amq.AMQPersistenceAdapter; import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - public class VerifySteadyEnqueueRate extends TestCase { - private static final Log LOG = LogFactory.getLog(VerifySteadyEnqueueRate.class); + private static final Log LOG = LogFactory + .getLog(VerifySteadyEnqueueRate.class); - private final CountDownLatch latch = new CountDownLatch(max_messages); - private static int max_messages = 10000000; - private static int messageCounter; - private String destinationName = getName()+"_Queue"; + private static int max_messages = 1000000; + private String destinationName = getName() + "_Queue"; private BrokerService broker; - private Connection receiverConnection; - private Connection producerConnection; final boolean useTopic = false; - - private boolean useAMQPStore=true; + + private boolean useAMQPStore = false; protected static final String payload = new String(new byte[24]); public void setUp() throws Exception { - messageCounter = 0; startBroker(); - receiverConnection = createConnection(); - receiverConnection.start(); - producerConnection = createConnection(); - producerConnection.start(); } - + public void tearDown() throws Exception { - receiverConnection.close(); - producerConnection.close(); broker.stop(); } @@ -74,44 +60,62 @@ public class VerifySteadyEnqueueRate extends TestCase { if (true) { return; } - doTestForDataFileNotDeleted(false); + doTestEnqueue(false); } - - private void doTestForDataFileNotDeleted(boolean transacted) throws Exception { + + private void doTestEnqueue(final boolean transacted) throws Exception { final long min = 100; - long max = 0; + final AtomicLong max = new AtomicLong(0); long reportTime = 0; - Receiver receiver = new Receiver() { - public void receive(String s) throws Exception { - messageCounter++; - latch.countDown(); + + Runnable runner = new Runnable() { + + public void run() { + try { + MessageSender producer = new MessageSender(destinationName, + createConnection(), transacted, useTopic); + + for (int i = 0; i < max_messages; i++) { + long startT = System.currentTimeMillis(); + producer.send(payload); + long endT = System.currentTimeMillis(); + long duration = endT - startT; + + if (duration > max.get()) { + max.set(duration); + } + + if (duration > min) { + System.err.println(Thread.currentThread().getName() + + " " + + DateFormat.getTimeInstance().format( + new Date(startT)) + " at message " + + i + " send time=" + duration); + } + } + + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + System.out.println("max = " + max); } }; - //buildReceiver(receiverConnection, destinationName, transacted, receiver, useTopic); - - final MessageSender producer = new MessageSender(destinationName, producerConnection, transacted, useTopic); - for (int i=0; i< max_messages; i++) { - long startT = System.currentTimeMillis(); - producer.send(payload ); - long endT = System.currentTimeMillis(); - long duration = endT - startT; - - if (duration > max) { - max = duration; - } - - if (duration > min) { - System.err.println(DateFormat.getTimeInstance().format(new Date(startT)) + " at message " + i + " send time=" + duration); - } + ExecutorService executor = Executors.newCachedThreadPool(); + int numThreads = 6; + for (int i = 0; i < numThreads; i++) { + executor.execute(runner); + } + + executor.shutdown(); + while(!executor.isTerminated()) { + executor.awaitTermination(10, TimeUnit.SECONDS); } - System.out.println("max = " + max); - //latch.await(); - //assertEquals(max_messages, messageCounter); - //waitFordataFilesToBeCleanedUp(persistentAdapter.getAsyncDataManager(), 30000, 2); } private Connection createConnection() throws Exception { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + broker.getTransportConnectors().get(0).getConnectUri()); return factory.createConnection(); } @@ -120,21 +124,23 @@ public class VerifySteadyEnqueueRate extends TestCase { broker.setDeleteAllMessagesOnStartup(true); broker.setPersistent(true); broker.setUseJmx(true); - - if( useAMQPStore ) { - AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker.getPersistenceFactory(); - // ensure there are a bunch of data files but multiple entries in each - //factory.setMaxFileLength(1024 * 20); + + if (useAMQPStore) { + AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker + .getPersistenceFactory(); + // ensure there are a bunch of data files but multiple entries in + // each + // factory.setMaxFileLength(1024 * 20); // speed up the test case, checkpoint an cleanup early and often - //factory.setCheckpointInterval(500); - factory.setCleanupInterval(1000*60*30); + // factory.setCheckpointInterval(500); + factory.setCleanupInterval(1000 * 60 * 30); factory.setSyncOnWrite(false); - - //int indexBinSize=262144; // good for 6M - int indexBinSize=1024; + + // int indexBinSize=262144; // good for 6M + int indexBinSize = 1024; factory.setIndexMaxBinSize(indexBinSize * 2); factory.setIndexBinSize(indexBinSize); - factory.setIndexPageSize(192*20); + factory.setIndexPageSize(192 * 20); } else { KahaDBStore kaha = new KahaDBStore(); kaha.setDirectory(new File("target/activemq-data/kahadb")); @@ -146,25 +152,4 @@ public class VerifySteadyEnqueueRate extends TestCase { broker.start(); LOG.info("Starting broker.."); } - - private void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver, boolean isTopic) throws Exception { - final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer inputMessageConsumer = session.createConsumer(isTopic ? session.createTopic(queueName) : session.createQueue(queueName)); - MessageListener messageListener = new MessageListener() { - - public void onMessage(Message message) { - try { - ObjectMessage objectMessage = (ObjectMessage)message; - String s = (String)objectMessage.getObject(); - receiver.receive(s); - if (session.getTransacted()) { - session.commit(); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - }; - inputMessageConsumer.setMessageListener(messageListener); - } }