From 6e7e3abf5da3f59b5a365f71a3c92516db4844c6 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 7 Feb 2007 11:12:53 +0000 Subject: [PATCH] fix some problems in Quick Journal - now message containers are using Kaha maps instead of lists git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@504501 13f79535-47bb-0310-9956-ffa450edef68 --- .../cursors/FilePendingMessageCursor.java | 2 +- .../CommandMarshaller.java | 2 +- .../kahadaptor/KahaPersistenceAdapter.java | 7 +- .../store/kahadaptor/KahaReferenceStore.java | 2 +- .../kahadaptor/KahaReferenceStoreAdapter.java | 1 + .../kahadaptor/KahaTopicReferenceStore.java | 2 +- .../store/kahadaptor/MessageIdMarshaller.java | 43 ---------- .../store/rapid/RapidPersistenceAdapter.java | 4 +- .../apache/activemq/perf/SimpleTopicTest.java | 79 +++++++++---------- .../apache/activemq/perf/SlowConsumer.java | 3 +- .../activemq/perf/SlowConsumerTopicTest.java | 67 +++++++++------- 11 files changed, 86 insertions(+), 126 deletions(-) rename activemq-core/src/main/java/org/apache/activemq/{store/kahadaptor => kaha}/CommandMarshaller.java (97%) delete mode 100644 activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MessageIdMarshaller.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 377a9ff45d..32dea93a80 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -22,12 +22,12 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.Message; +import org.apache.activemq.kaha.CommandMarshaller; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.Store; import org.apache.activemq.memory.UsageListener; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.store.kahadaptor.CommandMarshaller; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/kaha/CommandMarshaller.java similarity index 97% rename from activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java rename to activemq-core/src/main/java/org/apache/activemq/kaha/CommandMarshaller.java index 0b7cda3bc8..1317cf4e52 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/CommandMarshaller.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.store.kahadaptor; +package org.apache.activemq.kaha; import java.io.DataInput; import java.io.DataOutput; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java index d32d166315..e4db2c08e5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java @@ -20,21 +20,20 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.Command; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; +import org.apache.activemq.kaha.CommandMarshaller; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.Marshaller; +import org.apache.activemq.kaha.MessageIdMarshaller; import org.apache.activemq.kaha.MessageMarshaller; import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreFactory; -import org.apache.activemq.kaha.StringMarshaller; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.MessageStore; @@ -78,7 +77,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{ Set rc=new HashSet(); try{ Store store=getStore(); - for(Iterator i=store.getListContainerIds().iterator();i.hasNext();){ + for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){ Object obj=i.next(); if(obj instanceof ActiveMQDestination){ rc.add((ActiveMQDestination) obj); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java index 3357895404..dd1a21a4ee 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java @@ -110,7 +110,7 @@ public class KahaReferenceStore implements ReferenceStore{ public void addReferenceFileIdsInUse(Set rc){ for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){ - ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry); + ReferenceRecord msg=(ReferenceRecord)messageContainer.getValue(entry); rc.add(msg.data.getFileId()); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java index 33dc17e642..bbba5dec23 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java @@ -32,6 +32,7 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.Marshaller; +import org.apache.activemq.kaha.MessageIdMarshaller; import org.apache.activemq.kaha.MessageMarshaller; import org.apache.activemq.kaha.Store; import org.apache.activemq.store.MessageStore; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java index fcc3f03a1a..f88548c690 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java @@ -100,7 +100,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){ TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry); if(subAck.getCount()>0){ - ReferenceRecord rr=(ReferenceRecord)messageContainer.get(subAck.getMessageEntry()); + ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry()); rc.add(rr.data.getFileId()); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MessageIdMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MessageIdMarshaller.java deleted file mode 100644 index 0fc31a8a15..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/MessageIdMarshaller.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadaptor; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.kaha.Marshaller; - - -/** - * Marshall an AtomicInteger - * @version $Revision: 1.10 $ - */ -public class MessageIdMarshaller implements Marshaller{ - - - public void writePayload(MessageId mid,DataOutput dataOut) throws IOException{ - dataOut.writeUTF(mid.toString()); - - } - - public MessageId readPayload(DataInput dataIn) throws IOException{ - String str = dataIn.readUTF(); - return new MessageId(str); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java index ff04f18d83..f9c7611e5e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java @@ -48,6 +48,7 @@ import org.apache.activemq.command.JournalTrace; import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.kaha.CommandMarshaller; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.Store; @@ -60,9 +61,6 @@ import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionStore; -import org.apache.activemq.store.kahadaptor.AtomicIntegerMarshaller; -import org.apache.activemq.store.kahadaptor.CommandMarshaller; -import org.apache.activemq.store.kahadaptor.KahaTopicMessageStore; import org.apache.activemq.store.kahadaptor.TopicSubAckMarshaller; import org.apache.activemq.store.rapid.RapidTransactionStore.Tx; import org.apache.activemq.store.rapid.RapidTransactionStore.TxOperation; diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java index cf1aa11f2e..2d3e1ba420 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java @@ -1,20 +1,17 @@ /** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ + package org.apache.activemq.perf; import javax.jms.Connection; @@ -22,37 +19,38 @@ import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Session; - import junit.framework.TestCase; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + /** * @version $Revision: 1.3 $ */ public class SimpleTopicTest extends TestCase{ + private final Log log=LogFactory.getLog(getClass()); - protected BrokerService broker; -// protected String bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false"; + // protected String + // bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false"; protected String bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=true"; - //protected String bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=false"; - //protected String bindAddress="vm://localhost?marshal=true"; - //protected String bindAddress="vm://localhost"; + // protected String + // bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=false"; + // protected String bindAddress="vm://localhost?marshal=true"; + // protected String bindAddress="vm://localhost"; protected PerfProducer[] producers; protected PerfConsumer[] consumers; protected String DESTINATION_NAME=getClass().getName(); - protected int SAMPLE_COUNT = 30; - protected long SAMPLE_INTERVAL = 2000; - protected int NUMBER_OF_CONSUMERS=10; + protected int SAMPLE_COUNT=30; + protected long SAMPLE_INTERVAL=2000; + protected int NUMBER_OF_CONSUMERS=1; protected int NUMBER_OF_PRODUCERS=1; protected int PAYLOAD_SIZE=1024; protected byte[] array=null; protected ConnectionFactory factory; protected Destination destination; - protected long CONSUMER_SLEEP_DURATION = 0; + protected long CONSUMER_SLEEP_DURATION=0; /** * Sets up a test where the producer and consumer have their own connection. @@ -66,11 +64,9 @@ public class SimpleTopicTest extends TestCase{ factory=createConnectionFactory(); Connection con=factory.createConnection(); Session session=con.createSession(false,Session.AUTO_ACKNOWLEDGE); - destination=createDestination(session,DESTINATION_NAME); log.info("Testing against destination: "+destination); log.info("Running "+NUMBER_OF_PRODUCERS+" producer(s) and "+NUMBER_OF_CONSUMERS+" consumer(s)"); - con.close(); producers=new PerfProducer[NUMBER_OF_PRODUCERS]; consumers=new PerfConsumer[NUMBER_OF_CONSUMERS]; @@ -81,7 +77,7 @@ public class SimpleTopicTest extends TestCase{ for(int i=0;i