diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java new file mode 100644 index 0000000000..cfdef60e51 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.perf; + +import java.io.File; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBStore; + +/** + * @version $Revision: 1.3 $ + */ +public class KahaDBDurableTopicTest extends SimpleDurableTopicTest { + + @Override + protected void setUp() throws Exception { + this.initialConsumerDelay = 10 * 1000; + super.setUp(); + } + + @Override + protected void configureBroker(BrokerService answer, String uri) throws Exception { + + File dataFileDir = new File("target/test-amq-data/perfTest/kahadb"); + File archiveDir = new File(dataFileDir, "archive"); + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(dataFileDir); + kaha.setDirectoryArchive(archiveDir); + //kaha.setArchiveDataLogs(true); + + // The setEnableJournalDiskSyncs(false) setting is a little dangerous + // right now, as I have not verified + // what happens if the index is updated but a journal update is lost. + // Index is going to be in consistent, but can it be repaired? + kaha.setEnableJournalDiskSyncs(false); + // Using a bigger journal file size makes he take fewer spikes as it is + // not switching files as often. + // kaha.setJournalMaxFileLength(1024*1024*100); + + // small batch means more frequent and smaller writes + kaha.setIndexWriteBatchSize(100); + kaha.setIndexCacheSize(10000); + // do the index write in a separate thread + //kaha.setEnableIndexWriteAsync(true); + + answer.setPersistenceAdapter(kaha); + answer.addConnector(uri); + answer.setDeleteAllMessagesOnStartup(true); + + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java index 2406d73e18..1ca48158da 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java @@ -25,6 +25,11 @@ import org.apache.activemq.store.kahadb.KahaDBStore; */ public class KahaDBQueueTest extends SimpleQueueTest { + @Override + protected void setUp() throws Exception { + this.initialConsumerDelay = 10 * 1000; + super.setUp(); + } @Override protected void configureBroker(BrokerService answer,String uri) throws Exception { @@ -38,7 +43,7 @@ public class KahaDBQueueTest extends SimpleQueueTest { // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified // what happens if the index is updated but a journal update is lost. // Index is going to be in consistent, but can it be repaired? - kaha.setEnableJournalDiskSyncs(false); + //kaha.setEnableJournalDiskSyncs(false); // Using a bigger journal file size makes he take fewer spikes as it is not switching files as often. //kaha.setJournalMaxFileLength(1024*1024*100); diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java index 39f4f6a77d..494bb51ed5 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java @@ -28,16 +28,18 @@ import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; * @version $Revision: 1.3 $ */ public class SimpleDurableTopicTest extends SimpleTopicTest { - + protected long initialConsumerDelay = 0; + @Override protected void setUp() throws Exception { - numberOfDestinations=10; - numberOfConsumers = 10; - numberofProducers = 2; + numberOfDestinations=1; + numberOfConsumers = 2; + numberofProducers = 1; sampleCount=1000; playloadSize = 1024; super.setUp(); } + @Override protected void configureBroker(BrokerService answer,String uri) throws Exception { AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory(); persistenceFactory.setMaxFileLength(1024*16); @@ -49,18 +51,21 @@ public class SimpleDurableTopicTest extends SimpleTopicTest { answer.setUseShutdownHook(false); } + @Override protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte payload[]) throws JMSException { PerfProducer pp = new PerfProducer(fac, dest, payload); pp.setDeliveryMode(DeliveryMode.PERSISTENT); return pp; } + @Override protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { PerfConsumer result = new PerfConsumer(fac, dest, "subs:" + number); - result.setInitialDelay(0); + result.setInitialDelay(this.initialConsumerDelay); return result; } + @Override protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception { ActiveMQConnectionFactory result = super.createConnectionFactory(uri); //result.setSendAcksAsync(false); diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java index 1efa17b157..e2a5e4e565 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java @@ -26,6 +26,8 @@ import javax.jms.Session; */ public class SimpleQueueTest extends SimpleTopicTest { + protected long initialConsumerDelay = 0; + protected long consumerSleep = 0; @Override protected Destination createDestination(Session s, String destinationName) throws JMSException { return s.createQueue(destinationName); @@ -40,8 +42,8 @@ public class SimpleQueueTest extends SimpleTopicTest { @Override protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { PerfConsumer consumer = new PerfConsumer(fac, dest); - //consumer.setInitialDelay(10000); - //consumer.setSleepDuration(10); + consumer.setInitialDelay(this.initialConsumerDelay); + consumer.setSleepDuration(this.consumerSleep); boolean enableAudit = numberOfConsumers <= 1; System.err.println("Enable Audit = " + enableAudit); consumer.setEnableAudit(enableAudit);