mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1147149 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
01bc7fa9c5
commit
8e61f519df
|
@ -0,0 +1,105 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.kahadb.journal.DataFile;
|
||||
import org.apache.kahadb.journal.Journal;
|
||||
|
||||
|
||||
public class DefaultJournalManager implements JournalManager {
|
||||
|
||||
private final Journal journal;
|
||||
private final List<Journal> journals;
|
||||
|
||||
public DefaultJournalManager() {
|
||||
this.journal = new Journal();
|
||||
List<Journal> list = new ArrayList<Journal>(1);
|
||||
list.add(this.journal);
|
||||
this.journals = Collections.unmodifiableList(list);
|
||||
}
|
||||
|
||||
public void start() throws IOException {
|
||||
journal.start();
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
journal.close();
|
||||
}
|
||||
|
||||
public Journal getJournal(ActiveMQDestination destination) {
|
||||
return journal;
|
||||
}
|
||||
|
||||
public void setDirectory(File directory) {
|
||||
journal.setDirectory(directory);
|
||||
}
|
||||
|
||||
public void setMaxFileLength(int maxFileLength) {
|
||||
journal.setMaxFileLength(maxFileLength);
|
||||
}
|
||||
|
||||
public void setCheckForCorruptionOnStartup(boolean checkForCorruptJournalFiles) {
|
||||
journal.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
|
||||
}
|
||||
|
||||
public void setChecksum(boolean checksum) {
|
||||
journal.setChecksum(checksum);
|
||||
}
|
||||
|
||||
public void setWriteBatchSize(int batchSize) {
|
||||
journal.setWriteBatchSize(batchSize);
|
||||
}
|
||||
|
||||
public void setArchiveDataLogs(boolean archiveDataLogs) {
|
||||
journal.setArchiveDataLogs(archiveDataLogs);
|
||||
}
|
||||
|
||||
public void setStoreSize(AtomicLong storeSize) {
|
||||
journal.setSizeAccumulator(storeSize);
|
||||
}
|
||||
|
||||
public void setDirectoryArchive(File directoryArchive) {
|
||||
journal.setDirectoryArchive(directoryArchive);
|
||||
}
|
||||
|
||||
public void delete() throws IOException {
|
||||
journal.delete();
|
||||
}
|
||||
|
||||
public Map<Integer, DataFile> getFileMap() {
|
||||
return journal.getFileMap();
|
||||
}
|
||||
|
||||
public Collection<Journal> getJournals() {
|
||||
return journals;
|
||||
}
|
||||
|
||||
public Collection<Journal> getJournals(Set<ActiveMQDestination> set) {
|
||||
return journals;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,239 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.kahadb.journal.DataFile;
|
||||
import org.apache.kahadb.journal.Journal;
|
||||
|
||||
public class DestinationJournalManager implements JournalManager {
|
||||
private static final String PREPEND = "JournalDest-";
|
||||
private static final String QUEUE_PREPEND = PREPEND + "Queue-";
|
||||
private static final String TOPIC_PREPEND = PREPEND + "Topic-";
|
||||
private AtomicBoolean started = new AtomicBoolean();
|
||||
private final Map<ActiveMQDestination, Journal> journalMap = new ConcurrentHashMap<ActiveMQDestination, Journal>();
|
||||
private File directory = new File("KahaDB");
|
||||
private File directoryArchive;
|
||||
private int maxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
|
||||
private boolean checkForCorruptionOnStartup;
|
||||
private boolean checksum = false;
|
||||
private int writeBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
|
||||
private boolean archiveDataLogs;
|
||||
private AtomicLong storeSize = new AtomicLong(0);
|
||||
|
||||
|
||||
public AtomicBoolean getStarted() {
|
||||
return started;
|
||||
}
|
||||
|
||||
public void setStarted(AtomicBoolean started) {
|
||||
this.started = started;
|
||||
}
|
||||
|
||||
public File getDirectory() {
|
||||
return directory;
|
||||
}
|
||||
|
||||
public void setDirectory(File directory) {
|
||||
this.directory = directory;
|
||||
}
|
||||
|
||||
public File getDirectoryArchive() {
|
||||
return directoryArchive;
|
||||
}
|
||||
|
||||
public void setDirectoryArchive(File directoryArchive) {
|
||||
this.directoryArchive = directoryArchive;
|
||||
}
|
||||
|
||||
public int getMaxFileLength() {
|
||||
return maxFileLength;
|
||||
}
|
||||
|
||||
public void setMaxFileLength(int maxFileLength) {
|
||||
this.maxFileLength = maxFileLength;
|
||||
}
|
||||
|
||||
public boolean isCheckForCorruptionOnStartup() {
|
||||
return checkForCorruptionOnStartup;
|
||||
}
|
||||
|
||||
public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
|
||||
this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
|
||||
}
|
||||
|
||||
public boolean isChecksum() {
|
||||
return checksum;
|
||||
}
|
||||
|
||||
public void setChecksum(boolean checksum) {
|
||||
this.checksum = checksum;
|
||||
}
|
||||
|
||||
public int getWriteBatchSize() {
|
||||
return writeBatchSize;
|
||||
}
|
||||
|
||||
public void setWriteBatchSize(int writeBatchSize) {
|
||||
this.writeBatchSize = writeBatchSize;
|
||||
}
|
||||
|
||||
public boolean isArchiveDataLogs() {
|
||||
return archiveDataLogs;
|
||||
}
|
||||
|
||||
public void setArchiveDataLogs(boolean archiveDataLogs) {
|
||||
this.archiveDataLogs = archiveDataLogs;
|
||||
}
|
||||
|
||||
public AtomicLong getStoreSize() {
|
||||
return storeSize;
|
||||
}
|
||||
|
||||
public void setStoreSize(AtomicLong storeSize) {
|
||||
this.storeSize = storeSize;
|
||||
}
|
||||
|
||||
|
||||
public void start() throws IOException {
|
||||
if (started.compareAndSet(false, true)) {
|
||||
File[] files = getDirectory().listFiles(new FilenameFilter() {
|
||||
public boolean accept(File file, String s) {
|
||||
if (file.isDirectory() && s != null && s.startsWith(PREPEND)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
});
|
||||
if (files != null) {
|
||||
for (File file : files) {
|
||||
ActiveMQDestination destination;
|
||||
if (file.getName().startsWith(TOPIC_PREPEND)) {
|
||||
String destinationName = file.getName().substring(TOPIC_PREPEND.length());
|
||||
destination = new ActiveMQTopic(destinationName);
|
||||
} else {
|
||||
String destinationName = file.getName().substring(QUEUE_PREPEND.length());
|
||||
destination = new ActiveMQQueue(destinationName);
|
||||
}
|
||||
|
||||
Journal journal = new Journal();
|
||||
journal.setDirectory(file);
|
||||
if (getDirectoryArchive() != null) {
|
||||
IOHelper.mkdirs(getDirectoryArchive());
|
||||
File archive = new File(getDirectoryArchive(), file.getName());
|
||||
IOHelper.mkdirs(archive);
|
||||
journal.setDirectoryArchive(archive);
|
||||
}
|
||||
configure(journal);
|
||||
journalMap.put(destination, journal);
|
||||
}
|
||||
}
|
||||
for (Journal journal : journalMap.values()) {
|
||||
journal.start();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
started.set(false);
|
||||
for (Journal journal : journalMap.values()) {
|
||||
journal.close();
|
||||
}
|
||||
journalMap.clear();
|
||||
}
|
||||
|
||||
|
||||
public void delete() throws IOException {
|
||||
for (Journal journal : journalMap.values()) {
|
||||
journal.delete();
|
||||
}
|
||||
journalMap.clear();
|
||||
}
|
||||
|
||||
public Journal getJournal(ActiveMQDestination destination) throws IOException {
|
||||
Journal journal = journalMap.get(destination);
|
||||
if (journal == null && !destination.isTemporary()) {
|
||||
journal = new Journal();
|
||||
String fileName;
|
||||
if (destination.isTopic()) {
|
||||
fileName = TOPIC_PREPEND + destination.getPhysicalName();
|
||||
} else {
|
||||
fileName = QUEUE_PREPEND + destination.getPhysicalName();
|
||||
}
|
||||
File file = new File(getDirectory(), fileName);
|
||||
IOHelper.mkdirs(file);
|
||||
journal.setDirectory(file);
|
||||
if (getDirectoryArchive() != null) {
|
||||
IOHelper.mkdirs(getDirectoryArchive());
|
||||
File archive = new File(getDirectoryArchive(), fileName);
|
||||
IOHelper.mkdirs(archive);
|
||||
journal.setDirectoryArchive(archive);
|
||||
}
|
||||
configure(journal);
|
||||
if (started.get()) {
|
||||
journal.start();
|
||||
}
|
||||
return journal;
|
||||
} else {
|
||||
return journal;
|
||||
}
|
||||
}
|
||||
|
||||
public Map<Integer, DataFile> getFileMap() {
|
||||
throw new RuntimeException("Not supported");
|
||||
}
|
||||
|
||||
public Collection<Journal> getJournals() {
|
||||
return journalMap.values();
|
||||
}
|
||||
|
||||
public Collection<Journal> getJournals(Set<ActiveMQDestination> set) {
|
||||
List<Journal> list = new ArrayList<Journal>();
|
||||
for (ActiveMQDestination destination : set) {
|
||||
Journal j = journalMap.get(destination);
|
||||
if (j != null) {
|
||||
list.add(j);
|
||||
}
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
protected void configure(Journal journal) {
|
||||
journal.setMaxFileLength(getMaxFileLength());
|
||||
journal.setCheckForCorruptionOnStartup(isCheckForCorruptionOnStartup());
|
||||
journal.setChecksum(isChecksum() || isCheckForCorruptionOnStartup());
|
||||
journal.setWriteBatchSize(getWriteBatchSize());
|
||||
journal.setArchiveDataLogs(isArchiveDataLogs());
|
||||
journal.setSizeAccumulator(getStoreSize());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,61 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.kahadb.journal.DataFile;
|
||||
import org.apache.kahadb.journal.Journal;
|
||||
|
||||
public interface JournalManager {
|
||||
|
||||
void start() throws IOException;
|
||||
|
||||
void close() throws IOException;
|
||||
|
||||
Journal getJournal(ActiveMQDestination destination) throws IOException;
|
||||
|
||||
void setDirectory(File directory);
|
||||
|
||||
void setMaxFileLength(int maxFileLength);
|
||||
|
||||
void setCheckForCorruptionOnStartup(boolean checkForCorruptJournalFiles);
|
||||
|
||||
void setChecksum(boolean checksum);
|
||||
|
||||
void setWriteBatchSize(int batchSize);
|
||||
|
||||
void setArchiveDataLogs(boolean archiveDataLogs);
|
||||
|
||||
void setStoreSize(AtomicLong storeSize);
|
||||
|
||||
void setDirectoryArchive(File directoryArchive);
|
||||
|
||||
void delete() throws IOException;
|
||||
|
||||
Map<Integer, DataFile> getFileMap();
|
||||
|
||||
Collection<Journal> getJournals();
|
||||
|
||||
Collection<Journal> getJournals(Set<ActiveMQDestination> set);
|
||||
}
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activeio.journal.Journal;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.BrokerServiceAware;
|
||||
|
@ -37,16 +38,11 @@ import org.apache.activemq.usage.SystemUsage;
|
|||
* An implementation of {@link PersistenceAdapter} designed for use with a
|
||||
* {@link Journal} and then check pointing asynchronously on a timeout with some
|
||||
* other long term persistent storage.
|
||||
*
|
||||
* @org.apache.xbean.XBean element="kahaDB"
|
||||
*
|
||||
*/
|
||||
public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
|
||||
private final KahaDBStore letter = new KahaDBStore();
|
||||
|
||||
/**
|
||||
* @param context
|
||||
* @throws IOException
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
|
||||
*/
|
||||
public void beginTransaction(ConnectionContext context) throws IOException {
|
||||
|
@ -54,8 +50,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
/**
|
||||
* @param sync
|
||||
* @throws IOException
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
|
||||
*/
|
||||
public void checkpoint(boolean sync) throws IOException {
|
||||
|
@ -63,8 +57,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
/**
|
||||
* @param context
|
||||
* @throws IOException
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
|
||||
*/
|
||||
public void commitTransaction(ConnectionContext context) throws IOException {
|
||||
|
@ -72,9 +64,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
/**
|
||||
* @param destination
|
||||
* @return MessageStore
|
||||
* @throws IOException
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
|
||||
*/
|
||||
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
|
||||
|
@ -82,9 +72,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
/**
|
||||
* @param destination
|
||||
* @return TopicMessageStore
|
||||
* @throws IOException
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
|
||||
*/
|
||||
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
|
||||
|
@ -93,7 +81,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* @return TrandactionStore
|
||||
* @throws IOException
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
|
||||
*/
|
||||
public TransactionStore createTransactionStore() throws IOException {
|
||||
|
@ -101,7 +88,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
/**
|
||||
* @throws IOException
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
|
||||
*/
|
||||
public void deleteAllMessages() throws IOException {
|
||||
|
@ -118,7 +104,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* @return lastMessageBrokerSequenceId
|
||||
* @throws IOException
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
|
||||
*/
|
||||
public long getLastMessageBrokerSequenceId() throws IOException {
|
||||
|
@ -130,7 +115,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
/**
|
||||
* @param destination
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
|
||||
*/
|
||||
public void removeQueueMessageStore(ActiveMQQueue destination) {
|
||||
|
@ -138,7 +122,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
/**
|
||||
* @param destination
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
|
||||
*/
|
||||
public void removeTopicMessageStore(ActiveMQTopic destination) {
|
||||
|
@ -146,8 +129,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
/**
|
||||
* @param context
|
||||
* @throws IOException
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
|
||||
*/
|
||||
public void rollbackTransaction(ConnectionContext context) throws IOException {
|
||||
|
@ -155,7 +136,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
/**
|
||||
* @param brokerName
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
|
||||
*/
|
||||
public void setBrokerName(String brokerName) {
|
||||
|
@ -163,7 +143,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
/**
|
||||
* @param usageManager
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
|
||||
*/
|
||||
public void setUsageManager(SystemUsage usageManager) {
|
||||
|
@ -179,7 +158,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
* @see org.apache.activemq.Service#start()
|
||||
*/
|
||||
public void start() throws Exception {
|
||||
|
@ -187,7 +165,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
* @see org.apache.activemq.Service#stop()
|
||||
*/
|
||||
public void stop() throws Exception {
|
||||
|
@ -196,7 +173,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Get the journalMaxFileLength
|
||||
*
|
||||
*
|
||||
* @return the journalMaxFileLength
|
||||
*/
|
||||
public int getJournalMaxFileLength() {
|
||||
|
@ -206,8 +183,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
/**
|
||||
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
|
||||
* be used
|
||||
*
|
||||
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
|
||||
*/
|
||||
public void setJournalMaxFileLength(int journalMaxFileLength) {
|
||||
this.letter.setJournalMaxFileLength(journalMaxFileLength);
|
||||
|
@ -219,7 +194,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
|
||||
this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
|
||||
}
|
||||
|
||||
|
||||
public int getMaxFailoverProducersToTrack() {
|
||||
return this.letter.getMaxFailoverProducersToTrack();
|
||||
}
|
||||
|
@ -231,14 +206,14 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
|
||||
this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
|
||||
}
|
||||
|
||||
|
||||
public int getFailoverProducersAuditDepth() {
|
||||
return this.getFailoverProducersAuditDepth();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the checkpointInterval
|
||||
*
|
||||
*
|
||||
* @return the checkpointInterval
|
||||
*/
|
||||
public long getCheckpointInterval() {
|
||||
|
@ -247,9 +222,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Set the checkpointInterval
|
||||
*
|
||||
* @param checkpointInterval
|
||||
* the checkpointInterval to set
|
||||
*
|
||||
* @param checkpointInterval the checkpointInterval to set
|
||||
*/
|
||||
public void setCheckpointInterval(long checkpointInterval) {
|
||||
this.letter.setCheckpointInterval(checkpointInterval);
|
||||
|
@ -257,7 +231,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Get the cleanupInterval
|
||||
*
|
||||
*
|
||||
* @return the cleanupInterval
|
||||
*/
|
||||
public long getCleanupInterval() {
|
||||
|
@ -266,9 +240,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Set the cleanupInterval
|
||||
*
|
||||
* @param cleanupInterval
|
||||
* the cleanupInterval to set
|
||||
*
|
||||
* @param cleanupInterval the cleanupInterval to set
|
||||
*/
|
||||
public void setCleanupInterval(long cleanupInterval) {
|
||||
this.letter.setCleanupInterval(cleanupInterval);
|
||||
|
@ -276,7 +249,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Get the indexWriteBatchSize
|
||||
*
|
||||
*
|
||||
* @return the indexWriteBatchSize
|
||||
*/
|
||||
public int getIndexWriteBatchSize() {
|
||||
|
@ -286,9 +259,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
/**
|
||||
* Set the indexWriteBatchSize
|
||||
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
|
||||
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
|
||||
* @param indexWriteBatchSize
|
||||
* the indexWriteBatchSize to set
|
||||
*
|
||||
* @param indexWriteBatchSize the indexWriteBatchSize to set
|
||||
*/
|
||||
public void setIndexWriteBatchSize(int indexWriteBatchSize) {
|
||||
this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
|
||||
|
@ -296,7 +268,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Get the journalMaxWriteBatchSize
|
||||
*
|
||||
*
|
||||
* @return the journalMaxWriteBatchSize
|
||||
*/
|
||||
public int getJournalMaxWriteBatchSize() {
|
||||
|
@ -305,10 +277,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Set the journalMaxWriteBatchSize
|
||||
* * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
|
||||
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
|
||||
* @param journalMaxWriteBatchSize
|
||||
* the journalMaxWriteBatchSize to set
|
||||
* * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
|
||||
*
|
||||
* @param journalMaxWriteBatchSize the journalMaxWriteBatchSize to set
|
||||
*/
|
||||
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
|
||||
this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
|
||||
|
@ -316,7 +287,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Get the enableIndexWriteAsync
|
||||
*
|
||||
*
|
||||
* @return the enableIndexWriteAsync
|
||||
*/
|
||||
public boolean isEnableIndexWriteAsync() {
|
||||
|
@ -325,9 +296,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Set the enableIndexWriteAsync
|
||||
*
|
||||
* @param enableIndexWriteAsync
|
||||
* the enableIndexWriteAsync to set
|
||||
*
|
||||
* @param enableIndexWriteAsync the enableIndexWriteAsync to set
|
||||
*/
|
||||
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
|
||||
this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
|
||||
|
@ -335,7 +305,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Get the directory
|
||||
*
|
||||
*
|
||||
* @return the directory
|
||||
*/
|
||||
public File getDirectory() {
|
||||
|
@ -343,7 +313,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
/**
|
||||
* @param dir
|
||||
* @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
|
||||
*/
|
||||
public void setDirectory(File dir) {
|
||||
|
@ -352,7 +321,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Get the enableJournalDiskSyncs
|
||||
*
|
||||
*
|
||||
* @return the enableJournalDiskSyncs
|
||||
*/
|
||||
public boolean isEnableJournalDiskSyncs() {
|
||||
|
@ -361,9 +330,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Set the enableJournalDiskSyncs
|
||||
*
|
||||
* @param enableJournalDiskSyncs
|
||||
* the enableJournalDiskSyncs to set
|
||||
*
|
||||
* @param enableJournalDiskSyncs the enableJournalDiskSyncs to set
|
||||
*/
|
||||
public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
|
||||
this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
|
||||
|
@ -371,7 +339,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Get the indexCacheSize
|
||||
*
|
||||
*
|
||||
* @return the indexCacheSize
|
||||
*/
|
||||
public int getIndexCacheSize() {
|
||||
|
@ -381,9 +349,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
/**
|
||||
* Set the indexCacheSize
|
||||
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
|
||||
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
|
||||
* @param indexCacheSize
|
||||
* the indexCacheSize to set
|
||||
*
|
||||
* @param indexCacheSize the indexCacheSize to set
|
||||
*/
|
||||
public void setIndexCacheSize(int indexCacheSize) {
|
||||
this.letter.setIndexCacheSize(indexCacheSize);
|
||||
|
@ -391,7 +358,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Get the ignoreMissingJournalfiles
|
||||
*
|
||||
*
|
||||
* @return the ignoreMissingJournalfiles
|
||||
*/
|
||||
public boolean isIgnoreMissingJournalfiles() {
|
||||
|
@ -400,9 +367,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
|
||||
/**
|
||||
* Set the ignoreMissingJournalfiles
|
||||
*
|
||||
* @param ignoreMissingJournalfiles
|
||||
* the ignoreMissingJournalfiles to set
|
||||
*
|
||||
* @param ignoreMissingJournalfiles the ignoreMissingJournalfiles to set
|
||||
*/
|
||||
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
|
||||
this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
|
||||
|
@ -463,14 +429,14 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
public int getMaxAsyncJobs() {
|
||||
return letter.getMaxAsyncJobs();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxAsyncJobs
|
||||
* the maxAsyncJobs to set
|
||||
* @param maxAsyncJobs the maxAsyncJobs to set
|
||||
*/
|
||||
public void setMaxAsyncJobs(int maxAsyncJobs) {
|
||||
letter.setMaxAsyncJobs(maxAsyncJobs);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return the databaseLockedWaitDelay
|
||||
*/
|
||||
|
@ -482,7 +448,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
* @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
|
||||
*/
|
||||
public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
|
||||
letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
|
||||
letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
|
||||
}
|
||||
|
||||
public boolean getForceRecoverIndex() {
|
||||
|
@ -493,6 +459,14 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
letter.setForceRecoverIndex(forceRecoverIndex);
|
||||
}
|
||||
|
||||
public boolean isJournalPerDestination() {
|
||||
return letter.isJournalPerDestination();
|
||||
}
|
||||
|
||||
public void setJournalPerDestination(boolean journalPerDestination) {
|
||||
letter.setJournalPerDestination(journalPerDestination);
|
||||
}
|
||||
|
||||
// for testing
|
||||
public KahaDBStore getStore() {
|
||||
return letter;
|
||||
|
|
|
@ -26,30 +26,24 @@ import java.util.Iterator;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTempQueue;
|
||||
import org.apache.activemq.command.ActiveMQTempTopic;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.LocalTransactionId;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.command.ProducerId;
|
||||
import org.apache.activemq.command.SubscriptionInfo;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.XATransactionId;
|
||||
import org.apache.activemq.filter.BooleanExpression;
|
||||
import org.apache.activemq.filter.MessageEvaluationContext;
|
||||
import org.apache.activemq.command.*;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.activemq.protobuf.Buffer;
|
||||
import org.apache.activemq.selector.SelectorParser;
|
||||
import org.apache.activemq.store.AbstractMessageStore;
|
||||
import org.apache.activemq.store.MessageRecoveryListener;
|
||||
import org.apache.activemq.store.MessageStore;
|
||||
|
@ -58,23 +52,20 @@ import org.apache.activemq.store.TopicMessageStore;
|
|||
import org.apache.activemq.store.TransactionStore;
|
||||
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
|
||||
import org.apache.activemq.store.kahadb.data.KahaDestination;
|
||||
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
|
||||
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
|
||||
import org.apache.activemq.store.kahadb.data.KahaLocation;
|
||||
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
|
||||
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
|
||||
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
|
||||
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
|
||||
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
|
||||
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
|
||||
import org.apache.activemq.usage.MemoryUsage;
|
||||
import org.apache.activemq.usage.SystemUsage;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.kahadb.journal.Journal;
|
||||
import org.apache.kahadb.journal.Location;
|
||||
import org.apache.kahadb.page.Transaction;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
||||
static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
|
||||
|
@ -85,7 +76,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
|
||||
public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
|
||||
private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
|
||||
PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
|
||||
PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);
|
||||
|
||||
protected ExecutorService queueExecutor;
|
||||
protected ExecutorService topicExecutor;
|
||||
|
@ -128,8 +119,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param concurrentStoreAndDispatch
|
||||
* the concurrentStoreAndDispatch to set
|
||||
* @param concurrentStoreAndDispatch the concurrentStoreAndDispatch to set
|
||||
*/
|
||||
public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
|
||||
this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
|
||||
|
@ -143,8 +133,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param concurrentStoreAndDispatch
|
||||
* the concurrentStoreAndDispatch to set
|
||||
* @param concurrentStoreAndDispatch the concurrentStoreAndDispatch to set
|
||||
*/
|
||||
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
|
||||
this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
|
||||
|
@ -153,16 +142,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
public boolean isConcurrentStoreAndDispatchTransactions() {
|
||||
return this.concurrentStoreAndDispatchTransactions;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return the maxAsyncJobs
|
||||
*/
|
||||
public int getMaxAsyncJobs() {
|
||||
return this.maxAsyncJobs;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxAsyncJobs
|
||||
* the maxAsyncJobs to set
|
||||
* @param maxAsyncJobs the maxAsyncJobs to set
|
||||
*/
|
||||
public void setMaxAsyncJobs(int maxAsyncJobs) {
|
||||
this.maxAsyncJobs = maxAsyncJobs;
|
||||
|
@ -177,20 +166,20 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
|
||||
this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
|
||||
asyncQueueJobQueue, new ThreadFactory() {
|
||||
public Thread newThread(Runnable runnable) {
|
||||
Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
}
|
||||
});
|
||||
public Thread newThread(Runnable runnable) {
|
||||
Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
}
|
||||
});
|
||||
this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
|
||||
asyncTopicJobQueue, new ThreadFactory() {
|
||||
public Thread newThread(Runnable runnable) {
|
||||
Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
}
|
||||
});
|
||||
public Thread newThread(Runnable runnable) {
|
||||
Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -287,14 +276,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
protected KahaDestination dest;
|
||||
private final int maxAsyncJobs;
|
||||
private final Semaphore localDestinationSemaphore;
|
||||
private final Journal journal;
|
||||
|
||||
double doneTasks, canceledTasks = 0;
|
||||
|
||||
public KahaDBMessageStore(ActiveMQDestination destination) {
|
||||
public KahaDBMessageStore(ActiveMQDestination destination) throws IOException {
|
||||
super(destination);
|
||||
this.dest = convert(destination);
|
||||
this.maxAsyncJobs = getMaxAsyncJobs();
|
||||
this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
|
||||
this.journal = getJournalManager().getJournal(destination);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -356,8 +347,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
command.setPrioritySupported(isPrioritizedMessages());
|
||||
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
|
||||
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
|
||||
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
|
||||
|
||||
store(journal, command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
|
||||
|
||||
}
|
||||
|
||||
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
|
||||
|
@ -368,13 +359,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
|
||||
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
|
||||
command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
|
||||
store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
|
||||
store(journal, command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
|
||||
}
|
||||
|
||||
public void removeAllMessages(ConnectionContext context) throws IOException {
|
||||
KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
|
||||
command.setDestination(dest);
|
||||
store(command, true, null, null);
|
||||
store(journal, command, true, null, null);
|
||||
}
|
||||
|
||||
public Message getMessage(MessageId identity) throws IOException {
|
||||
|
@ -396,14 +387,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
return sd.orderIndex.get(tx, sequence).location;
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
indexLock.readLock().unlock();
|
||||
}
|
||||
if (location == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return loadMessage(location);
|
||||
return loadMessage(journal, location);
|
||||
}
|
||||
|
||||
public int getMessageCount() throws IOException {
|
||||
|
@ -419,14 +410,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
int rc = 0;
|
||||
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
|
||||
.hasNext();) {
|
||||
.hasNext(); ) {
|
||||
iterator.next();
|
||||
rc++;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
indexLock.readLock().unlock();
|
||||
}
|
||||
} finally {
|
||||
|
@ -446,7 +437,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
return sd.locationIndex.isEmpty(tx);
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
indexLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
@ -460,22 +451,22 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
sd.orderIndex.resetCursorPosition();
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
|
||||
.hasNext();) {
|
||||
.hasNext(); ) {
|
||||
Entry<Long, MessageKeys> entry = iterator.next();
|
||||
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
|
||||
continue;
|
||||
}
|
||||
Message msg = loadMessage(entry.getValue().location);
|
||||
Message msg = loadMessage(journal, entry.getValue().location);
|
||||
listener.recoverMessage(msg);
|
||||
}
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
indexLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
|
||||
indexLock.readLock().lock();
|
||||
try {
|
||||
|
@ -490,7 +481,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
|
||||
continue;
|
||||
}
|
||||
Message msg = loadMessage(entry.getValue().location);
|
||||
Message msg = loadMessage(journal, entry.getValue().location);
|
||||
listener.recoverMessage(msg);
|
||||
counter++;
|
||||
if (counter >= maxReturned) {
|
||||
|
@ -500,7 +491,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
sd.orderIndex.stoppedIterating();
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
indexLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
@ -511,11 +502,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
public void execute(Transaction tx) throws Exception {
|
||||
StoredDestination sd = getExistingStoredDestination(dest, tx);
|
||||
if (sd != null) {
|
||||
sd.orderIndex.resetCursorPosition();}
|
||||
sd.orderIndex.resetCursorPosition();
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to reset batching",e);
|
||||
LOG.error("Failed to reset batching", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -528,10 +520,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
// Hopefully one day the page file supports concurrent read
|
||||
// operations... but for now we must
|
||||
// externally synchronize...
|
||||
|
||||
|
||||
indexLock.writeLock().lock();
|
||||
try {
|
||||
pageFile.tx().execute(new Transaction.Closure<IOException>() {
|
||||
pageFile.tx().execute(new Transaction.Closure<IOException>() {
|
||||
public void execute(Transaction tx) throws IOException {
|
||||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
Long location = sd.messageIdIndex.get(tx, key);
|
||||
|
@ -540,10 +532,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
}
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
indexLock.writeLock().unlock();
|
||||
}
|
||||
|
||||
|
||||
} finally {
|
||||
unlockAsyncJobQueue();
|
||||
}
|
||||
|
@ -553,15 +545,21 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
@Override
|
||||
public void setMemoryUsage(MemoryUsage memoeyUSage) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
super.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
super.stop();
|
||||
}
|
||||
|
||||
public Journal getJournal() {
|
||||
return this.journal;
|
||||
}
|
||||
|
||||
protected void lockAsyncJobQueue() {
|
||||
try {
|
||||
this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
|
||||
|
@ -590,6 +588,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
|
||||
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
|
||||
private final AtomicInteger subscriptionCount = new AtomicInteger();
|
||||
|
||||
public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
|
||||
super(destination);
|
||||
this.subscriptionCount.set(getAllSubscriptions().length);
|
||||
|
@ -646,7 +645,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
if (ack != null && ack.isUnmatchedAck()) {
|
||||
command.setAck(UNMATCHED);
|
||||
}
|
||||
store(command, false, null, null);
|
||||
store(getJournal(), command, false, null, null);
|
||||
}
|
||||
|
||||
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
|
||||
|
@ -658,7 +657,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
command.setRetroactive(retroactive);
|
||||
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
|
||||
command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
|
||||
store(command, isEnableJournalDiskSyncs() && true, null, null);
|
||||
store(getJournal(), command, isEnableJournalDiskSyncs() && true, null, null);
|
||||
this.subscriptionCount.incrementAndGet();
|
||||
}
|
||||
|
||||
|
@ -666,7 +665,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
|
||||
command.setDestination(dest);
|
||||
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
|
||||
store(command, isEnableJournalDiskSyncs() && true, null, null);
|
||||
store(getJournal(), command, isEnableJournalDiskSyncs() && true, null, null);
|
||||
this.subscriptionCount.decrementAndGet();
|
||||
}
|
||||
|
||||
|
@ -679,7 +678,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
public void execute(Transaction tx) throws IOException {
|
||||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
|
||||
.hasNext();) {
|
||||
.hasNext(); ) {
|
||||
Entry<String, KahaSubscriptionCommand> entry = iterator.next();
|
||||
SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
|
||||
.getValue().getSubscriptionInfo().newInput()));
|
||||
|
@ -688,7 +687,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
}
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
indexLock.readLock().unlock();
|
||||
}
|
||||
|
||||
|
@ -712,7 +711,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
.getSubscriptionInfo().newInput()));
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
indexLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
@ -732,7 +731,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
|
||||
int counter = 0;
|
||||
for (Iterator<Entry<Long, HashSet<String>>> iterator =
|
||||
sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) {
|
||||
sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext(); ) {
|
||||
Entry<Long, HashSet<String>> entry = iterator.next();
|
||||
if (entry.getValue().contains(subscriptionKey)) {
|
||||
counter++;
|
||||
|
@ -741,7 +740,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
return counter;
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
indexLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
@ -758,20 +757,20 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||
sd.orderIndex.setBatch(tx, cursorPos);
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
|
||||
.hasNext();) {
|
||||
.hasNext(); ) {
|
||||
Entry<Long, MessageKeys> entry = iterator.next();
|
||||
listener.recoverMessage(loadMessage(entry.getValue().location));
|
||||
listener.recoverMessage(loadMessage(getJournal(), entry.getValue().location));
|
||||
}
|
||||
sd.orderIndex.resetCursorPosition();
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
indexLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
|
||||
final MessageRecoveryListener listener) throws Exception {
|
||||
final MessageRecoveryListener listener) throws Exception {
|
||||
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
|
||||
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
|
||||
indexLock.writeLock().lock();
|
||||
|
@ -796,9 +795,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
Entry<Long, MessageKeys> entry = null;
|
||||
int counter = 0;
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
|
||||
.hasNext();) {
|
||||
.hasNext(); ) {
|
||||
entry = iterator.next();
|
||||
if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
|
||||
if (listener.recoverMessage(loadMessage(getJournal(), entry.getValue().location))) {
|
||||
counter++;
|
||||
}
|
||||
if (counter >= maxReturned || listener.hasSpace() == false) {
|
||||
|
@ -812,7 +811,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
}
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
indexLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
@ -828,7 +827,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
sd.subscriptionCursors.remove(subscriptionKey);
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
indexLock.writeLock().unlock();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
@ -852,9 +851,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
/**
|
||||
* Cleanup method to remove any state associated with the given destination.
|
||||
* This method does not stop the message store (it might not be cached).
|
||||
*
|
||||
* @param destination
|
||||
* Destination to forget
|
||||
*
|
||||
* @param destination Destination to forget
|
||||
*/
|
||||
public void removeQueueMessageStore(ActiveMQQueue destination) {
|
||||
}
|
||||
|
@ -862,9 +860,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
/**
|
||||
* Cleanup method to remove any state associated with the given destination
|
||||
* This method does not stop the message store (it might not be cached).
|
||||
*
|
||||
* @param destination
|
||||
* Destination to forget
|
||||
*
|
||||
* @param destination Destination to forget
|
||||
*/
|
||||
public void removeTopicMessageStore(ActiveMQTopic destination) {
|
||||
}
|
||||
|
@ -881,7 +878,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
pageFile.tx().execute(new Transaction.Closure<IOException>() {
|
||||
public void execute(Transaction tx) throws IOException {
|
||||
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
|
||||
.hasNext();) {
|
||||
.hasNext(); ) {
|
||||
Entry<String, StoredDestination> entry = iterator.next();
|
||||
if (!isEmptyTopic(entry, tx)) {
|
||||
rc.add(convert(entry.getKey()));
|
||||
|
@ -902,7 +899,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
return isEmptyTopic;
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
indexLock.readLock().unlock();
|
||||
}
|
||||
return rc;
|
||||
|
@ -914,7 +911,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
public long getLastMessageBrokerSequenceId() throws IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
public long getLastProducerSequenceId(ProducerId id) {
|
||||
indexLock.readLock().lock();
|
||||
try {
|
||||
|
@ -931,9 +928,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
public void beginTransaction(ConnectionContext context) throws IOException {
|
||||
throw new IOException("Not yet implemented.");
|
||||
}
|
||||
|
||||
public void commitTransaction(ConnectionContext context) throws IOException {
|
||||
throw new IOException("Not yet implemented.");
|
||||
}
|
||||
|
||||
public void rollbackTransaction(ConnectionContext context) throws IOException {
|
||||
throw new IOException("Not yet implemented.");
|
||||
}
|
||||
|
@ -951,8 +950,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
Message loadMessage(Location location) throws IOException {
|
||||
KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
|
||||
Message loadMessage(Journal journal, Location location) throws IOException {
|
||||
KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(journal, location);
|
||||
Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
|
||||
return msg;
|
||||
}
|
||||
|
@ -972,20 +971,20 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
KahaDestination rc = new KahaDestination();
|
||||
rc.setName(dest.getPhysicalName());
|
||||
switch (dest.getDestinationType()) {
|
||||
case ActiveMQDestination.QUEUE_TYPE:
|
||||
rc.setType(DestinationType.QUEUE);
|
||||
return rc;
|
||||
case ActiveMQDestination.TOPIC_TYPE:
|
||||
rc.setType(DestinationType.TOPIC);
|
||||
return rc;
|
||||
case ActiveMQDestination.TEMP_QUEUE_TYPE:
|
||||
rc.setType(DestinationType.TEMP_QUEUE);
|
||||
return rc;
|
||||
case ActiveMQDestination.TEMP_TOPIC_TYPE:
|
||||
rc.setType(DestinationType.TEMP_TOPIC);
|
||||
return rc;
|
||||
default:
|
||||
return null;
|
||||
case ActiveMQDestination.QUEUE_TYPE:
|
||||
rc.setType(DestinationType.QUEUE);
|
||||
return rc;
|
||||
case ActiveMQDestination.TOPIC_TYPE:
|
||||
rc.setType(DestinationType.TOPIC);
|
||||
return rc;
|
||||
case ActiveMQDestination.TEMP_QUEUE_TYPE:
|
||||
rc.setType(DestinationType.TEMP_QUEUE);
|
||||
return rc;
|
||||
case ActiveMQDestination.TEMP_TOPIC_TYPE:
|
||||
rc.setType(DestinationType.TEMP_TOPIC);
|
||||
return rc;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -998,16 +997,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
String name = dest.substring(p + 1);
|
||||
|
||||
switch (KahaDestination.DestinationType.valueOf(type)) {
|
||||
case QUEUE:
|
||||
return new ActiveMQQueue(name);
|
||||
case TOPIC:
|
||||
return new ActiveMQTopic(name);
|
||||
case TEMP_QUEUE:
|
||||
return new ActiveMQTempQueue(name);
|
||||
case TEMP_TOPIC:
|
||||
return new ActiveMQTempTopic(name);
|
||||
default:
|
||||
throw new IllegalArgumentException("Not in the valid destination format");
|
||||
case QUEUE:
|
||||
return new ActiveMQQueue(name);
|
||||
case TOPIC:
|
||||
return new ActiveMQTopic(name);
|
||||
case TEMP_QUEUE:
|
||||
return new ActiveMQTempQueue(name);
|
||||
case TEMP_TOPIC:
|
||||
return new ActiveMQTempTopic(name);
|
||||
default:
|
||||
throw new IllegalArgumentException("Not in the valid destination format");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1137,8 +1136,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
private final int subscriptionCount;
|
||||
private final List<String> subscriptionKeys = new ArrayList<String>(1);
|
||||
private final KahaDBTopicMessageStore topicStore;
|
||||
|
||||
public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
|
||||
int subscriptionCount) {
|
||||
int subscriptionCount) {
|
||||
super(store, context, message);
|
||||
this.topicStore = store;
|
||||
this.subscriptionCount = subscriptionCount;
|
||||
|
@ -1170,8 +1170,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
|
||||
/**
|
||||
* add a key
|
||||
*
|
||||
* @param key
|
||||
*
|
||||
* @return true if all acknowledgements received
|
||||
*/
|
||||
public boolean addSubscriptionKey(String key) {
|
||||
|
@ -1217,7 +1216,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
super.afterExecute(runnable, throwable);
|
||||
|
||||
if (runnable instanceof StoreTask) {
|
||||
((StoreTask)runnable).releaseLocks();
|
||||
((StoreTask) runnable).releaseLocks();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb;
|
|||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -26,7 +27,9 @@ import java.util.concurrent.CancellationException;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
|
@ -49,14 +52,13 @@ import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
|
|||
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
|
||||
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.apache.kahadb.journal.Journal;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Provides a TransactionStore implementation that can create transaction aware
|
||||
* MessageStore objects from non transaction aware MessageStore objects.
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class KahaDBTransactionStore implements TransactionStore {
|
||||
static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
|
||||
|
@ -70,21 +72,23 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
|
||||
public class Tx {
|
||||
private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
|
||||
|
||||
private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
|
||||
private final HashSet<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>();
|
||||
|
||||
public void add(AddMessageCommand msg) {
|
||||
messages.add(msg);
|
||||
destinations.add(msg.getMessage().getDestination());
|
||||
}
|
||||
|
||||
public void add(RemoveMessageCommand ack) {
|
||||
acks.add(ack);
|
||||
destinations.add(ack.getMessageAck().getDestination());
|
||||
}
|
||||
|
||||
public Message[] getMessages() {
|
||||
Message rc[] = new Message[messages.size()];
|
||||
int count = 0;
|
||||
for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
|
||||
for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext(); ) {
|
||||
AddMessageCommand cmd = iter.next();
|
||||
rc[count++] = cmd.getMessage();
|
||||
}
|
||||
|
@ -94,7 +98,7 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
public MessageAck[] getAcks() {
|
||||
MessageAck rc[] = new MessageAck[acks.size()];
|
||||
int count = 0;
|
||||
for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
|
||||
for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext(); ) {
|
||||
RemoveMessageCommand cmd = iter.next();
|
||||
rc[count++] = cmd.getMessageAck();
|
||||
}
|
||||
|
@ -103,49 +107,56 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
|
||||
/**
|
||||
* @return true if something to commit
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<Future<Object>> commit() throws IOException {
|
||||
List<Future<Object>> results = new ArrayList<Future<Object>>();
|
||||
// Do all the message adds.
|
||||
for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
|
||||
for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext(); ) {
|
||||
AddMessageCommand cmd = iter.next();
|
||||
results.add(cmd.run());
|
||||
|
||||
}
|
||||
// And removes..
|
||||
for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
|
||||
for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext(); ) {
|
||||
RemoveMessageCommand cmd = iter.next();
|
||||
cmd.run();
|
||||
results.add(cmd.run());
|
||||
}
|
||||
|
||||
|
||||
return results;
|
||||
}
|
||||
}
|
||||
|
||||
public abstract class AddMessageCommand {
|
||||
private final ConnectionContext ctx;
|
||||
|
||||
AddMessageCommand(ConnectionContext ctx) {
|
||||
this.ctx = ctx;
|
||||
}
|
||||
|
||||
abstract Message getMessage();
|
||||
|
||||
Future<Object> run() throws IOException {
|
||||
return run(this.ctx);
|
||||
}
|
||||
|
||||
abstract Future<Object> run(ConnectionContext ctx) throws IOException;
|
||||
}
|
||||
|
||||
public abstract class RemoveMessageCommand {
|
||||
|
||||
private final ConnectionContext ctx;
|
||||
|
||||
RemoveMessageCommand(ConnectionContext ctx) {
|
||||
this.ctx = ctx;
|
||||
}
|
||||
|
||||
abstract MessageAck getMessageAck();
|
||||
|
||||
Future<Object> run() throws IOException {
|
||||
return run(this.ctx);
|
||||
}
|
||||
|
||||
abstract Future<Object> run(ConnectionContext context) throws IOException;
|
||||
}
|
||||
|
||||
|
@ -197,8 +208,8 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
|
||||
@Override
|
||||
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
|
||||
MessageId messageId, MessageAck ack) throws IOException {
|
||||
KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
|
||||
MessageId messageId, MessageAck ack) throws IOException {
|
||||
KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore) getDelegate(), clientId,
|
||||
subscriptionName, messageId, ack);
|
||||
}
|
||||
|
||||
|
@ -206,13 +217,17 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
}
|
||||
|
||||
/**
|
||||
* @throws IOException
|
||||
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
|
||||
*/
|
||||
public void prepare(TransactionId txid) throws IOException {
|
||||
inflightTransactions.remove(txid);
|
||||
KahaTransactionInfo info = getTransactionInfo(txid);
|
||||
theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
|
||||
Tx tx = inflightTransactions.get(txid);
|
||||
if (tx != null) {
|
||||
for (Journal journal : theStore.getJournalManager().getJournals(tx.destinations)) {
|
||||
theStore.store(journal, new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Tx getTx(Object txid) {
|
||||
|
@ -242,7 +257,7 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
theStore.brokerService.handleIOException(new IOException(e.getMessage()));
|
||||
} catch (ExecutionException e) {
|
||||
theStore.brokerService.handleIOException(new IOException(e.getMessage()));
|
||||
}catch(CancellationException e) {
|
||||
} catch (CancellationException e) {
|
||||
}
|
||||
if (!result.isCancelled()) {
|
||||
doneSomething = true;
|
||||
|
@ -253,9 +268,11 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
}
|
||||
if (doneSomething) {
|
||||
KahaTransactionInfo info = getTransactionInfo(txid);
|
||||
theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
|
||||
for (Journal journal : theStore.getJournalManager().getJournals(tx.destinations)) {
|
||||
theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, null, null);
|
||||
}
|
||||
}
|
||||
}else {
|
||||
} else {
|
||||
//The Tx will be null for failed over clients - lets run their post commits
|
||||
if (postCommit != null) {
|
||||
postCommit.run();
|
||||
|
@ -266,23 +283,26 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
KahaTransactionInfo info = getTransactionInfo(txid);
|
||||
// ensure message order w.r.t to cursor and store for setBatch()
|
||||
synchronized (this) {
|
||||
theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
|
||||
for (Journal journal : theStore.getJournalManager().getJournals()) {
|
||||
theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
|
||||
}
|
||||
forgetRecoveredAcks(txid);
|
||||
}
|
||||
}
|
||||
}else {
|
||||
LOG.error("Null transaction passed on commit");
|
||||
} else {
|
||||
LOG.error("Null transaction passed on commit");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws IOException
|
||||
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
|
||||
*/
|
||||
public void rollback(TransactionId txid) throws IOException {
|
||||
if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
|
||||
KahaTransactionInfo info = getTransactionInfo(txid);
|
||||
theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
|
||||
for (Journal journal : theStore.getJournalManager().getJournals()) {
|
||||
theStore.store(journal, new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
|
||||
}
|
||||
forgetRecoveredAcks(txid);
|
||||
} else {
|
||||
inflightTransactions.remove(txid);
|
||||
|
@ -349,6 +369,7 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
public Message getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Object> run(ConnectionContext ctx) throws IOException {
|
||||
destination.addMessage(ctx, message);
|
||||
|
@ -376,6 +397,7 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
public Message getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Object> run(ConnectionContext ctx) throws IOException {
|
||||
return destination.asyncAddQueueMessage(ctx, message);
|
||||
|
@ -393,7 +415,7 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
throws IOException {
|
||||
|
||||
if (message.getTransactionId() != null) {
|
||||
if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
|
||||
if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
|
||||
destination.addMessage(context, message);
|
||||
return AbstractMessageStore.FUTURE;
|
||||
} else {
|
||||
|
@ -403,6 +425,7 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
public Message getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future run(ConnectionContext ctx) throws IOException {
|
||||
return destination.asyncAddTopicMessage(ctx, message);
|
||||
|
@ -424,7 +447,7 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
throws IOException {
|
||||
|
||||
if (ack.isInTransaction()) {
|
||||
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
|
||||
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
|
||||
destination.removeMessage(context, ack);
|
||||
} else {
|
||||
Tx tx = getTx(ack.getTransactionId());
|
||||
|
@ -450,7 +473,7 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
throws IOException {
|
||||
|
||||
if (ack.isInTransaction()) {
|
||||
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
|
||||
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
|
||||
destination.removeAsyncMessage(context, ack);
|
||||
} else {
|
||||
Tx tx = getTx(ack.getTransactionId());
|
||||
|
@ -476,7 +499,7 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
final MessageId messageId, final MessageAck ack) throws IOException {
|
||||
|
||||
if (ack.isInTransaction()) {
|
||||
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
|
||||
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
|
||||
destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
|
||||
} else {
|
||||
Tx tx = getTx(ack.getTransactionId());
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -27,8 +27,6 @@ import org.apache.activemq.store.kahadb.KahaDBStore;
|
|||
import org.apache.activemq.util.DefaultIOExceptionHandler;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
|
@ -58,7 +56,7 @@ public class AMQ2736Test {
|
|||
|
||||
// test hack, close the journal to ensure no further journal updates when broker stops
|
||||
// mimic kill -9 in terms of no normal shutdown sequence
|
||||
store.getJournal().close();
|
||||
store.getJournalManager().close();
|
||||
try {
|
||||
store.close();
|
||||
} catch (Exception expectedLotsAsJournalBorked) {
|
||||
|
|
|
@ -16,11 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.bugs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
|
@ -32,7 +28,6 @@ import javax.jms.MessageConsumer;
|
|||
import javax.jms.MessageListener;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.RedeliveryPolicy;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
|
@ -42,6 +37,8 @@ import org.junit.After;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
public class AMQ2982Test {
|
||||
|
||||
|
@ -65,7 +62,7 @@ public class AMQ2982Test {
|
|||
// ensure save memory publishing, use the right lock
|
||||
indexLock.readLock().lock();
|
||||
try {
|
||||
return getJournal().getFileMap().size();
|
||||
return getJournalManager().getFileMap().size();
|
||||
} finally {
|
||||
indexLock.readLock().unlock();
|
||||
}
|
||||
|
|
|
@ -16,10 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq.bugs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -34,7 +30,6 @@ import javax.jms.Message;
|
|||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.store.kahadb.KahaDBStore;
|
||||
|
@ -42,6 +37,7 @@ import org.junit.After;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class AMQ2983Test {
|
||||
|
||||
|
@ -67,7 +63,7 @@ public class AMQ2983Test {
|
|||
// ensure save memory publishing, use the right lock
|
||||
indexLock.readLock().lock();
|
||||
try {
|
||||
return getJournal().getFileMap().size();
|
||||
return getJournalManager().getFileMap().size();
|
||||
} finally {
|
||||
indexLock.readLock().unlock();
|
||||
}
|
||||
|
|
|
@ -31,9 +31,9 @@ public class SimpleDurableTopicTest extends SimpleTopicTest {
|
|||
protected long initialConsumerDelay = 0;
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
numberOfDestinations=1;
|
||||
numberOfDestinations=10;
|
||||
numberOfConsumers = 1;
|
||||
numberofProducers = Integer.parseInt(System.getProperty("SimpleDurableTopicTest.numberofProducers", "20"), 20);
|
||||
numberofProducers = Integer.parseInt(System.getProperty("SimpleDurableTopicTest.numberofProducers", "1"));
|
||||
sampleCount= Integer.parseInt(System.getProperty("SimpleDurableTopicTest.sampleCount", "1000"), 10);
|
||||
playloadSize = 1024;
|
||||
super.setUp();
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.usecases;
|
||||
|
||||
import java.util.Vector;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
|
@ -66,7 +67,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
public static Test suite() {
|
||||
return suite(DurableSubscriptionOfflineTest.class);
|
||||
}
|
||||
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
exceptions.clear();
|
||||
topic = (ActiveMQTopic) createDestination();
|
||||
|
@ -82,9 +83,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
private void createBroker() throws Exception {
|
||||
createBroker(true);
|
||||
}
|
||||
|
||||
|
||||
private void createBroker(boolean deleteAllMessages) throws Exception {
|
||||
broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
|
||||
broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) + ")");
|
||||
broker.setBrokerName(getName(true));
|
||||
broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
||||
broker.getManagementContext().setCreateConnector(false);
|
||||
|
@ -96,14 +97,14 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
policyMap.setDefaultEntry(policy);
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
}
|
||||
|
||||
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
|
||||
// ensure it kicks in during tests
|
||||
((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
|
||||
((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).setCleanupPeriod(2 * 1000);
|
||||
} else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
|
||||
// have lots of journal files
|
||||
((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
|
||||
((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
|
||||
}
|
||||
broker.start();
|
||||
}
|
||||
|
@ -115,9 +116,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
|
||||
public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
this.addCombinationValues("usePrioritySupport",
|
||||
new Object[]{ Boolean.TRUE, Boolean.FALSE});
|
||||
new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||
}
|
||||
|
||||
public void testConsumeOnlyMatchedMessages() throws Exception {
|
||||
|
@ -162,110 +163,110 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
assertEquals(sent, listener.count);
|
||||
}
|
||||
|
||||
public void testConsumeAllMatchedMessages() throws Exception {
|
||||
// create durable subscription
|
||||
Connection con = createConnection();
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
session.close();
|
||||
con.close();
|
||||
public void testConsumeAllMatchedMessages() throws Exception {
|
||||
// create durable subscription
|
||||
Connection con = createConnection();
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
// send messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
// send messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
|
||||
int sent = 0;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
sent++;
|
||||
Message message = session.createMessage();
|
||||
message.setStringProperty("filter", "true");
|
||||
producer.send(topic, message);
|
||||
}
|
||||
int sent = 0;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
sent++;
|
||||
Message message = session.createMessage();
|
||||
message.setStringProperty("filter", "true");
|
||||
producer.send(topic, message);
|
||||
}
|
||||
|
||||
Thread.sleep(1 * 1000);
|
||||
Thread.sleep(1 * 1000);
|
||||
|
||||
session.close();
|
||||
con.close();
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
// consume messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
Listener listener = new Listener();
|
||||
consumer.setMessageListener(listener);
|
||||
// consume messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
Listener listener = new Listener();
|
||||
consumer.setMessageListener(listener);
|
||||
|
||||
Thread.sleep(3 * 1000);
|
||||
Thread.sleep(3 * 1000);
|
||||
|
||||
session.close();
|
||||
con.close();
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
assertEquals(sent, listener.count);
|
||||
}
|
||||
|
||||
|
||||
public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
this.addCombinationValues("usePrioritySupport",
|
||||
new Object[]{ Boolean.TRUE, Boolean.FALSE});
|
||||
assertEquals(sent, listener.count);
|
||||
}
|
||||
|
||||
public void testVerifyAllConsumedAreAcked() throws Exception {
|
||||
// create durable subscription
|
||||
Connection con = createConnection();
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
// send messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
this.addCombinationValues("usePrioritySupport",
|
||||
new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||
}
|
||||
|
||||
int sent = 0;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
sent++;
|
||||
Message message = session.createMessage();
|
||||
message.setStringProperty("filter", "true");
|
||||
producer.send(topic, message);
|
||||
}
|
||||
public void testVerifyAllConsumedAreAcked() throws Exception {
|
||||
// create durable subscription
|
||||
Connection con = createConnection();
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
Thread.sleep(1 * 1000);
|
||||
// send messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
|
||||
session.close();
|
||||
con.close();
|
||||
int sent = 0;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
sent++;
|
||||
Message message = session.createMessage();
|
||||
message.setStringProperty("filter", "true");
|
||||
producer.send(topic, message);
|
||||
}
|
||||
|
||||
// consume messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
Listener listener = new Listener();
|
||||
consumer.setMessageListener(listener);
|
||||
Thread.sleep(1 * 1000);
|
||||
|
||||
Thread.sleep(3 * 1000);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
session.close();
|
||||
con.close();
|
||||
// consume messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
Listener listener = new Listener();
|
||||
consumer.setMessageListener(listener);
|
||||
|
||||
LOG.info("Consumed: " + listener.count);
|
||||
assertEquals(sent, listener.count);
|
||||
Thread.sleep(3 * 1000);
|
||||
|
||||
// consume messages again, should not get any
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
listener = new Listener();
|
||||
consumer.setMessageListener(listener);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
Thread.sleep(3 * 1000);
|
||||
LOG.info("Consumed: " + listener.count);
|
||||
assertEquals(sent, listener.count);
|
||||
|
||||
session.close();
|
||||
con.close();
|
||||
// consume messages again, should not get any
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
listener = new Listener();
|
||||
consumer.setMessageListener(listener);
|
||||
|
||||
assertEquals(0, listener.count);
|
||||
}
|
||||
Thread.sleep(3 * 1000);
|
||||
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
assertEquals(0, listener.count);
|
||||
}
|
||||
|
||||
public void testTwoOfflineSubscriptionCanConsume() throws Exception {
|
||||
// create durable subscription 1
|
||||
|
@ -323,9 +324,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
|
||||
public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
this.addCombinationValues("usePrioritySupport",
|
||||
new Object[]{ Boolean.TRUE, Boolean.FALSE});
|
||||
new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||
}
|
||||
|
||||
public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
|
||||
|
@ -474,14 +475,15 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
con.close();
|
||||
|
||||
assertEquals("offline consumer got all", sent, listener.count);
|
||||
}
|
||||
}
|
||||
|
||||
public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
}
|
||||
|
||||
private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
|
||||
|
||||
public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
|
||||
// create offline subs 1
|
||||
Connection con = createConnection("offCli1");
|
||||
|
@ -629,9 +631,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
|
||||
public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
}
|
||||
|
||||
|
||||
public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
|
||||
// create offline subs 1
|
||||
Connection con = createConnection("offCli1");
|
||||
|
@ -672,7 +674,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
Thread.sleep(3 * 1000);
|
||||
broker.stop();
|
||||
createBroker(false /*deleteAllMessages*/);
|
||||
|
||||
|
||||
// send more messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -719,7 +721,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
|
||||
public void initCombosForTestOfflineAfterRestart() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
}
|
||||
|
||||
public void testOfflineSubscriptionAfterRestart() throws Exception {
|
||||
|
@ -855,7 +857,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
|
||||
int filtered = 0;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
|
||||
boolean filter = (i % 2 == 0); //(int) (Math.random() * 2) >= 1;
|
||||
if (filter)
|
||||
filtered++;
|
||||
|
||||
|
@ -953,7 +955,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
sent = 0;
|
||||
for (int i = 0; i < 2; i++) {
|
||||
Message message = session.createMessage();
|
||||
message.setStringProperty("filter", i==1 ? "true" : "false");
|
||||
message.setStringProperty("filter", i == 1 ? "true" : "false");
|
||||
producer.send(topic, message);
|
||||
sent++;
|
||||
}
|
||||
|
@ -961,7 +963,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
Thread.sleep(1 * 1000);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
|
||||
LOG.info("cli1 again, should get 1 new ones");
|
||||
con = createConnection("cli1");
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -1059,7 +1061,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
// use very small journal to get lots of files to cleanup
|
||||
public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception {
|
||||
this.addCombinationValues("journalMaxFileLength",
|
||||
new Object[]{new Integer(64*1024)});
|
||||
new Object[]{new Integer(64 * 1024)});
|
||||
}
|
||||
|
||||
// https://issues.apache.org/jira/browse/AMQ-3206
|
||||
|
@ -1081,7 +1083,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
MessageProducer producer = session.createProducer(null);
|
||||
|
||||
final int toSend = 500;
|
||||
final String payload = new byte[40*1024].toString();
|
||||
final String payload = new byte[40 * 1024].toString();
|
||||
int sent = 0;
|
||||
for (int i = sent; i < toSend; i++) {
|
||||
Message message = session.createTextMessage(payload);
|
||||
|
@ -1108,7 +1110,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
consumer.setMessageListener(listener);
|
||||
assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.info("Want: " + toSend + ", current: " + listener.count);
|
||||
LOG.info("Want: " + toSend + ", current: " + listener.count);
|
||||
return listener.count == toSend;
|
||||
}
|
||||
}));
|
||||
|
@ -1118,7 +1120,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
destroyBroker();
|
||||
createBroker(false);
|
||||
KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
|
||||
assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size());
|
||||
assertEquals("only one journal file left after restart", 1, pa.getStore().getJournalManager().getFileMap().size());
|
||||
}
|
||||
|
||||
public static class Listener implements MessageListener {
|
||||
|
@ -1127,20 +1129,23 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
|
||||
Listener() {
|
||||
}
|
||||
|
||||
Listener(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public void onMessage(Message message) {
|
||||
count++;
|
||||
if (id != null) {
|
||||
try {
|
||||
LOG.info(id + ", " + message.getJMSMessageID());
|
||||
} catch (Exception ignored) {}
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public class FilterCheckListener extends Listener {
|
||||
public class FilterCheckListener extends Listener {
|
||||
|
||||
public void onMessage(Message message) {
|
||||
count++;
|
||||
|
@ -1150,13 +1155,11 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
if (b != null) {
|
||||
boolean c = message.getBooleanProperty("$c");
|
||||
assertTrue("", c);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
String d = message.getStringProperty("$d");
|
||||
assertTrue("", "D1".equals(d) || "D2".equals(d));
|
||||
}
|
||||
}
|
||||
catch (JMSException e) {
|
||||
} catch (JMSException e) {
|
||||
exceptions.add(e);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue