diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DefaultJournalManager.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DefaultJournalManager.java
new file mode 100644
index 0000000000..bfb08c5802
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DefaultJournalManager.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.journal.Journal;
+
+
+public class DefaultJournalManager implements JournalManager {
+
+    private final Journal journal;
+    private final List<Journal> journals;
+
+    public DefaultJournalManager() {
+        this.journal = new Journal();
+        List<Journal> list = new ArrayList<Journal>(1);
+        list.add(this.journal);
+        this.journals = Collections.unmodifiableList(list);
+    }
+
+    public void start() throws IOException {
+        journal.start();
+    }
+
+    public void close() throws IOException {
+        journal.close();
+    }
+
+    public Journal getJournal(ActiveMQDestination destination) {
+        return journal;
+    }
+
+    public void setDirectory(File directory) {
+        journal.setDirectory(directory);
+    }
+
+    public void setMaxFileLength(int maxFileLength) {
+        journal.setMaxFileLength(maxFileLength);
+    }
+
+    public void setCheckForCorruptionOnStartup(boolean checkForCorruptJournalFiles) {
+        journal.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
+    }
+
+    public void setChecksum(boolean checksum) {
+        journal.setChecksum(checksum);
+    }
+
+    public void setWriteBatchSize(int batchSize) {
+        journal.setWriteBatchSize(batchSize);
+    }
+
+    public void setArchiveDataLogs(boolean archiveDataLogs) {
+        journal.setArchiveDataLogs(archiveDataLogs);
+    }
+
+    public void setStoreSize(AtomicLong storeSize) {
+        journal.setSizeAccumulator(storeSize);
+    }
+
+    public void setDirectoryArchive(File directoryArchive) {
+        journal.setDirectoryArchive(directoryArchive);
+    }
+
+    public void delete() throws IOException {
+        journal.delete();
+    }
+
+    public Map<Integer, DataFile> getFileMap() {
+        return journal.getFileMap();
+    }
+
+    public Collection<Journal> getJournals() {
+        return journals;
+    }
+
+    public Collection<Journal> getJournals(Set<ActiveMQDestination> set) {
+        return journals;
+    }
+}
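Note: DefaultJournalManager preserves the pre-existing single-journal layout; getJournal() hands back the same Journal for every destination and getJournals() is a one-element list. A minimal illustrative snippet of that behaviour (not part of the patch; the class name and destination names below are made up):

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.DefaultJournalManager;
import org.apache.kahadb.journal.Journal;

public class DefaultJournalManagerExample {
    public static void main(String[] args) throws Exception {
        DefaultJournalManager manager = new DefaultJournalManager();
        // Every destination resolves to the one shared journal.
        Journal forQueue = manager.getJournal(new ActiveMQQueue("TEST.QUEUE"));
        Journal forTopic = manager.getJournal(new ActiveMQTopic("TEST.TOPIC"));
        System.out.println(forQueue == forTopic);          // true
        System.out.println(manager.getJournals().size());  // 1
    }
}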
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DestinationJournalManager.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DestinationJournalManager.java
new file mode 100644
index 0000000000..bf59bdcc6d
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DestinationJournalManager.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.IOHelper;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.journal.Journal;
+
+public class DestinationJournalManager implements JournalManager {
+    private static final String PREPEND = "JournalDest-";
+    private static final String QUEUE_PREPEND = PREPEND + "Queue-";
+    private static final String TOPIC_PREPEND = PREPEND + "Topic-";
+    private AtomicBoolean started = new AtomicBoolean();
+    private final Map<ActiveMQDestination, Journal> journalMap = new ConcurrentHashMap<ActiveMQDestination, Journal>();
+    private File directory = new File("KahaDB");
+    private File directoryArchive;
+    private int maxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+    private boolean checkForCorruptionOnStartup;
+    private boolean checksum = false;
+    private int writeBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
+    private boolean archiveDataLogs;
+    private AtomicLong storeSize = new AtomicLong(0);
+
+
+    public AtomicBoolean getStarted() {
+        return started;
+    }
+
+    public void setStarted(AtomicBoolean started) {
+        this.started = started;
+    }
+
+    public File getDirectory() {
+        return directory;
+    }
+
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+
+    public File getDirectoryArchive() {
+        return directoryArchive;
+    }
+
+    public void setDirectoryArchive(File directoryArchive) {
+        this.directoryArchive = directoryArchive;
+    }
+
+    public int getMaxFileLength() {
+        return maxFileLength;
+    }
+
+    public void setMaxFileLength(int maxFileLength) {
+        this.maxFileLength = maxFileLength;
+    }
+
+    public boolean isCheckForCorruptionOnStartup() {
+        return checkForCorruptionOnStartup;
+    }
+
+    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
+        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
+    }
+
+    public boolean isChecksum() {
+        return checksum;
+    }
+
+    public void setChecksum(boolean checksum) {
+        this.checksum = checksum;
+    }
+
+    public int getWriteBatchSize() {
+        return writeBatchSize;
+    }
+
+    public void setWriteBatchSize(int writeBatchSize) {
+        this.writeBatchSize = writeBatchSize;
+    }
+
+    public boolean isArchiveDataLogs() {
+        return archiveDataLogs;
+    }
+
+    public void setArchiveDataLogs(boolean archiveDataLogs) {
+        this.archiveDataLogs = archiveDataLogs;
+    }
+
+    public AtomicLong getStoreSize() {
+        return storeSize;
+    }
+
+    public void setStoreSize(AtomicLong storeSize) {
+        this.storeSize = storeSize;
+    }
+
+
+    public void start() throws IOException {
+        if (started.compareAndSet(false, true)) {
+            File[] files = getDirectory().listFiles(new FilenameFilter() {
+                public boolean accept(File file, String s) {
+                    if (file.isDirectory() && s != null && s.startsWith(PREPEND)) {
+                        return true;
+                    }
+                    return false;
+                }
+            });
+            if (files != null) {
+                for (File file : files) {
+                    ActiveMQDestination destination;
+                    if (file.getName().startsWith(TOPIC_PREPEND)) {
+                        String destinationName = file.getName().substring(TOPIC_PREPEND.length());
+                        destination = new ActiveMQTopic(destinationName);
+                    } else {
+                        String destinationName = file.getName().substring(QUEUE_PREPEND.length());
+                        destination = new ActiveMQQueue(destinationName);
+                    }
+
+                    Journal journal = new Journal();
+                    journal.setDirectory(file);
+                    if (getDirectoryArchive() != null) {
+                        IOHelper.mkdirs(getDirectoryArchive());
+                        File archive = new File(getDirectoryArchive(), file.getName());
+                        IOHelper.mkdirs(archive);
+                        journal.setDirectoryArchive(archive);
+                    }
+                    configure(journal);
+                    journalMap.put(destination, journal);
+                }
+            }
+            for (Journal journal : journalMap.values()) {
+                journal.start();
+            }
+        }
+
+    }
+
+    public void close() throws IOException {
+        started.set(false);
+        for (Journal journal : journalMap.values()) {
+            journal.close();
+        }
+        journalMap.clear();
+    }
+
+
+    public void delete() throws IOException {
+        for (Journal journal : journalMap.values()) {
+            journal.delete();
+        }
+        journalMap.clear();
+    }
+
+    public Journal getJournal(ActiveMQDestination destination) throws IOException {
+        Journal journal = journalMap.get(destination);
+        if (journal == null && !destination.isTemporary()) {
+            journal = new Journal();
+            String fileName;
+            if (destination.isTopic()) {
+                fileName = TOPIC_PREPEND + destination.getPhysicalName();
+            } else {
+                fileName = QUEUE_PREPEND + destination.getPhysicalName();
+            }
+            File file = new File(getDirectory(), fileName);
+            IOHelper.mkdirs(file);
+            journal.setDirectory(file);
+            if (getDirectoryArchive() != null) {
+                IOHelper.mkdirs(getDirectoryArchive());
+                File archive = new File(getDirectoryArchive(), fileName);
+                IOHelper.mkdirs(archive);
+                journal.setDirectoryArchive(archive);
+            }
+            configure(journal);
+            if (started.get()) {
+                journal.start();
+            }
+            return journal;
+        } else {
+            return journal;
+        }
+    }
+
+    public Map<Integer, DataFile> getFileMap() {
+        throw new RuntimeException("Not supported");
+    }
+
+    public Collection<Journal> getJournals() {
+        return journalMap.values();
+    }
+
+    public Collection<Journal> getJournals(Set<ActiveMQDestination> set) {
+        List<Journal> list = new ArrayList<Journal>();
+        for (ActiveMQDestination destination : set) {
+            Journal j = journalMap.get(destination);
+            if (j != null) {
+                list.add(j);
+            }
+        }
+        return list;
+    }
+
+    protected void configure(Journal journal) {
+        journal.setMaxFileLength(getMaxFileLength());
+        journal.setCheckForCorruptionOnStartup(isCheckForCorruptionOnStartup());
+        journal.setChecksum(isChecksum() || isCheckForCorruptionOnStartup());
+        journal.setWriteBatchSize(getWriteBatchSize());
+        journal.setArchiveDataLogs(isArchiveDataLogs());
+        journal.setSizeAccumulator(getStoreSize());
+    }
+}
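Note: DestinationJournalManager lazily creates one journal per destination, stored under the base store directory in a sub-directory named QUEUE_PREPEND or TOPIC_PREPEND plus the destination's physical name. A small standalone sketch of that naming rule (illustrative only; the class and method names below are not part of the patch, while the prefix constants and the "KahaDB" default directory are taken verbatim from it):

import java.io.File;

// Mirrors DestinationJournalManager's per-destination directory naming; for illustration only.
public class JournalDirectoryNamingSketch {
    private static final String PREPEND = "JournalDest-";
    private static final String QUEUE_PREPEND = PREPEND + "Queue-";
    private static final String TOPIC_PREPEND = PREPEND + "Topic-";

    /** Directory that would hold the journal for the given destination. */
    static File journalDirFor(File storeDir, String physicalName, boolean topic) {
        return new File(storeDir, (topic ? TOPIC_PREPEND : QUEUE_PREPEND) + physicalName);
    }

    public static void main(String[] args) {
        // Prints something like: KahaDB/JournalDest-Queue-ORDERS.IN
        System.out.println(journalDirFor(new File("KahaDB"), "ORDERS.IN", false));
    }
}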
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalManager.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalManager.java
new file mode 100644
index 0000000000..a71fd24767
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalManager.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.journal.Journal;
+
+public interface JournalManager {
+
+    void start() throws IOException;
+
+    void close() throws IOException;
+
+    Journal getJournal(ActiveMQDestination destination) throws IOException;
+
+    void setDirectory(File directory);
+
+    void setMaxFileLength(int maxFileLength);
+
+    void setCheckForCorruptionOnStartup(boolean checkForCorruptJournalFiles);
+
+    void setChecksum(boolean checksum);
+
+    void setWriteBatchSize(int batchSize);
+
+    void setArchiveDataLogs(boolean archiveDataLogs);
+
+    void setStoreSize(AtomicLong storeSize);
+
+    void setDirectoryArchive(File directoryArchive);
+
+    void delete() throws IOException;
+
+    Map<Integer, DataFile> getFileMap();
+
+    Collection<Journal> getJournals();
+
+    Collection<Journal> getJournals(Set<ActiveMQDestination> set);
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index 6db6509568..f4f1c9fcc6 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb;
 import java.io.File;
 import java.io.IOException;
 import java.util.Set;
+
 import org.apache.activeio.journal.Journal;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -37,16 +38,11 @@ import org.apache.activemq.usage.SystemUsage;
  * An implementation of {@link PersistenceAdapter} designed for use with a
  * {@link Journal} and then check pointing asynchronously on a timeout with some
  * other long term persistent storage.
- * - * @org.apache.xbean.XBean element="kahaDB" - * */ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware { private final KahaDBStore letter = new KahaDBStore(); /** - * @param context - * @throws IOException * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext) */ public void beginTransaction(ConnectionContext context) throws IOException { @@ -54,8 +50,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @param sync - * @throws IOException * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean) */ public void checkpoint(boolean sync) throws IOException { @@ -63,8 +57,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @param context - * @throws IOException * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext) */ public void commitTransaction(ConnectionContext context) throws IOException { @@ -72,9 +64,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @param destination * @return MessageStore - * @throws IOException * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) */ public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { @@ -82,9 +72,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @param destination * @return TopicMessageStore - * @throws IOException * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) */ public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { @@ -93,7 +81,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * @return TrandactionStore - * @throws IOException * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore() */ public TransactionStore createTransactionStore() throws IOException { @@ -101,7 +88,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @throws IOException * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages() */ public void deleteAllMessages() throws IOException { @@ -118,7 +104,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * @return lastMessageBrokerSequenceId - * @throws IOException * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId() */ public long getLastMessageBrokerSequenceId() throws IOException { @@ -130,7 +115,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @param destination * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) */ public void removeQueueMessageStore(ActiveMQQueue destination) { @@ -138,7 +122,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @param destination * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) */ public void removeTopicMessageStore(ActiveMQTopic destination) { @@ -146,8 +129,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @param context - * @throws IOException * @see 
org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext) */ public void rollbackTransaction(ConnectionContext context) throws IOException { @@ -155,7 +136,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @param brokerName * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String) */ public void setBrokerName(String brokerName) { @@ -163,7 +143,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @param usageManager * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage) */ public void setUsageManager(SystemUsage usageManager) { @@ -179,7 +158,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @throws Exception * @see org.apache.activemq.Service#start() */ public void start() throws Exception { @@ -187,7 +165,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @throws Exception * @see org.apache.activemq.Service#stop() */ public void stop() throws Exception { @@ -196,7 +173,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the journalMaxFileLength - * + * * @return the journalMaxFileLength */ public int getJournalMaxFileLength() { @@ -206,8 +183,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can * be used - * - * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" */ public void setJournalMaxFileLength(int journalMaxFileLength) { this.letter.setJournalMaxFileLength(journalMaxFileLength); @@ -219,7 +194,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack); } - + public int getMaxFailoverProducersToTrack() { return this.letter.getMaxFailoverProducersToTrack(); } @@ -231,14 +206,14 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth); } - + public int getFailoverProducersAuditDepth() { return this.getFailoverProducersAuditDepth(); } - + /** * Get the checkpointInterval - * + * * @return the checkpointInterval */ public long getCheckpointInterval() { @@ -247,9 +222,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the checkpointInterval - * - * @param checkpointInterval - * the checkpointInterval to set + * + * @param checkpointInterval the checkpointInterval to set */ public void setCheckpointInterval(long checkpointInterval) { this.letter.setCheckpointInterval(checkpointInterval); @@ -257,7 +231,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the cleanupInterval - * + * * @return the cleanupInterval */ public long getCleanupInterval() { @@ -266,9 +240,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the cleanupInterval - * - * @param cleanupInterval - * the cleanupInterval to set + * + * @param cleanupInterval the cleanupInterval to set */ public void setCleanupInterval(long cleanupInterval) { 
this.letter.setCleanupInterval(cleanupInterval); @@ -276,7 +249,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the indexWriteBatchSize - * + * * @return the indexWriteBatchSize */ public int getIndexWriteBatchSize() { @@ -286,9 +259,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the indexWriteBatchSize * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used - * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" - * @param indexWriteBatchSize - * the indexWriteBatchSize to set + * + * @param indexWriteBatchSize the indexWriteBatchSize to set */ public void setIndexWriteBatchSize(int indexWriteBatchSize) { this.letter.setIndexWriteBatchSize(indexWriteBatchSize); @@ -296,7 +268,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the journalMaxWriteBatchSize - * + * * @return the journalMaxWriteBatchSize */ public int getJournalMaxWriteBatchSize() { @@ -305,10 +277,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the journalMaxWriteBatchSize - * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used - * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" - * @param journalMaxWriteBatchSize - * the journalMaxWriteBatchSize to set + * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used + * + * @param journalMaxWriteBatchSize the journalMaxWriteBatchSize to set */ public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize); @@ -316,7 +287,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the enableIndexWriteAsync - * + * * @return the enableIndexWriteAsync */ public boolean isEnableIndexWriteAsync() { @@ -325,9 +296,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the enableIndexWriteAsync - * - * @param enableIndexWriteAsync - * the enableIndexWriteAsync to set + * + * @param enableIndexWriteAsync the enableIndexWriteAsync to set */ public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync); @@ -335,7 +305,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the directory - * + * * @return the directory */ public File getDirectory() { @@ -343,7 +313,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * @param dir * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File) */ public void setDirectory(File dir) { @@ -352,7 +321,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the enableJournalDiskSyncs - * + * * @return the enableJournalDiskSyncs */ public boolean isEnableJournalDiskSyncs() { @@ -361,9 +330,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the enableJournalDiskSyncs - * - * @param enableJournalDiskSyncs - * the enableJournalDiskSyncs to set + * + * @param enableJournalDiskSyncs the enableJournalDiskSyncs to set */ public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) { this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs); @@ -371,7 +339,7 @@ public class KahaDBPersistenceAdapter 
implements PersistenceAdapter, BrokerServi /** * Get the indexCacheSize - * + * * @return the indexCacheSize */ public int getIndexCacheSize() { @@ -381,9 +349,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the indexCacheSize * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used - * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" - * @param indexCacheSize - * the indexCacheSize to set + * + * @param indexCacheSize the indexCacheSize to set */ public void setIndexCacheSize(int indexCacheSize) { this.letter.setIndexCacheSize(indexCacheSize); @@ -391,7 +358,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the ignoreMissingJournalfiles - * + * * @return the ignoreMissingJournalfiles */ public boolean isIgnoreMissingJournalfiles() { @@ -400,9 +367,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the ignoreMissingJournalfiles - * - * @param ignoreMissingJournalfiles - * the ignoreMissingJournalfiles to set + * + * @param ignoreMissingJournalfiles the ignoreMissingJournalfiles to set */ public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles); @@ -463,14 +429,14 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi public int getMaxAsyncJobs() { return letter.getMaxAsyncJobs(); } + /** - * @param maxAsyncJobs - * the maxAsyncJobs to set + * @param maxAsyncJobs the maxAsyncJobs to set */ public void setMaxAsyncJobs(int maxAsyncJobs) { letter.setMaxAsyncJobs(maxAsyncJobs); } - + /** * @return the databaseLockedWaitDelay */ @@ -482,7 +448,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set */ public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) { - letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay); + letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay); } public boolean getForceRecoverIndex() { @@ -493,6 +459,14 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi letter.setForceRecoverIndex(forceRecoverIndex); } + public boolean isJournalPerDestination() { + return letter.isJournalPerDestination(); + } + + public void setJournalPerDestination(boolean journalPerDestination) { + letter.setJournalPerDestination(journalPerDestination); + } + // for testing public KahaDBStore getStore() { return letter; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 8f8bd9057b..7aeac28537 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -26,30 +26,24 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; -import java.util.concurrent.*; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import 
java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTempQueue; -import org.apache.activemq.command.ActiveMQTempTopic; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.LocalTransactionId; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.SubscriptionInfo; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.filter.BooleanExpression; -import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.command.*; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.AbstractMessageStore; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; @@ -58,23 +52,20 @@ import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; -import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; +import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; import org.apache.activemq.store.kahadb.data.KahaLocation; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; -import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; -import org.apache.activemq.store.kahadb.data.KahaXATransactionId; -import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; -import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.wireformat.WireFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.kahadb.journal.Journal; import org.apache.kahadb.journal.Location; import org.apache.kahadb.page.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); @@ -85,7 +76,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( - PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);; + PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10); protected ExecutorService queueExecutor; protected ExecutorService topicExecutor; @@ -128,8 +119,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } /** - * 
@param concurrentStoreAndDispatch - * the concurrentStoreAndDispatch to set + * @param concurrentStoreAndDispatch the concurrentStoreAndDispatch to set */ public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; @@ -143,8 +133,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } /** - * @param concurrentStoreAndDispatch - * the concurrentStoreAndDispatch to set + * @param concurrentStoreAndDispatch the concurrentStoreAndDispatch to set */ public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; @@ -153,16 +142,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public boolean isConcurrentStoreAndDispatchTransactions() { return this.concurrentStoreAndDispatchTransactions; } - + /** * @return the maxAsyncJobs */ public int getMaxAsyncJobs() { return this.maxAsyncJobs; } + /** - * @param maxAsyncJobs - * the maxAsyncJobs to set + * @param maxAsyncJobs the maxAsyncJobs to set */ public void setMaxAsyncJobs(int maxAsyncJobs) { this.maxAsyncJobs = maxAsyncJobs; @@ -177,20 +166,20 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { this.asyncTopicJobQueue = new LinkedBlockingQueue(getMaxAsyncJobs()); this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue, new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); - thread.setDaemon(true); - return thread; - } - }); + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); + thread.setDaemon(true); + return thread; + } + }); this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, asyncTopicJobQueue, new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); - thread.setDaemon(true); - return thread; - } - }); + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); + thread.setDaemon(true); + return thread; + } + }); } @Override @@ -287,14 +276,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { protected KahaDestination dest; private final int maxAsyncJobs; private final Semaphore localDestinationSemaphore; + private final Journal journal; double doneTasks, canceledTasks = 0; - public KahaDBMessageStore(ActiveMQDestination destination) { + public KahaDBMessageStore(ActiveMQDestination destination) throws IOException { super(destination); this.dest = convert(destination); this.maxAsyncJobs = getMaxAsyncJobs(); this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); + this.journal = getJournalManager().getJournal(destination); } @Override @@ -356,8 +347,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { command.setPrioritySupported(isPrioritizedMessages()); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); - store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null); - + store(journal, command, isEnableJournalDiskSyncs() && 
message.isResponseRequired(), null, null); + } public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { @@ -368,13 +359,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); - store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); + store(journal, command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); } public void removeAllMessages(ConnectionContext context) throws IOException { KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); command.setDestination(dest); - store(command, true, null, null); + store(journal, command, true, null, null); } public Message getMessage(MessageId identity) throws IOException { @@ -396,14 +387,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return sd.orderIndex.get(tx, sequence).location; } }); - }finally { + } finally { indexLock.readLock().unlock(); } if (location == null) { return null; } - return loadMessage(location); + return loadMessage(journal, location); } public int getMessageCount() throws IOException { @@ -419,14 +410,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { StoredDestination sd = getStoredDestination(dest, tx); int rc = 0; for (Iterator> iterator = sd.locationIndex.iterator(tx); iterator - .hasNext();) { + .hasNext(); ) { iterator.next(); rc++; } return rc; } }); - }finally { + } finally { indexLock.readLock().unlock(); } } finally { @@ -446,7 +437,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return sd.locationIndex.isEmpty(tx); } }); - }finally { + } finally { indexLock.readLock().unlock(); } } @@ -460,22 +451,22 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { StoredDestination sd = getStoredDestination(dest, tx); sd.orderIndex.resetCursorPosition(); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator - .hasNext();) { + .hasNext(); ) { Entry entry = iterator.next(); if (ackedAndPrepared.contains(entry.getValue().messageId)) { continue; } - Message msg = loadMessage(entry.getValue().location); + Message msg = loadMessage(journal, entry.getValue().location); listener.recoverMessage(msg); } } }); - }finally { + } finally { indexLock.writeLock().unlock(); } } - + public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { indexLock.readLock().lock(); try { @@ -490,7 +481,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { if (ackedAndPrepared.contains(entry.getValue().messageId)) { continue; } - Message msg = loadMessage(entry.getValue().location); + Message msg = loadMessage(journal, entry.getValue().location); listener.recoverMessage(msg); counter++; if (counter >= maxReturned) { @@ -500,7 +491,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { sd.orderIndex.stoppedIterating(); } }); - }finally { + } finally { indexLock.readLock().unlock(); } } @@ -511,11 +502,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void execute(Transaction tx) throws Exception { StoredDestination sd = getExistingStoredDestination(dest, tx); if (sd != null) { - sd.orderIndex.resetCursorPosition();} + sd.orderIndex.resetCursorPosition(); } - }); 
+ } + }); } catch (Exception e) { - LOG.error("Failed to reset batching",e); + LOG.error("Failed to reset batching", e); } } @@ -528,10 +520,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { // Hopefully one day the page file supports concurrent read // operations... but for now we must // externally synchronize... - + indexLock.writeLock().lock(); try { - pageFile.tx().execute(new Transaction.Closure() { + pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); Long location = sd.messageIdIndex.get(tx, key); @@ -540,10 +532,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } }); - }finally { + } finally { indexLock.writeLock().unlock(); } - + } finally { unlockAsyncJobQueue(); } @@ -553,15 +545,21 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { @Override public void setMemoryUsage(MemoryUsage memoeyUSage) { } + @Override public void start() throws Exception { super.start(); } + @Override public void stop() throws Exception { super.stop(); } + public Journal getJournal() { + return this.journal; + } + protected void lockAsyncJobQueue() { try { this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); @@ -590,6 +588,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { private final AtomicInteger subscriptionCount = new AtomicInteger(); + public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { super(destination); this.subscriptionCount.set(getAllSubscriptions().length); @@ -646,7 +645,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { if (ack != null && ack.isUnmatchedAck()) { command.setAck(UNMATCHED); } - store(command, false, null, null); + store(getJournal(), command, false, null, null); } public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { @@ -658,7 +657,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { command.setRetroactive(retroactive); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); - store(command, isEnableJournalDiskSyncs() && true, null, null); + store(getJournal(), command, isEnableJournalDiskSyncs() && true, null, null); this.subscriptionCount.incrementAndGet(); } @@ -666,7 +665,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { KahaSubscriptionCommand command = new KahaSubscriptionCommand(); command.setDestination(dest); command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); - store(command, isEnableJournalDiskSyncs() && true, null, null); + store(getJournal(), command, isEnableJournalDiskSyncs() && true, null, null); this.subscriptionCount.decrementAndGet(); } @@ -679,7 +678,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); for (Iterator> iterator = sd.subscriptions.iterator(tx); iterator - .hasNext();) { + .hasNext(); ) { Entry entry = iterator.next(); SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry 
.getValue().getSubscriptionInfo().newInput())); @@ -688,7 +687,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } }); - }finally { + } finally { indexLock.readLock().unlock(); } @@ -712,7 +711,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { .getSubscriptionInfo().newInput())); } }); - }finally { + } finally { indexLock.readLock().unlock(); } } @@ -732,7 +731,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { int counter = 0; for (Iterator>> iterator = - sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) { + sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext(); ) { Entry> entry = iterator.next(); if (entry.getValue().contains(subscriptionKey)) { counter++; @@ -741,7 +740,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return counter; } }); - }finally { + } finally { indexLock.writeLock().unlock(); } } @@ -758,20 +757,20 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); sd.orderIndex.setBatch(tx, cursorPos); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator - .hasNext();) { + .hasNext(); ) { Entry entry = iterator.next(); - listener.recoverMessage(loadMessage(entry.getValue().location)); + listener.recoverMessage(loadMessage(getJournal(), entry.getValue().location)); } sd.orderIndex.resetCursorPosition(); } }); - }finally { + } finally { indexLock.writeLock().unlock(); } } public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, - final MessageRecoveryListener listener) throws Exception { + final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); indexLock.writeLock().lock(); @@ -796,9 +795,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { Entry entry = null; int counter = 0; for (Iterator> iterator = sd.orderIndex.iterator(tx, moc); iterator - .hasNext();) { + .hasNext(); ) { entry = iterator.next(); - if (listener.recoverMessage(loadMessage(entry.getValue().location))) { + if (listener.recoverMessage(loadMessage(getJournal(), entry.getValue().location))) { counter++; } if (counter >= maxReturned || listener.hasSpace() == false) { @@ -812,7 +811,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } }); - }finally { + } finally { indexLock.writeLock().unlock(); } } @@ -828,7 +827,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { sd.subscriptionCursors.remove(subscriptionKey); } }); - }finally { + } finally { indexLock.writeLock().unlock(); } } catch (IOException e) { @@ -852,9 +851,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { /** * Cleanup method to remove any state associated with the given destination. * This method does not stop the message store (it might not be cached). 
- * - * @param destination - * Destination to forget + * + * @param destination Destination to forget */ public void removeQueueMessageStore(ActiveMQQueue destination) { } @@ -862,9 +860,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { /** * Cleanup method to remove any state associated with the given destination * This method does not stop the message store (it might not be cached). - * - * @param destination - * Destination to forget + * + * @param destination Destination to forget */ public void removeTopicMessageStore(ActiveMQTopic destination) { } @@ -881,7 +878,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { for (Iterator> iterator = metadata.destinations.iterator(tx); iterator - .hasNext();) { + .hasNext(); ) { Entry entry = iterator.next(); if (!isEmptyTopic(entry, tx)) { rc.add(convert(entry.getKey())); @@ -902,7 +899,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return isEmptyTopic; } }); - }finally { + } finally { indexLock.readLock().unlock(); } return rc; @@ -914,7 +911,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public long getLastMessageBrokerSequenceId() throws IOException { return 0; } - + public long getLastProducerSequenceId(ProducerId id) { indexLock.readLock().lock(); try { @@ -931,9 +928,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void beginTransaction(ConnectionContext context) throws IOException { throw new IOException("Not yet implemented."); } + public void commitTransaction(ConnectionContext context) throws IOException { throw new IOException("Not yet implemented."); } + public void rollbackTransaction(ConnectionContext context) throws IOException { throw new IOException("Not yet implemented."); } @@ -951,8 +950,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { * @return * @throws IOException */ - Message loadMessage(Location location) throws IOException { - KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location); + Message loadMessage(Journal journal, Location location) throws IOException { + KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(journal, location); Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); return msg; } @@ -972,20 +971,20 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { KahaDestination rc = new KahaDestination(); rc.setName(dest.getPhysicalName()); switch (dest.getDestinationType()) { - case ActiveMQDestination.QUEUE_TYPE: - rc.setType(DestinationType.QUEUE); - return rc; - case ActiveMQDestination.TOPIC_TYPE: - rc.setType(DestinationType.TOPIC); - return rc; - case ActiveMQDestination.TEMP_QUEUE_TYPE: - rc.setType(DestinationType.TEMP_QUEUE); - return rc; - case ActiveMQDestination.TEMP_TOPIC_TYPE: - rc.setType(DestinationType.TEMP_TOPIC); - return rc; - default: - return null; + case ActiveMQDestination.QUEUE_TYPE: + rc.setType(DestinationType.QUEUE); + return rc; + case ActiveMQDestination.TOPIC_TYPE: + rc.setType(DestinationType.TOPIC); + return rc; + case ActiveMQDestination.TEMP_QUEUE_TYPE: + rc.setType(DestinationType.TEMP_QUEUE); + return rc; + case ActiveMQDestination.TEMP_TOPIC_TYPE: + rc.setType(DestinationType.TEMP_TOPIC); + return rc; + default: + return null; } } @@ 
-998,16 +997,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { String name = dest.substring(p + 1); switch (KahaDestination.DestinationType.valueOf(type)) { - case QUEUE: - return new ActiveMQQueue(name); - case TOPIC: - return new ActiveMQTopic(name); - case TEMP_QUEUE: - return new ActiveMQTempQueue(name); - case TEMP_TOPIC: - return new ActiveMQTempTopic(name); - default: - throw new IllegalArgumentException("Not in the valid destination format"); + case QUEUE: + return new ActiveMQQueue(name); + case TOPIC: + return new ActiveMQTopic(name); + case TEMP_QUEUE: + return new ActiveMQTempQueue(name); + case TEMP_TOPIC: + return new ActiveMQTempTopic(name); + default: + throw new IllegalArgumentException("Not in the valid destination format"); } } @@ -1137,8 +1136,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { private final int subscriptionCount; private final List subscriptionKeys = new ArrayList(1); private final KahaDBTopicMessageStore topicStore; + public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, - int subscriptionCount) { + int subscriptionCount) { super(store, context, message); this.topicStore = store; this.subscriptionCount = subscriptionCount; @@ -1170,8 +1170,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { /** * add a key - * - * @param key + * * @return true if all acknowledgements received */ public boolean addSubscriptionKey(String key) { @@ -1217,7 +1216,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { super.afterExecute(runnable, throwable); if (runnable instanceof StoreTask) { - ((StoreTask)runnable).releaseLocks(); + ((StoreTask) runnable).releaseLocks(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java index fc10db277b..f48753550f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java @@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb; import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -26,7 +27,9 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; + import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; @@ -49,14 +52,13 @@ import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.wireformat.WireFormat; +import org.apache.kahadb.journal.Journal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Provides a TransactionStore implementation that can create transaction aware * MessageStore objects from non transaction aware MessageStore objects. 
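Note: in the KahaDBTransactionStore changes that follow, each in-flight Tx now records the destinations its adds and acks touch, and prepare/commit/rollback records are written through theStore.getJournalManager().getJournals(tx.destinations), so only the journals of the affected destinations are hit. A reduced sketch of that bookkeeping (illustrative only; the class and method names below differ from the patch):

import java.util.HashSet;
import java.util.Set;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;

// Sketch of the per-transaction destination tracking added by this patch.
public class TxDestinationTracking {
    private final Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>();

    public void recordAdd(Message message) {
        destinations.add(message.getDestination());
    }

    public void recordAck(MessageAck ack) {
        destinations.add(ack.getDestination());
    }

    /** Destinations whose journals need the transaction's commit/prepare records. */
    public Set<ActiveMQDestination> getDestinations() {
        return destinations;
    }
}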
- * - * */ public class KahaDBTransactionStore implements TransactionStore { static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class); @@ -70,21 +72,23 @@ public class KahaDBTransactionStore implements TransactionStore { public class Tx { private final ArrayList messages = new ArrayList(); - private final ArrayList acks = new ArrayList(); + private final HashSet destinations = new HashSet(); public void add(AddMessageCommand msg) { messages.add(msg); + destinations.add(msg.getMessage().getDestination()); } public void add(RemoveMessageCommand ack) { acks.add(ack); + destinations.add(ack.getMessageAck().getDestination()); } public Message[] getMessages() { Message rc[] = new Message[messages.size()]; int count = 0; - for (Iterator iter = messages.iterator(); iter.hasNext();) { + for (Iterator iter = messages.iterator(); iter.hasNext(); ) { AddMessageCommand cmd = iter.next(); rc[count++] = cmd.getMessage(); } @@ -94,7 +98,7 @@ public class KahaDBTransactionStore implements TransactionStore { public MessageAck[] getAcks() { MessageAck rc[] = new MessageAck[acks.size()]; int count = 0; - for (Iterator iter = acks.iterator(); iter.hasNext();) { + for (Iterator iter = acks.iterator(); iter.hasNext(); ) { RemoveMessageCommand cmd = iter.next(); rc[count++] = cmd.getMessageAck(); } @@ -103,49 +107,56 @@ public class KahaDBTransactionStore implements TransactionStore { /** * @return true if something to commit - * @throws IOException */ public List> commit() throws IOException { List> results = new ArrayList>(); // Do all the message adds. - for (Iterator iter = messages.iterator(); iter.hasNext();) { + for (Iterator iter = messages.iterator(); iter.hasNext(); ) { AddMessageCommand cmd = iter.next(); results.add(cmd.run()); } // And removes.. 
- for (Iterator iter = acks.iterator(); iter.hasNext();) { + for (Iterator iter = acks.iterator(); iter.hasNext(); ) { RemoveMessageCommand cmd = iter.next(); cmd.run(); results.add(cmd.run()); } - + return results; } } public abstract class AddMessageCommand { private final ConnectionContext ctx; + AddMessageCommand(ConnectionContext ctx) { this.ctx = ctx; } + abstract Message getMessage(); + Future run() throws IOException { return run(this.ctx); } + abstract Future run(ConnectionContext ctx) throws IOException; } public abstract class RemoveMessageCommand { private final ConnectionContext ctx; + RemoveMessageCommand(ConnectionContext ctx) { this.ctx = ctx; } + abstract MessageAck getMessageAck(); + Future run() throws IOException { return run(this.ctx); } + abstract Future run(ConnectionContext context) throws IOException; } @@ -197,8 +208,8 @@ public class KahaDBTransactionStore implements TransactionStore { @Override public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, - MessageId messageId, MessageAck ack) throws IOException { - KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId, + MessageId messageId, MessageAck ack) throws IOException { + KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore) getDelegate(), clientId, subscriptionName, messageId, ack); } @@ -206,13 +217,17 @@ public class KahaDBTransactionStore implements TransactionStore { } /** - * @throws IOException * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) */ public void prepare(TransactionId txid) throws IOException { inflightTransactions.remove(txid); KahaTransactionInfo info = getTransactionInfo(txid); - theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null); + Tx tx = inflightTransactions.get(txid); + if (tx != null) { + for (Journal journal : theStore.getJournalManager().getJournals(tx.destinations)) { + theStore.store(journal, new KahaPrepareCommand().setTransactionInfo(info), true, null, null); + } + } } public Tx getTx(Object txid) { @@ -242,7 +257,7 @@ public class KahaDBTransactionStore implements TransactionStore { theStore.brokerService.handleIOException(new IOException(e.getMessage())); } catch (ExecutionException e) { theStore.brokerService.handleIOException(new IOException(e.getMessage())); - }catch(CancellationException e) { + } catch (CancellationException e) { } if (!result.isCancelled()) { doneSomething = true; @@ -253,9 +268,11 @@ public class KahaDBTransactionStore implements TransactionStore { } if (doneSomething) { KahaTransactionInfo info = getTransactionInfo(txid); - theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null); + for (Journal journal : theStore.getJournalManager().getJournals(tx.destinations)) { + theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, null, null); + } } - }else { + } else { //The Tx will be null for failed over clients - lets run their post commits if (postCommit != null) { postCommit.run(); @@ -266,23 +283,26 @@ public class KahaDBTransactionStore implements TransactionStore { KahaTransactionInfo info = getTransactionInfo(txid); // ensure message order w.r.t to cursor and store for setBatch() synchronized (this) { - theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit); + for (Journal journal : theStore.getJournalManager().getJournals()) { + theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, preCommit, 
postCommit); + } forgetRecoveredAcks(txid); } } - }else { - LOG.error("Null transaction passed on commit"); + } else { + LOG.error("Null transaction passed on commit"); } } /** - * @throws IOException * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) */ public void rollback(TransactionId txid) throws IOException { if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { KahaTransactionInfo info = getTransactionInfo(txid); - theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null); + for (Journal journal : theStore.getJournalManager().getJournals()) { + theStore.store(journal, new KahaRollbackCommand().setTransactionInfo(info), false, null, null); + } forgetRecoveredAcks(txid); } else { inflightTransactions.remove(txid); @@ -349,6 +369,7 @@ public class KahaDBTransactionStore implements TransactionStore { public Message getMessage() { return message; } + @Override public Future run(ConnectionContext ctx) throws IOException { destination.addMessage(ctx, message); @@ -376,6 +397,7 @@ public class KahaDBTransactionStore implements TransactionStore { public Message getMessage() { return message; } + @Override public Future run(ConnectionContext ctx) throws IOException { return destination.asyncAddQueueMessage(ctx, message); @@ -393,7 +415,7 @@ public class KahaDBTransactionStore implements TransactionStore { throws IOException { if (message.getTransactionId() != null) { - if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { + if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { destination.addMessage(context, message); return AbstractMessageStore.FUTURE; } else { @@ -403,6 +425,7 @@ public class KahaDBTransactionStore implements TransactionStore { public Message getMessage() { return message; } + @Override public Future run(ConnectionContext ctx) throws IOException { return destination.asyncAddTopicMessage(ctx, message); @@ -424,7 +447,7 @@ public class KahaDBTransactionStore implements TransactionStore { throws IOException { if (ack.isInTransaction()) { - if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) { + if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { destination.removeMessage(context, ack); } else { Tx tx = getTx(ack.getTransactionId()); @@ -450,7 +473,7 @@ public class KahaDBTransactionStore implements TransactionStore { throws IOException { if (ack.isInTransaction()) { - if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { + if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { destination.removeAsyncMessage(context, ack); } else { Tx tx = getTx(ack.getTransactionId()); @@ -476,7 +499,7 @@ public class KahaDBTransactionStore implements TransactionStore { final MessageId messageId, final MessageAck ack) throws IOException { if (ack.isInTransaction()) { - if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) { + if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { destination.acknowledge(context, clientId, subscriptionName, messageId, ack); } else { Tx tx = getTx(ack.getTransactionId()); diff --git 
a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 238aaaafbc..d9c2f87a9c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -18,15 +18,7 @@ package org.apache.activemq.store.kahadb; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.EOFException; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.OutputStream; +import java.io.*; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,6 +28,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.ActiveMQMessageAuditNoSync; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.MessageAck; @@ -44,27 +37,11 @@ import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; -import org.apache.activemq.store.kahadb.data.KahaCommitCommand; -import org.apache.activemq.store.kahadb.data.KahaDestination; -import org.apache.activemq.store.kahadb.data.KahaEntryType; -import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; -import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; -import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; -import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; -import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; -import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; -import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; -import org.apache.activemq.store.kahadb.data.KahaTraceCommand; -import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; -import org.apache.activemq.store.kahadb.data.KahaXATransactionId; +import org.apache.activemq.store.kahadb.data.*; import org.apache.activemq.util.Callback; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; -import org.apache.kahadb.util.LocationMarshaller; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.kahadb.index.BTreeIndex; import org.apache.kahadb.index.BTreeVisitor; import org.apache.kahadb.journal.DataFile; @@ -73,16 +50,9 @@ import org.apache.kahadb.journal.Location; import org.apache.kahadb.page.Page; import org.apache.kahadb.page.PageFile; import org.apache.kahadb.page.Transaction; -import org.apache.kahadb.util.ByteSequence; -import org.apache.kahadb.util.DataByteArrayInputStream; -import org.apache.kahadb.util.DataByteArrayOutputStream; -import org.apache.kahadb.util.LockFile; -import org.apache.kahadb.util.LongMarshaller; -import org.apache.kahadb.util.Marshaller; -import org.apache.kahadb.util.Sequence; -import org.apache.kahadb.util.SequenceSet; -import 
org.apache.kahadb.util.StringMarshaller; -import org.apache.kahadb.util.VariableMarshaller; +import org.apache.kahadb.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MessageDatabase extends ServiceSupport implements BrokerServiceAware { @@ -92,9 +62,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0")); protected static final Buffer UNMATCHED; + static { UNMATCHED = new Buffer(new byte[]{}); } + private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class); private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; @@ -105,7 +77,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar static final int VERSION = 3; - protected class Metadata { protected Page page; protected int state; @@ -115,6 +86,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar protected Location producerSequenceIdTrackerLocation = null; protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); protected int version = VERSION; + public void read(DataInput is) throws IOException { state = is.readInt(); destinations = new BTreeIndex(pageFile, is.readLong()); @@ -137,9 +109,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } catch (EOFException expectedOnUpgrade) { } try { - version = is.readInt(); - }catch (EOFException expectedOnUpgrade) { - version=1; + version = is.readInt(); + } catch (EOFException expectedOnUpgrade) { + version = 1; } LOG.info("KahaDB is version " + version); } @@ -185,7 +157,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } protected PageFile pageFile; - protected Journal journal; + protected JournalManager journalManager; protected Metadata metadata = new Metadata(); protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); @@ -195,12 +167,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar protected boolean deleteAllMessages; protected File directory = new File("KahaDB"); protected Thread checkpointThread; - protected boolean enableJournalDiskSyncs=true; + protected boolean enableJournalDiskSyncs = true; protected boolean archiveDataLogs; protected File directoryArchive; protected AtomicLong storeSize = new AtomicLong(0); - long checkpointInterval = 5*1000; - long cleanupInterval = 30*1000; + long checkpointInterval = 5 * 1000; + long cleanupInterval = 30 * 1000; int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; boolean enableIndexWriteAsync = false; @@ -216,6 +188,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY; protected boolean forceRecoverIndex = false; private final Object checkpointThreadLock = new Object(); + private boolean journalPerDestination = false; + public MessageDatabase() { } @@ -262,15 +236,15 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar storedDestinations.clear(); pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { - for (Iterator> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { + for (Iterator> iterator = 
metadata.destinations.iterator(tx); iterator.hasNext(); ) { Entry entry = iterator.next(); - StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); + StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions != null); storedDestinations.put(entry.getKey(), sd); } } }); pageFile.flush(); - }finally { + } finally { this.indexLock.writeLock().unlock(); } } @@ -297,11 +271,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar while (opened.get()) { Thread.sleep(sleepTime); long now = System.currentTimeMillis(); - if( now - lastCleanup >= cleanupInterval ) { + if (now - lastCleanup >= cleanupInterval) { checkpointCleanup(true); lastCleanup = now; lastCheckpoint = now; - } else if( now - lastCheckpoint >= checkpointInterval ) { + } else if (now - lastCheckpoint >= checkpointInterval) { checkpointCleanup(false); lastCheckpoint = now; } @@ -322,8 +296,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } public void open() throws IOException { - if( opened.compareAndSet(false, true) ) { - getJournal().start(); + if (opened.compareAndSet(false, true)) { + getJournalManager().start(); loadPageFile(); startCheckpoint(); recover(); @@ -375,18 +349,20 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar try { lock(); if (deleteAllMessages) { - getJournal().start(); - getJournal().delete(); - getJournal().close(); - journal = null; + getJournalManager().start(); + getJournalManager().delete(); + getJournalManager().close(); + journalManager = null; getPageFile().delete(); LOG.info("Persistence store purged."); deleteAllMessages = false; } open(); - store(new KahaTraceCommand().setMessage("LOADED " + new Date())); - }finally { + for (Journal journal : getJournalManager().getJournals()) { + store(journal, new KahaTraceCommand().setMessage("LOADED " + new Date())); + } + } finally { this.indexLock.writeLock().unlock(); } @@ -394,32 +370,34 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar public void close() throws IOException, InterruptedException { - if( opened.compareAndSet(true, false)) { + if (opened.compareAndSet(true, false)) { this.indexLock.writeLock().lock(); try { pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { - checkpointUpdate(tx, true); + for (Journal journal : getJournalManager().getJournals()) { + checkpointUpdate(tx, journal, true); + } } }); pageFile.unload(); metadata = new Metadata(); - }finally { + } finally { this.indexLock.writeLock().unlock(); } - journal.close(); + journalManager.close(); synchronized (checkpointThreadLock) { checkpointThread.join(); } lockFile.unlock(); - lockFile=null; + lockFile = null; } } public void unload() throws IOException, InterruptedException { this.indexLock.writeLock().lock(); try { - if( pageFile != null && pageFile.isLoaded() ) { + if (pageFile != null && pageFile.isLoaded()) { metadata.state = CLOSED_STATE; metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); @@ -429,7 +407,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } }); } - }finally { + } finally { this.indexLock.writeLock().unlock(); } close(); @@ -444,7 +422,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } if (!preparedTransactions.isEmpty()) { Location t = 
preparedTransactions.values().iterator().next().get(0).getLocation(); - if (l==null || t.compareTo(l) <= 0) { + if (l == null || t.compareTo(l) <= 0) { l = t; } } @@ -455,63 +433,65 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar /** * Move all the messages that were in the journal into long term storage. We * just replay and do a checkpoint. - * - * @throws IOException - * @throws IOException - * @throws IllegalStateException */ private void recover() throws IllegalStateException, IOException { this.indexLock.writeLock().lock(); try { - - long start = System.currentTimeMillis(); - Location producerAuditPosition = recoverProducerAudit(); - Location lastIndoubtPosition = getRecoveryPosition(); - - Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition); - - if (recoveryPosition != null) { - int redoCounter = 0; - LOG.info("Recovering from the journal ..."); - while (recoveryPosition != null) { - JournalCommand message = load(recoveryPosition); - metadata.lastUpdate = recoveryPosition; - process(message, recoveryPosition, lastIndoubtPosition); - redoCounter++; - recoveryPosition = journal.getNextLocation(recoveryPosition); - } - long end = System.currentTimeMillis(); - LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); + for (Journal journal : getJournalManager().getJournals()) { + recover(journal); } - - // We may have to undo some index updates. - pageFile.tx().execute(new Transaction.Closure() { - public void execute(Transaction tx) throws IOException { - recoverIndex(tx); - } - }); - - // rollback any recovered inflight local transactions - Set toRollback = new HashSet(); - synchronized (inflightTransactions) { - for (Iterator it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { - TransactionId id = it.next(); - if (id.isLocalTransaction()) { - toRollback.add(id); - } - } - for (TransactionId tx: toRollback) { - LOG.debug("rolling back recovered indoubt local transaction " + tx); - store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(tx)), false, null, null); - } - } - }finally { + } finally { this.indexLock.writeLock().unlock(); } } + private void recover(final Journal journal) throws IllegalStateException, IOException { + + long start = System.currentTimeMillis(); + Location producerAuditPosition = recoverProducerAudit(journal); + Location lastIndoubtPosition = getRecoveryPosition(journal); + + Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition); + + if (recoveryPosition != null) { + int redoCounter = 0; + LOG.info("Recovering from the journal ..."); + while (recoveryPosition != null) { + JournalCommand message = load(journal, recoveryPosition); + metadata.lastUpdate = recoveryPosition; + process(message, recoveryPosition, lastIndoubtPosition); + redoCounter++; + recoveryPosition = journal.getNextLocation(recoveryPosition); + } + long end = System.currentTimeMillis(); + LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); + } + + // We may have to undo some index updates. 
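The replay loop in recover(Journal) above walks forward from the computed recovery position, re-processing each JournalCommand until journal.getNextLocation(...) returns null. A minimal sketch of that pattern, using a hypothetical in-memory list of records in place of the real Journal and Location classes (class and method names here are illustrative, not KahaDB API):

import java.util.List;

public class ReplaySketch {
    // Hypothetical stand-in for a journal: an ordered list of records plus a
    // checkpointed position from which recovery must resume.
    static int replay(List<String> journal, int recoveryPosition) {
        int redoCounter = 0;
        for (int pos = recoveryPosition; pos < journal.size(); pos++) {
            process(journal.get(pos)); // re-apply the record, as process(message, location) does
            redoCounter++;             // count replayed operations, as the real code logs
        }
        return redoCounter;
    }

    static void process(String record) {
        // In KahaDB this would update the B-tree indexes; here we just print.
        System.out.println("replaying: " + record);
    }

    public static void main(String[] args) {
        List<String> journal = List.of("ADD m1", "ADD m2", "REMOVE m1", "ADD m3");
        int redone = replay(journal, 2); // the checkpoint already covered the first two records
        System.out.println("Recovery replayed " + redone + " operations");
    }
}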
+ pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + recoverIndex(tx, journal); + } + }); + + // rollback any recovered inflight local transactions + Set toRollback = new HashSet(); + synchronized (inflightTransactions) { + for (Iterator it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { + TransactionId id = it.next(); + if (id.isLocalTransaction()) { + toRollback.add(id); + } + } + for (TransactionId tx : toRollback) { + LOG.debug("rolling back recovered indoubt local transaction " + tx); + store(journal, new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(tx)), false, null, null); + } + } + } + private Location minimum(Location producerAuditPosition, - Location lastIndoubtPosition) { + Location lastIndoubtPosition) { Location min = null; if (producerAuditPosition != null) { min = producerAuditPosition; @@ -524,9 +504,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar return min; } - private Location recoverProducerAudit() throws IOException { + private Location recoverProducerAudit(Journal journal) throws IOException { if (metadata.producerSequenceIdTrackerLocation != null) { - KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); + KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(journal, metadata.producerSequenceIdTrackerLocation); try { ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); @@ -542,12 +522,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } - protected void recoverIndex(Transaction tx) throws IOException { + protected void recoverIndex(Transaction tx, Journal journal) throws IOException { long start = System.currentTimeMillis(); // It is possible index updates got applied before the journal updates.. // in that case we need to removed references to messages that are not in the journal final Location lastAppendLocation = journal.getLastAppendLocation(); - long undoCounter=0; + long undoCounter = 0; // Go through all the destinations to see if they have messages past the lastAppendLocation for (StoredDestination sd : storedDestinations.values()) { @@ -573,7 +553,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } long end = System.currentTimeMillis(); - if( undoCounter > 0 ) { + if (undoCounter > 0) { // The rolledback operations are basically in flight journal writes. To avoid getting these the end user // should do sync writes to the journal. 
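recoverIndex first has to undo index entries whose journal location lies past the last append location, which can happen when asynchronous index writes outran the journal. A toy version of that comparison, with plain integer offsets standing in for Location objects (all names here are invented for illustration):

import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class IndexAheadOfJournalSketch {
    public static void main(String[] args) {
        // Toy "location index": message sequence -> journal offset it was written at.
        TreeMap<Long, Integer> locationIndex = new TreeMap<>(Map.of(1L, 100, 2L, 250, 3L, 900));
        int lastAppendOffset = 400; // analogous to journal.getLastAppendLocation()

        // Index updates that got ahead of the journal must be rolled back.
        int undoCounter = 0;
        Iterator<Map.Entry<Long, Integer>> it = locationIndex.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() > lastAppendOffset) {
                it.remove();
                undoCounter++;
            }
        }
        System.out.println("Rolled back " + undoCounter + " messages from the index");
    }
}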
LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); @@ -589,12 +569,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar for (StoredDestination sd : storedDestinations.values()) { // Use a visitor to cut down the number of pages that we load sd.locationIndex.visit(tx, new BTreeVisitor() { - int last=-1; + int last = -1; public boolean isInterestedInKeysBetween(Location first, Location second) { - if( first==null ) { + if (first == null) { return !ss.contains(0, second.getDataFileId()); - } else if( second==null ) { + } else if (second == null) { return true; } else { return !ss.contains(first.getDataFileId(), second.getDataFileId()); @@ -604,7 +584,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar public void visit(List keys, List values) { for (Location l : keys) { int fileId = l.getDataFileId(); - if( last != fileId ) { + if (last != fileId) { ss.add(fileId); last = fileId; } @@ -614,34 +594,34 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar }); } HashSet missingJournalFiles = new HashSet(); - while( !ss.isEmpty() ) { - missingJournalFiles.add( (int)ss.removeFirst() ); + while (!ss.isEmpty()) { + missingJournalFiles.add((int) ss.removeFirst()); } - missingJournalFiles.removeAll( journal.getFileMap().keySet() ); + missingJournalFiles.removeAll(journal.getFileMap().keySet()); - if( !missingJournalFiles.isEmpty() ) { - LOG.info("Some journal files are missing: "+missingJournalFiles); + if (!missingJournalFiles.isEmpty()) { + LOG.info("Some journal files are missing: " + missingJournalFiles); } ArrayList> missingPredicates = new ArrayList>(); for (Integer missing : missingJournalFiles) { - missingPredicates.add(new BTreeVisitor.BetweenVisitor(new Location(missing,0), new Location(missing+1,0))); + missingPredicates.add(new BTreeVisitor.BetweenVisitor(new Location(missing, 0), new Location(missing + 1, 0))); } - if ( checkForCorruptJournalFiles ) { + if (checkForCorruptJournalFiles) { Collection dataFiles = journal.getFileMap().values(); for (DataFile dataFile : dataFiles) { int id = dataFile.getDataFileId(); - missingPredicates.add(new BTreeVisitor.BetweenVisitor(new Location(id,dataFile.getLength()), new Location(id+1,0))); + missingPredicates.add(new BTreeVisitor.BetweenVisitor(new Location(id, dataFile.getLength()), new Location(id + 1, 0))); Sequence seq = dataFile.getCorruptedBlocks().getHead(); - while( seq!=null ) { - missingPredicates.add(new BTreeVisitor.BetweenVisitor(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1))); + while (seq != null) { + missingPredicates.add(new BTreeVisitor.BetweenVisitor(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1))); seq = seq.getNext(); } } } - if( !missingPredicates.isEmpty() ) { + if (!missingPredicates.isEmpty()) { for (StoredDestination sd : storedDestinations.values()) { final ArrayList matches = new ArrayList(); @@ -653,11 +633,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar }); // If somes message references are affected by the missing data files... - if( !matches.isEmpty() ) { + if (!matches.isEmpty()) { // We either 'gracefully' recover dropping the missing messages or // we error out. 
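For files that are missing or contain corrupted blocks, the code above builds BetweenVisitor ranges over (dataFileId, offset) pairs and collects every index entry that falls inside one; the branch below then either drops those entries or aborts the start-up. A rough illustration of the range test, with a simplified two-field location instead of the KahaDB Location class:

import java.util.List;

public class MissingRangeSketch {
    // Simplified stand-in for org.apache.kahadb.journal.Location: (dataFileId, offset).
    record Loc(int fileId, int offset) {}

    // A half-open range [first, last) of locations, mirroring BetweenVisitor's contract.
    record Between(Loc first, Loc last) {
        boolean contains(Loc l) {
            return compare(first, l) <= 0 && compare(l, last) < 0;
        }
        static int compare(Loc a, Loc b) {
            int c = Integer.compare(a.fileId(), b.fileId());
            return c != 0 ? c : Integer.compare(a.offset(), b.offset());
        }
    }

    public static void main(String[] args) {
        // Journal file 2 is missing entirely: every location inside it is suspect.
        Between missingFile2 = new Between(new Loc(2, 0), new Loc(3, 0));
        List<Loc> indexed = List.of(new Loc(1, 128), new Loc(2, 64), new Loc(3, 0));
        for (Loc l : indexed) {
            if (missingFile2.contains(l)) {
                // ignoreMissingJournalfiles=true would remove this entry from the index;
                // otherwise the store fails with "Detected missing/corrupt journal files".
                System.out.println("affected message at " + l);
            }
        }
    }
}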
- if( ignoreMissingJournalfiles ) { + if (ignoreMissingJournalfiles) { // Update the index to remove the references to the missing data for (Long sequenceId : matches) { MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); @@ -668,14 +648,14 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } else { - throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected."); + throw new IOException("Detected missing/corrupt journal files. " + matches.size() + " messages affected."); } } } } end = System.currentTimeMillis(); - if( undoCounter > 0 ) { + if (undoCounter > 0) { // The rolledback operations are basically in flight journal writes. To avoid getting these the end user // should do sync writes to the journal. LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); @@ -685,12 +665,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar private Location nextRecoveryPosition; private Location lastRecoveryPosition; - public void incrementalRecover() throws IOException { + public void incrementalRecover(Journal journal) throws IOException { this.indexLock.writeLock().lock(); try { - if( nextRecoveryPosition == null ) { - if( lastRecoveryPosition==null ) { - nextRecoveryPosition = getRecoveryPosition(); + if (nextRecoveryPosition == null) { + if (lastRecoveryPosition == null) { + nextRecoveryPosition = getRecoveryPosition(journal); } else { nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); } @@ -698,11 +678,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar while (nextRecoveryPosition != null) { lastRecoveryPosition = nextRecoveryPosition; metadata.lastUpdate = lastRecoveryPosition; - JournalCommand message = load(lastRecoveryPosition); + JournalCommand message = load(journal, lastRecoveryPosition); process(message, lastRecoveryPosition); nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); } - }finally { + } finally { this.indexLock.writeLock().unlock(); } } @@ -711,7 +691,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar return metadata.lastUpdate; } - private Location getRecoveryPosition() throws IOException { + private Location getRecoveryPosition(Journal journal) throws IOException { if (!this.forceRecoverIndex) { @@ -721,7 +701,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } // Perhaps there were no transactions... - if( metadata.lastUpdate!=null) { + if (metadata.lastUpdate != null) { // Start replay at the record after the last one recorded in the index file. 
return journal.getNextLocation(metadata.lastUpdate); } @@ -731,38 +711,44 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } protected void checkpointCleanup(final boolean cleanup) throws IOException { + for (Journal journal : getJournalManager().getJournals()) { + checkpointCleanup(journal, cleanup); + } + } + + protected void checkpointCleanup(final Journal journal, final boolean cleanup) throws IOException { long start; this.indexLock.writeLock().lock(); try { start = System.currentTimeMillis(); - if( !opened.get() ) { + if (!opened.get()) { return; } pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { - checkpointUpdate(tx, cleanup); + checkpointUpdate(tx, journal, cleanup); } }); - }finally { + } finally { this.indexLock.writeLock().unlock(); } long end = System.currentTimeMillis(); - if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { - LOG.info("Slow KahaDB access: cleanup took "+(end-start)); + if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { + LOG.info("Slow KahaDB access: cleanup took " + (end - start)); } } - public void checkpoint(Callback closure) throws Exception { + public void checkpoint(final Journal journal, Callback closure) throws Exception { this.indexLock.writeLock().lock(); try { pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { - checkpointUpdate(tx, false); + checkpointUpdate(tx, journal, false); } }); closure.execute(); - }finally { + } finally { this.indexLock.writeLock().unlock(); } } @@ -770,17 +756,18 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar // ///////////////////////////////////////////////////////////////// // Methods call by the broker to update and query the store. // ///////////////////////////////////////////////////////////////// - public Location store(JournalCommand data) throws IOException { - return store(data, false, null,null); + public Location store(Journal journal, JournalCommand data) throws IOException { + return store(journal, data, false, null, null); } + /** * All updated are are funneled through this method. The updates are converted * to a JournalMessage which is logged to the journal and then the data from * the JournalMessage is used to update the index just like it would be done * during a recovery process. 
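As that Javadoc says, every update is funneled through store(...): the command is appended to the selected journal, the same record is then applied to the index exactly as it would be during recovery, and metadata.lastUpdate is advanced to the new location. A compact append-then-apply sketch over a toy log and index (types and names here are invented for illustration, not the KahaDB classes):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StoreFunnelSketch {
    // Toy stand-ins: the "journal" is an append-only list, the "index" a map.
    final List<String> journal = new ArrayList<>();
    final Map<String, Integer> index = new HashMap<>();
    int lastUpdate = -1; // analogous to metadata.lastUpdate

    int store(String command) {
        int location = journal.size();
        journal.add(command);        // 1) durable append to the journal
        process(command, location);  // 2) apply the same record to the index
        lastUpdate = location;       // 3) remember the last indexed location
        return location;
    }

    void process(String command, int location) {
        index.put(command, location);
    }

    public static void main(String[] args) {
        StoreFunnelSketch s = new StoreFunnelSketch();
        s.store("KahaAddMessageCommand:m1");
        s.store("KahaRemoveMessageCommand:m1");
        System.out.println("last update at location " + s.lastUpdate);
    }
}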
*/ - public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after) throws IOException { + public Location store(final Journal journal, JournalCommand data, boolean sync, Runnable before, Runnable after) throws IOException { if (before != null) { before.run(); } @@ -795,14 +782,14 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar long start2 = System.currentTimeMillis(); process(data, location); long end = System.currentTimeMillis(); - if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { - LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); + if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { + LOG.info("Slow KahaDB access: Journal append took: " + (start2 - start) + " ms, Index update took " + (end - start2) + " ms"); } this.indexLock.writeLock().lock(); try { metadata.lastUpdate = location; - }finally { + } finally { this.indexLock.writeLock().unlock(); } if (!checkpointThread.isAlive()) { @@ -821,35 +808,27 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar /** * Loads a previously stored JournalMessage - * - * @param location - * @return - * @throws IOException */ - public JournalCommand load(Location location) throws IOException { + public JournalCommand load(Journal journal, Location location) throws IOException { long start = System.currentTimeMillis(); ByteSequence data = journal.read(location); long end = System.currentTimeMillis(); - if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { - LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms"); + if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { + LOG.info("Slow KahaDB access: Journal read took: " + (end - start) + " ms"); } DataByteArrayInputStream is = new DataByteArrayInputStream(data); byte readByte = is.readByte(); KahaEntryType type = KahaEntryType.valueOf(readByte); - if( type == null ) { - throw new IOException("Could not load journal record. Invalid location: "+location); + if (type == null) { + throw new IOException("Could not load journal record. 
Invalid location: " + location); } - JournalCommand message = (JournalCommand)type.createMessage(); + JournalCommand message = (JournalCommand) type.createMessage(); message.mergeFramed(is); return message; } /** * do minimal recovery till we reach the last inDoubtLocation - * @param data - * @param location - * @param inDoubtlocation - * @throws IOException */ void process(JournalCommand data, final Location location, final Location inDoubtlocation) throws IOException { if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { @@ -921,7 +900,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar upadateIndex(tx, command, location); } }); - }finally { + } finally { this.indexLock.writeLock().unlock(); } } @@ -929,8 +908,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException { if (command.hasTransactionInfo()) { - List inflightTx = getInflightTx(command.getTransactionInfo(), location); - inflightTx.add(new RemoveOpperation(command, location)); + List inflightTx = getInflightTx(command.getTransactionInfo(), location); + inflightTx.add(new RemoveOpperation(command, location)); } else { this.indexLock.writeLock().lock(); try { @@ -939,7 +918,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar updateIndex(tx, command, location); } }); - }finally { + } finally { this.indexLock.writeLock().unlock(); } } @@ -954,7 +933,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar updateIndex(tx, command, location); } }); - }finally { + } finally { this.indexLock.writeLock().unlock(); } } @@ -967,7 +946,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar updateIndex(tx, command, location); } }); - }finally { + } finally { this.indexLock.writeLock().unlock(); } } @@ -995,7 +974,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } }); - }finally { + } finally { this.indexLock.writeLock().unlock(); } } @@ -1104,6 +1083,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } Map> ackMessageFileMap = new HashMap>(); + private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) { Set referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); if (referenceFileIds == null) { @@ -1156,9 +1136,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar // If set then we are creating it.. 
otherwise we are destroying the sub if (command.hasSubscriptionInfo()) { sd.subscriptions.put(tx, subscriptionKey, command); - long ackLocation=NOT_ACKED; + long ackLocation = NOT_ACKED; if (!command.getRetroactive()) { - ackLocation = sd.orderIndex.nextMessageId-1; + ackLocation = sd.orderIndex.nextMessageId - 1; } else { addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey); } @@ -1175,19 +1155,19 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar * @param tx * @throws IOException */ - void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { + void checkpointUpdate(Transaction tx, Journal journal, boolean cleanup) throws IOException { LOG.debug("Checkpoint started."); // reflect last update exclusive of current checkpoint Location firstTxLocation = metadata.lastUpdate; metadata.state = OPEN_STATE; - metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); + metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(journal); metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); tx.store(metadata.page, metadataMarshaller, true); pageFile.flush(); - if( cleanup ) { + if (cleanup) { final TreeSet completeFileSet = new TreeSet(journal.getFileMap().keySet()); final TreeSet gcCandidateSet = new TreeSet(completeFileSet); @@ -1195,21 +1175,22 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet); // Don't GC files under replication - if( journalFilesBeingReplicated!=null ) { + if (journalFilesBeingReplicated != null) { gcCandidateSet.removeAll(journalFilesBeingReplicated); } // Don't GC files after the first in progress tx - if( metadata.firstInProgressTransactionLocation!=null ) { + if (metadata.firstInProgressTransactionLocation != null) { if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) { - firstTxLocation = metadata.firstInProgressTransactionLocation; - }; + firstTxLocation = metadata.firstInProgressTransactionLocation; + } + ; } - if( firstTxLocation!=null ) { - while( !gcCandidateSet.isEmpty() ) { + if (firstTxLocation != null) { + while (!gcCandidateSet.isEmpty()) { Integer last = gcCandidateSet.last(); - if( last >= firstTxLocation.getDataFileId() ) { + if (last >= firstTxLocation.getDataFileId()) { gcCandidateSet.remove(last); } else { break; @@ -1220,32 +1201,33 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar // Go through all the destinations to see if any of them can remove GC candidates. 
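The destination scan below is one of several filters applied to the GC candidate set: cleanup starts from every known data file id and strikes out files still needed by replication, by the first in-progress transaction, by any location index, or by outstanding acks, then deletes whatever remains. A set-based sketch of that subtraction, using bare integers for data file ids:

import java.util.Set;
import java.util.TreeSet;

public class GcCandidateSketch {
    public static void main(String[] args) {
        // All data files currently on disk (by id), as journal.getFileMap() would report.
        TreeSet<Integer> gcCandidates = new TreeSet<>(Set.of(1, 2, 3, 4, 5));

        // Files that must survive: still referenced by an index entry, an in-flight
        // transaction, an unconsumed ack, or a replica that has not caught up yet.
        Set<Integer> stillReferenced = Set.of(3, 5);
        gcCandidates.removeAll(stillReferenced);

        // Everything left is safe to delete, echoing the
        // "Cleanup removing the data files: ..." debug message.
        System.out.println("Cleanup removing the data files: " + gcCandidates);
    }
}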
for (Entry entry : storedDestinations.entrySet()) { - if( gcCandidateSet.isEmpty() ) { + if (gcCandidateSet.isEmpty()) { break; } // Use a visitor to cut down the number of pages that we load entry.getValue().locationIndex.visit(tx, new BTreeVisitor() { - int last=-1; + int last = -1; + public boolean isInterestedInKeysBetween(Location first, Location second) { - if( first==null ) { - SortedSet subset = gcCandidateSet.headSet(second.getDataFileId()+1); - if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { + if (first == null) { + SortedSet subset = gcCandidateSet.headSet(second.getDataFileId() + 1); + if (!subset.isEmpty() && subset.last() == second.getDataFileId()) { subset.remove(second.getDataFileId()); } return !subset.isEmpty(); - } else if( second==null ) { + } else if (second == null) { SortedSet subset = gcCandidateSet.tailSet(first.getDataFileId()); - if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { + if (!subset.isEmpty() && subset.first() == first.getDataFileId()) { subset.remove(first.getDataFileId()); } return !subset.isEmpty(); } else { - SortedSet subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1); - if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { + SortedSet subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId() + 1); + if (!subset.isEmpty() && subset.first() == first.getDataFileId()) { subset.remove(first.getDataFileId()); } - if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { + if (!subset.isEmpty() && subset.last() == second.getDataFileId()) { subset.remove(second.getDataFileId()); } return !subset.isEmpty(); @@ -1255,7 +1237,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar public void visit(List keys, List values) { for (Location l : keys) { int fileId = l.getDataFileId(); - if( last != fileId ) { + if (last != fileId) { gcCandidateSet.remove(fileId); last = fileId; } @@ -1289,8 +1271,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } - if( !gcCandidateSet.isEmpty() ) { - LOG.debug("Cleanup removing the data files: "+gcCandidateSet); + if (!gcCandidateSet.isEmpty()) { + LOG.debug("Cleanup removing the data files: " + gcCandidateSet); journal.removeDataFiles(gcCandidateSet); } } @@ -1298,13 +1280,13 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar LOG.debug("Checkpoint done."); } - private Location checkpointProducerAudit() throws IOException { + private Location checkpointProducerAudit(Journal journal) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oout = new ObjectOutputStream(baos); oout.writeObject(metadata.producerSequenceIdTracker); oout.flush(); oout.close(); - return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), true, null, null); + return store(journal, new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), true, null, null); } public HashSet getJournalFilesBeingReplicated() { @@ -1330,13 +1312,13 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar final Location location; public MessageKeys(String messageId, Location location) { - this.messageId=messageId; - this.location=location; + this.messageId = messageId; + this.location = location; } @Override public String toString() { - return "["+messageId+","+location+"]"; + return "[" + messageId + "," + location + "]"; } } @@ -1452,20 +1434,20 @@ 
public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar value.orderIndex.lowPriorityIndex = new BTreeIndex(pageFile, dataIn.readLong()); value.orderIndex.highPriorityIndex = new BTreeIndex(pageFile, dataIn.readLong()); } else { - // upgrade - pageFile.tx().execute(new Transaction.Closure() { - public void execute(Transaction tx) throws IOException { - value.orderIndex.lowPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); - value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); - value.orderIndex.lowPriorityIndex.load(tx); + // upgrade + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + value.orderIndex.lowPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); + value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); + value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + value.orderIndex.lowPriorityIndex.load(tx); - value.orderIndex.highPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); - value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); - value.orderIndex.highPriorityIndex.load(tx); - } - }); + value.orderIndex.highPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); + value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); + value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + value.orderIndex.highPriorityIndex.load(tx); + } + }); } return value; @@ -1493,12 +1475,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); - rc.mergeFramed((InputStream)dataIn); + rc.mergeFramed((InputStream) dataIn); return rc; } public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { - object.writeFramed((OutputStream)dataOut); + object.writeFramed((OutputStream) dataOut); } } @@ -1588,7 +1570,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { Entry entry = iterator.next(); for (Iterator> orderIterator = - rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { + rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { Long sequence = orderIterator.next().getKey(); addAckLocation(tx, rc, sequence, entry.getKey()); } @@ -1600,18 +1582,18 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if (rc.orderIndex.nextMessageId == 0) { // check for existing durable sub all acked out - pull next seq from acks as messages are gone if (!rc.subscriptionAcks.isEmpty(tx)) { - for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { + for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { Entry entry = iterator.next(); rc.orderIndex.nextMessageId = - Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1); + Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence + 1); } } } else { // update based on ackPositions for 
unmatched, last entry is always the next if (!rc.ackPositions.isEmpty(tx)) { - Entry> last = rc.ackPositions.getLast(tx); + Entry> last = rc.ackPositions.getLast(tx); rc.orderIndex.nextMessageId = - Math.max(rc.orderIndex.nextMessageId, last.getKey()); + Math.max(rc.orderIndex.nextMessageId, last.getKey()); } } @@ -1644,16 +1626,17 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } final HashSet nextMessageIdMarker = new HashSet(); + // on a new message add, all existing subs are interested in this message private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { HashSet hs = new HashSet(); - for (Iterator> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext();) { + for (Iterator> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { Entry entry = iterator.next(); hs.add(entry.getKey()); } sd.ackPositions.put(tx, messageSequence, hs); // add empty next to keep track of nextMessage - sd.ackPositions.put(tx, messageSequence+1, nextMessageIdMarker); + sd.ackPositions.put(tx, messageSequence + 1, nextMessageIdMarker); } private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { @@ -1830,15 +1813,15 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar return index; } - private Journal createJournal() throws IOException { - Journal manager = new Journal(); + private JournalManager createJournalManager() throws IOException { + JournalManager manager = isJournalPerDestination() ? new DestinationJournalManager() : new DefaultJournalManager(); manager.setDirectory(directory); manager.setMaxFileLength(getJournalMaxFileLength()); manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); manager.setArchiveDataLogs(isArchiveDataLogs()); - manager.setSizeAccumulator(storeSize); + manager.setStoreSize(storeSize); if (getDirectoryArchive() != null) { IOHelper.mkdirs(getDirectoryArchive()); manager.setDirectoryArchive(getDirectoryArchive()); @@ -1941,11 +1924,15 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar return pageFile; } - public Journal getJournal() throws IOException { - if (journal == null) { - journal = createJournal(); + public JournalManager getJournalManager() throws IOException { + if (journalManager == null) { + journalManager = createJournalManager(); } - return journal; + return journalManager; + } + + public Journal getJournal(ActiveMQDestination destination) throws IOException { + return getJournalManager().getJournal(destination); } public boolean isFailIfDatabaseIsLocked() { @@ -2034,6 +2021,14 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar this.databaseLockedWaitDelay = databaseLockedWaitDelay; } + public boolean isJournalPerDestination() { + return journalPerDestination; + } + + public void setJournalPerDestination(boolean journalPerDestination) { + this.journalPerDestination = journalPerDestination; + } + // ///////////////////////////////////////////////////////////////// // Internal conversion methods. 
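The journalPerDestination flag decides which JournalManager createJournalManager() builds: DestinationJournalManager when enabled, otherwise the single-journal DefaultJournalManager. A minimal sketch of the routing idea behind that choice, with a plain map keyed by destination name rather than the real ActiveMQ classes (all names below are illustrative):

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class JournalRoutingSketch {
    // Hypothetical, simplified shape of the JournalManager idea: resolve a journal
    // (represented by a file name) per destination, or share a single one for everything.
    interface Router {
        String journalFor(String destination);
        Collection<String> allJournals();
    }

    static final class SingleJournal implements Router {
        public String journalFor(String destination) { return "db.log"; }
        public Collection<String> allJournals() { return List.of("db.log"); }
    }

    static final class PerDestination implements Router {
        private final Map<String, String> journals = new ConcurrentHashMap<>();
        public String journalFor(String destination) {
            return journals.computeIfAbsent(destination, d -> d + ".log");
        }
        public Collection<String> allJournals() { return journals.values(); }
    }

    public static void main(String[] args) {
        boolean journalPerDestination = true; // analogous to setJournalPerDestination(true)
        Router router = journalPerDestination ? new PerDestination() : new SingleJournal();
        router.journalFor("queue://orders");
        router.journalFor("topic://prices");
        System.out.println(router.allJournals());
    }
}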
// ///////////////////////////////////////////////////////////////// @@ -2061,23 +2056,24 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar return rc; } - class MessageOrderCursor{ + class MessageOrderCursor { long defaultCursorPosition; long lowPriorityCursorPosition; long highPriorityCursorPosition; - MessageOrderCursor(){ + + MessageOrderCursor() { } - MessageOrderCursor(long position){ - this.defaultCursorPosition=position; - this.lowPriorityCursorPosition=position; - this.highPriorityCursorPosition=position; + MessageOrderCursor(long position) { + this.defaultCursorPosition = position; + this.lowPriorityCursorPosition = position; + this.highPriorityCursorPosition = position; } - MessageOrderCursor(MessageOrderCursor other){ - this.defaultCursorPosition=other.defaultCursorPosition; - this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; - this.highPriorityCursorPosition=other.highPriorityCursorPosition; + MessageOrderCursor(MessageOrderCursor other) { + this.defaultCursorPosition = other.defaultCursorPosition; + this.lowPriorityCursorPosition = other.lowPriorityCursorPosition; + this.highPriorityCursorPosition = other.highPriorityCursorPosition; } MessageOrderCursor copy() { @@ -2085,33 +2081,33 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } void reset() { - this.defaultCursorPosition=0; - this.highPriorityCursorPosition=0; - this.lowPriorityCursorPosition=0; + this.defaultCursorPosition = 0; + this.highPriorityCursorPosition = 0; + this.lowPriorityCursorPosition = 0; } void increment() { - if (defaultCursorPosition!=0) { + if (defaultCursorPosition != 0) { defaultCursorPosition++; } - if (highPriorityCursorPosition!=0) { + if (highPriorityCursorPosition != 0) { highPriorityCursorPosition++; } - if (lowPriorityCursorPosition!=0) { + if (lowPriorityCursorPosition != 0) { lowPriorityCursorPosition++; } } public String toString() { - return "MessageOrderCursor:[def:" + defaultCursorPosition - + ", low:" + lowPriorityCursorPosition - + ", high:" + highPriorityCursorPosition + "]"; + return "MessageOrderCursor:[def:" + defaultCursorPosition + + ", low:" + lowPriorityCursorPosition + + ", high:" + highPriorityCursorPosition + "]"; } public void sync(MessageOrderCursor other) { - this.defaultCursorPosition=other.defaultCursorPosition; - this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; - this.highPriorityCursorPosition=other.highPriorityCursorPosition; + this.defaultCursorPosition = other.defaultCursorPosition; + this.lowPriorityCursorPosition = other.lowPriorityCursorPosition; + this.highPriorityCursorPosition = other.highPriorityCursorPosition; } } @@ -2132,9 +2128,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar MessageKeys remove(Transaction tx, Long key) throws IOException { MessageKeys result = defaultPriorityIndex.remove(tx, key); - if (result == null && highPriorityIndex!=null) { + if (result == null && highPriorityIndex != null) { result = highPriorityIndex.remove(tx, key); - if (result ==null && lowPriorityIndex!=null) { + if (result == null && lowPriorityIndex != null) { result = lowPriorityIndex.remove(tx, key); } } @@ -2256,14 +2252,14 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } void stoppedIterating() { - if (lastDefaultKey!=null) { - cursor.defaultCursorPosition=lastDefaultKey.longValue()+1; + if (lastDefaultKey != null) { + cursor.defaultCursorPosition = lastDefaultKey.longValue() + 1; } - if 
(lastHighKey!=null) { - cursor.highPriorityCursorPosition=lastHighKey.longValue()+1; + if (lastHighKey != null) { + cursor.highPriorityCursorPosition = lastHighKey.longValue() + 1; } - if (lastLowKey!=null) { - cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1; + if (lastLowKey != null) { + cursor.lowPriorityCursorPosition = lastLowKey.longValue() + 1; } lastDefaultKey = null; lastHighKey = null; @@ -2282,7 +2278,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } void getDeleteList(Transaction tx, ArrayList> deletes, - BTreeIndex index, Long sequenceId) throws IOException { + BTreeIndex index, Long sequenceId) throws IOException { Iterator> iterator = index.iterator(tx, sequenceId); deletes.add(iterator.next()); @@ -2318,24 +2314,23 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } - Iterator> iterator(Transaction tx) throws IOException{ - return new MessageOrderIterator(tx,cursor); + Iterator> iterator(Transaction tx) throws IOException { + return new MessageOrderIterator(tx, cursor); } - Iterator> iterator(Transaction tx, MessageOrderCursor m) throws IOException{ - return new MessageOrderIterator(tx,m); + Iterator> iterator(Transaction tx, MessageOrderCursor m) throws IOException { + return new MessageOrderIterator(tx, m); } public byte lastGetPriority() { return lastGetPriority; } - class MessageOrderIterator implements Iterator>{ - Iterator>currentIterator; - final Iterator>highIterator; - final Iterator>defaultIterator; - final Iterator>lowIterator; - + class MessageOrderIterator implements Iterator> { + Iterator> currentIterator; + final Iterator> highIterator; + final Iterator> defaultIterator; + final Iterator> lowIterator; MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException { diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java index 661b25d045..1b7faedb94 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java @@ -27,8 +27,6 @@ import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.util.DefaultIOExceptionHandler; import org.junit.After; import org.junit.Test; - - import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -58,7 +56,7 @@ public class AMQ2736Test { // test hack, close the journal to ensure no further journal updates when broker stops // mimic kill -9 in terms of no normal shutdown sequence - store.getJournal().close(); + store.getJournalManager().close(); try { store.close(); } catch (Exception expectedLotsAsJournalBorked) { diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java index d5220c8dd0..861bb3a324 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java @@ -16,11 +16,7 @@ */ package org.apache.activemq.bugs; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - import java.io.IOException; -import java.net.URISyntaxException; import java.util.concurrent.CountDownLatch; import javax.jms.BytesMessage; @@ -32,7 +28,6 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; - import 
org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.broker.BrokerService; @@ -42,6 +37,8 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; public class AMQ2982Test { @@ -65,7 +62,7 @@ public class AMQ2982Test { // ensure save memory publishing, use the right lock indexLock.readLock().lock(); try { - return getJournal().getFileMap().size(); + return getJournalManager().getFileMap().size(); } finally { indexLock.readLock().unlock(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java index f8b941a40c..cbb7752480 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java @@ -16,10 +16,6 @@ */ package org.apache.activemq.bugs; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -34,7 +30,6 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.kahadb.KahaDBStore; @@ -42,6 +37,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.*; public class AMQ2983Test { @@ -67,7 +63,7 @@ public class AMQ2983Test { // ensure save memory publishing, use the right lock indexLock.readLock().lock(); try { - return getJournal().getFileMap().size(); + return getJournalManager().getFileMap().size(); } finally { indexLock.readLock().unlock(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java index 08a0fba48e..e7becf88fd 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java @@ -31,9 +31,9 @@ public class SimpleDurableTopicTest extends SimpleTopicTest { protected long initialConsumerDelay = 0; @Override protected void setUp() throws Exception { - numberOfDestinations=1; + numberOfDestinations=10; numberOfConsumers = 1; - numberofProducers = Integer.parseInt(System.getProperty("SimpleDurableTopicTest.numberofProducers", "20"), 20); + numberofProducers = Integer.parseInt(System.getProperty("SimpleDurableTopicTest.numberofProducers", "1")); sampleCount= Integer.parseInt(System.getProperty("SimpleDurableTopicTest.sampleCount", "1000"), 10); playloadSize = 1024; super.setUp(); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index 9d685b44b6..2b10fb0cc5 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.usecases; import java.util.Vector; + import javax.jms.Connection; import 
javax.jms.JMSException; import javax.jms.Message; @@ -66,7 +67,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp public static Test suite() { return suite(DurableSubscriptionOfflineTest.class); } - + protected void setUp() throws Exception { exceptions.clear(); topic = (ActiveMQTopic) createDestination(); @@ -82,9 +83,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp private void createBroker() throws Exception { createBroker(true); } - + private void createBroker(boolean deleteAllMessages) throws Exception { - broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")"); + broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) + ")"); broker.setBrokerName(getName(true)); broker.setDeleteAllMessagesOnStartup(deleteAllMessages); broker.getManagementContext().setCreateConnector(false); @@ -96,14 +97,14 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp policyMap.setDefaultEntry(policy); broker.setDestinationPolicy(policyMap); } - + setDefaultPersistenceAdapter(broker); if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) { // ensure it kicks in during tests - ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000); + ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).setCleanupPeriod(2 * 1000); } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { // have lots of journal files - ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength); + ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength); } broker.start(); } @@ -115,9 +116,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception { this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); + new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); this.addCombinationValues("usePrioritySupport", - new Object[]{ Boolean.TRUE, Boolean.FALSE}); + new Object[]{Boolean.TRUE, Boolean.FALSE}); } public void testConsumeOnlyMatchedMessages() throws Exception { @@ -162,110 +163,110 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals(sent, listener.count); } - public void testConsumeAllMatchedMessages() throws Exception { - // create durable subscription - Connection con = createConnection(); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - session.close(); - con.close(); + public void testConsumeAllMatchedMessages() throws Exception { + // create durable subscription + Connection con = createConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + session.close(); + con.close(); - // send messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(null); + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); - int sent = 0; - for (int i = 0; i < 10; i++) { - sent++; - Message message = 
session.createMessage(); - message.setStringProperty("filter", "true"); - producer.send(topic, message); - } + int sent = 0; + for (int i = 0; i < 10; i++) { + sent++; + Message message = session.createMessage(); + message.setStringProperty("filter", "true"); + producer.send(topic, message); + } - Thread.sleep(1 * 1000); + Thread.sleep(1 * 1000); - session.close(); - con.close(); + session.close(); + con.close(); - // consume messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener = new Listener(); - consumer.setMessageListener(listener); + // consume messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + Listener listener = new Listener(); + consumer.setMessageListener(listener); - Thread.sleep(3 * 1000); + Thread.sleep(3 * 1000); - session.close(); - con.close(); + session.close(); + con.close(); - assertEquals(sent, listener.count); - } - - - public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception { - this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); - this.addCombinationValues("usePrioritySupport", - new Object[]{ Boolean.TRUE, Boolean.FALSE}); + assertEquals(sent, listener.count); } - public void testVerifyAllConsumedAreAcked() throws Exception { - // create durable subscription - Connection con = createConnection(); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - session.close(); - con.close(); - // send messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(null); + public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception { + this.addCombinationValues("defaultPersistenceAdapter", + new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); + this.addCombinationValues("usePrioritySupport", + new Object[]{Boolean.TRUE, Boolean.FALSE}); + } - int sent = 0; - for (int i = 0; i < 10; i++) { - sent++; - Message message = session.createMessage(); - message.setStringProperty("filter", "true"); - producer.send(topic, message); - } + public void testVerifyAllConsumedAreAcked() throws Exception { + // create durable subscription + Connection con = createConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + session.close(); + con.close(); - Thread.sleep(1 * 1000); + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); - session.close(); - con.close(); + int sent = 0; + for (int i = 0; i < 10; i++) { + sent++; + Message message = session.createMessage(); + message.setStringProperty("filter", "true"); + producer.send(topic, message); + } - // consume messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener = new Listener(); - 
consumer.setMessageListener(listener); + Thread.sleep(1 * 1000); - Thread.sleep(3 * 1000); + session.close(); + con.close(); - session.close(); - con.close(); + // consume messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + Listener listener = new Listener(); + consumer.setMessageListener(listener); - LOG.info("Consumed: " + listener.count); - assertEquals(sent, listener.count); + Thread.sleep(3 * 1000); - // consume messages again, should not get any - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - listener = new Listener(); - consumer.setMessageListener(listener); + session.close(); + con.close(); - Thread.sleep(3 * 1000); + LOG.info("Consumed: " + listener.count); + assertEquals(sent, listener.count); - session.close(); - con.close(); + // consume messages again, should not get any + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + listener = new Listener(); + consumer.setMessageListener(listener); - assertEquals(0, listener.count); - } + Thread.sleep(3 * 1000); + + session.close(); + con.close(); + + assertEquals(0, listener.count); + } public void testTwoOfflineSubscriptionCanConsume() throws Exception { // create durable subscription 1 @@ -323,9 +324,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception { this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); + new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); this.addCombinationValues("usePrioritySupport", - new Object[]{ Boolean.TRUE, Boolean.FALSE}); + new Object[]{Boolean.TRUE, Boolean.FALSE}); } public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception { @@ -474,14 +475,15 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con.close(); assertEquals("offline consumer got all", sent, listener.count); - } + } public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception { this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); + new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); } private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))"; + public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception { // create offline subs 1 Connection con = createConnection("offCli1"); @@ -629,9 +631,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception { this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); + new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); } - + public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception { // create offline subs 1 Connection con = createConnection("offCli1"); @@ -672,7 +674,7 @@ 
public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp Thread.sleep(3 * 1000); broker.stop(); createBroker(false /*deleteAllMessages*/); - + // send more messages con = createConnection(); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -719,7 +721,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp public void initCombosForTestOfflineAfterRestart() throws Exception { this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); + new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); } public void testOfflineSubscriptionAfterRestart() throws Exception { @@ -855,7 +857,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp int filtered = 0; for (int i = 0; i < 10; i++) { - boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1; + boolean filter = (i % 2 == 0); //(int) (Math.random() * 2) >= 1; if (filter) filtered++; @@ -953,7 +955,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp sent = 0; for (int i = 0; i < 2; i++) { Message message = session.createMessage(); - message.setStringProperty("filter", i==1 ? "true" : "false"); + message.setStringProperty("filter", i == 1 ? "true" : "false"); producer.send(topic, message); sent++; } @@ -961,7 +963,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp Thread.sleep(1 * 1000); session.close(); con.close(); - + LOG.info("cli1 again, should get 1 new ones"); con = createConnection("cli1"); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -1059,7 +1061,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp // use very small journal to get lots of files to cleanup public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception { this.addCombinationValues("journalMaxFileLength", - new Object[]{new Integer(64*1024)}); + new Object[]{new Integer(64 * 1024)}); } // https://issues.apache.org/jira/browse/AMQ-3206 @@ -1081,7 +1083,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp MessageProducer producer = session.createProducer(null); final int toSend = 500; - final String payload = new byte[40*1024].toString(); + final String payload = new byte[40 * 1024].toString(); int sent = 0; for (int i = sent; i < toSend; i++) { Message message = session.createTextMessage(payload); @@ -1108,7 +1110,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp consumer.setMessageListener(listener); assertTrue("got all sent", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - LOG.info("Want: " + toSend + ", current: " + listener.count); + LOG.info("Want: " + toSend + ", current: " + listener.count); return listener.count == toSend; } })); @@ -1118,7 +1120,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp destroyBroker(); createBroker(false); KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size()); + assertEquals("only one journal file left after restart", 1, pa.getStore().getJournalManager().getFileMap().size()); } public static class Listener implements MessageListener { @@ -1127,20 +1129,23 @@ public class DurableSubscriptionOfflineTest extends 
org.apache.activemq.TestSupp Listener() { } + Listener(String id) { this.id = id; } + public void onMessage(Message message) { count++; if (id != null) { try { LOG.info(id + ", " + message.getJMSMessageID()); - } catch (Exception ignored) {} + } catch (Exception ignored) { + } } } } - public class FilterCheckListener extends Listener { + public class FilterCheckListener extends Listener { public void onMessage(Message message) { count++; @@ -1150,13 +1155,11 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp if (b != null) { boolean c = message.getBooleanProperty("$c"); assertTrue("", c); - } - else { + } else { String d = message.getStringProperty("$d"); assertTrue("", "D1".equals(d) || "D2".equals(d)); } - } - catch (JMSException e) { + } catch (JMSException e) { exceptions.add(e); } }
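A minimal standalone sketch of the check that the patched tests above now perform: the journal data-file count is reached through getStore().getJournalManager().getFileMap() instead of the former getJournal().getFileMap(). Only that call chain is taken from this patch; the broker setup below, the class name, and the cast to KahaDBPersistenceAdapter (valid only when KahaDB is the configured persistence adapter) are illustrative assumptions, not part of the change set.

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

// Sketch only: assumes a KahaDB-backed broker; mirrors the file-map lookup used
// in AMQ2982Test, AMQ2983Test and DurableSubscriptionOfflineTest in this patch.
public class JournalFileCountSketch {

    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(true);   // enable the persistence adapter (assumed KahaDB)
        broker.setUseJmx(false);
        broker.start();
        try {
            // Cast is an assumption: holds when KahaDB is the default/configured adapter.
            KahaDBPersistenceAdapter pa =
                    (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
            // With this patch, the aggregate journal file map is obtained via the
            // JournalManager rather than a single Journal instance.
            int dataFiles = pa.getStore().getJournalManager().getFileMap().size();
            System.out.println("KahaDB journal data files: " + dataFiles);
        } finally {
            broker.stop();
        }
    }
}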