https://issues.apache.org/jira/browse/AMQ-2922 - rework, introduce new store 'mKahaDB' that contains multiple filtered kahadb persistence adapters, destinations match a store using destination wildcards in the same way as policy entries. Transactions that span multiple stores use a local xa variant to ensure consistency

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1170201 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-09-13 15:01:37 +00:00
parent 232d59a77d
commit 15953786d1
22 changed files with 2017 additions and 1113 deletions

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.filter;
import java.lang.IllegalStateException;
import javax.jms.*;
import org.apache.activemq.command.ActiveMQDestination;
/*
* allow match to any set of composite destinations, both queues and topics
*/
public class AnyDestination extends ActiveMQDestination {
public AnyDestination(ActiveMQDestination[] destinations) {
super(destinations);
// ensure we are small when it comes to comparison in DestinationMap
physicalName = "0";
}
@Override
protected String getQualifiedPrefix() {
return "Any://";
}
@Override
public byte getDestinationType() {
return ActiveMQDestination.QUEUE_TYPE & ActiveMQDestination.TOPIC_TYPE;
}
@Override
public byte getDataStructureType() {
throw new IllegalStateException("not for marshalling");
}
@Override
public boolean isQueue() {
return true;
}
@Override
public boolean isTopic() {
return true;
}
}

View File

@ -1,105 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.journal.Journal;
public class DefaultJournalManager implements JournalManager {
private final Journal journal;
private final List<Journal> journals;
public DefaultJournalManager() {
this.journal = new Journal();
List<Journal> list = new ArrayList<Journal>(1);
list.add(this.journal);
this.journals = Collections.unmodifiableList(list);
}
public void start() throws IOException {
journal.start();
}
public void close() throws IOException {
journal.close();
}
public Journal getJournal(ActiveMQDestination destination) {
return journal;
}
public void setDirectory(File directory) {
journal.setDirectory(directory);
}
public void setMaxFileLength(int maxFileLength) {
journal.setMaxFileLength(maxFileLength);
}
public void setCheckForCorruptionOnStartup(boolean checkForCorruptJournalFiles) {
journal.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
}
public void setChecksum(boolean checksum) {
journal.setChecksum(checksum);
}
public void setWriteBatchSize(int batchSize) {
journal.setWriteBatchSize(batchSize);
}
public void setArchiveDataLogs(boolean archiveDataLogs) {
journal.setArchiveDataLogs(archiveDataLogs);
}
public void setStoreSize(AtomicLong storeSize) {
journal.setSizeAccumulator(storeSize);
}
public void setDirectoryArchive(File directoryArchive) {
journal.setDirectoryArchive(directoryArchive);
}
public void delete() throws IOException {
journal.delete();
}
public Map<Integer, DataFile> getFileMap() {
return journal.getFileMap();
}
public Collection<Journal> getJournals() {
return journals;
}
public Collection<Journal> getJournals(Set<ActiveMQDestination> set) {
return journals;
}
}

View File

@ -1,239 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.IOHelper;
import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.journal.Journal;
public class DestinationJournalManager implements JournalManager {
private static final String PREPEND = "JournalDest-";
private static final String QUEUE_PREPEND = PREPEND + "Queue-";
private static final String TOPIC_PREPEND = PREPEND + "Topic-";
private AtomicBoolean started = new AtomicBoolean();
private final Map<ActiveMQDestination, Journal> journalMap = new ConcurrentHashMap<ActiveMQDestination, Journal>();
private File directory = new File("KahaDB");
private File directoryArchive;
private int maxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
private boolean checkForCorruptionOnStartup;
private boolean checksum = false;
private int writeBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
private boolean archiveDataLogs;
private AtomicLong storeSize = new AtomicLong(0);
public AtomicBoolean getStarted() {
return started;
}
public void setStarted(AtomicBoolean started) {
this.started = started;
}
public File getDirectory() {
return directory;
}
public void setDirectory(File directory) {
this.directory = directory;
}
public File getDirectoryArchive() {
return directoryArchive;
}
public void setDirectoryArchive(File directoryArchive) {
this.directoryArchive = directoryArchive;
}
public int getMaxFileLength() {
return maxFileLength;
}
public void setMaxFileLength(int maxFileLength) {
this.maxFileLength = maxFileLength;
}
public boolean isCheckForCorruptionOnStartup() {
return checkForCorruptionOnStartup;
}
public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
}
public boolean isChecksum() {
return checksum;
}
public void setChecksum(boolean checksum) {
this.checksum = checksum;
}
public int getWriteBatchSize() {
return writeBatchSize;
}
public void setWriteBatchSize(int writeBatchSize) {
this.writeBatchSize = writeBatchSize;
}
public boolean isArchiveDataLogs() {
return archiveDataLogs;
}
public void setArchiveDataLogs(boolean archiveDataLogs) {
this.archiveDataLogs = archiveDataLogs;
}
public AtomicLong getStoreSize() {
return storeSize;
}
public void setStoreSize(AtomicLong storeSize) {
this.storeSize = storeSize;
}
public void start() throws IOException {
if (started.compareAndSet(false, true)) {
File[] files = getDirectory().listFiles(new FilenameFilter() {
public boolean accept(File file, String s) {
if (file.isDirectory() && s != null && s.startsWith(PREPEND)) {
return true;
}
return false;
}
});
if (files != null) {
for (File file : files) {
ActiveMQDestination destination;
if (file.getName().startsWith(TOPIC_PREPEND)) {
String destinationName = file.getName().substring(TOPIC_PREPEND.length());
destination = new ActiveMQTopic(destinationName);
} else {
String destinationName = file.getName().substring(QUEUE_PREPEND.length());
destination = new ActiveMQQueue(destinationName);
}
Journal journal = new Journal();
journal.setDirectory(file);
if (getDirectoryArchive() != null) {
IOHelper.mkdirs(getDirectoryArchive());
File archive = new File(getDirectoryArchive(), file.getName());
IOHelper.mkdirs(archive);
journal.setDirectoryArchive(archive);
}
configure(journal);
journalMap.put(destination, journal);
}
}
for (Journal journal : journalMap.values()) {
journal.start();
}
}
}
public void close() throws IOException {
started.set(false);
for (Journal journal : journalMap.values()) {
journal.close();
}
journalMap.clear();
}
public void delete() throws IOException {
for (Journal journal : journalMap.values()) {
journal.delete();
}
journalMap.clear();
}
public Journal getJournal(ActiveMQDestination destination) throws IOException {
Journal journal = journalMap.get(destination);
if (journal == null && !destination.isTemporary()) {
journal = new Journal();
String fileName;
if (destination.isTopic()) {
fileName = TOPIC_PREPEND + destination.getPhysicalName();
} else {
fileName = QUEUE_PREPEND + destination.getPhysicalName();
}
File file = new File(getDirectory(), fileName);
IOHelper.mkdirs(file);
journal.setDirectory(file);
if (getDirectoryArchive() != null) {
IOHelper.mkdirs(getDirectoryArchive());
File archive = new File(getDirectoryArchive(), fileName);
IOHelper.mkdirs(archive);
journal.setDirectoryArchive(archive);
}
configure(journal);
if (started.get()) {
journal.start();
}
return journal;
} else {
return journal;
}
}
public Map<Integer, DataFile> getFileMap() {
throw new RuntimeException("Not supported");
}
public Collection<Journal> getJournals() {
return journalMap.values();
}
public Collection<Journal> getJournals(Set<ActiveMQDestination> set) {
List<Journal> list = new ArrayList<Journal>();
for (ActiveMQDestination destination : set) {
Journal j = journalMap.get(destination);
if (j != null) {
list.add(j);
}
}
return list;
}
protected void configure(Journal journal) {
journal.setMaxFileLength(getMaxFileLength());
journal.setCheckForCorruptionOnStartup(isCheckForCorruptionOnStartup());
journal.setChecksum(isChecksum() || isCheckForCorruptionOnStartup());
journal.setWriteBatchSize(getWriteBatchSize());
journal.setArchiveDataLogs(isArchiveDataLogs());
journal.setSizeAccumulator(getStoreSize());
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import org.apache.activemq.filter.DestinationMapEntry;
/**
* @org.apache.xbean.XBean element="filteredKahaDB"
*
*/
public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry {
private KahaDBPersistenceAdapter persistenceAdapter;
public KahaDBPersistenceAdapter getPersistenceAdapter() {
return persistenceAdapter;
}
public void setPersistenceAdapter(KahaDBPersistenceAdapter persistenceAdapter) {
this.persistenceAdapter = persistenceAdapter;
}
@Override
public void afterPropertiesSet() throws Exception {
// ok to have no destination, we default it
}
}

View File

@ -1,61 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.journal.Journal;
public interface JournalManager {
void start() throws IOException;
void close() throws IOException;
Journal getJournal(ActiveMQDestination destination) throws IOException;
void setDirectory(File directory);
void setMaxFileLength(int maxFileLength);
void setCheckForCorruptionOnStartup(boolean checkForCorruptJournalFiles);
void setChecksum(boolean checksum);
void setWriteBatchSize(int batchSize);
void setArchiveDataLogs(boolean archiveDataLogs);
void setStoreSize(AtomicLong storeSize);
void setDirectoryArchive(File directoryArchive);
void delete() throws IOException;
Map<Integer, DataFile> getFileMap();
Collection<Journal> getJournals();
Collection<Journal> getJournals(Set<ActiveMQDestination> set);
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.store.kahadb;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Set; import java.util.Set;
import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
@ -27,11 +26,18 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
/** /**
@ -46,6 +52,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
private final KahaDBStore letter = new KahaDBStore(); private final KahaDBStore letter = new KahaDBStore();
/** /**
* @param context
* @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext) * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
*/ */
public void beginTransaction(ConnectionContext context) throws IOException { public void beginTransaction(ConnectionContext context) throws IOException {
@ -53,6 +61,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @param sync
* @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean) * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
*/ */
public void checkpoint(boolean sync) throws IOException { public void checkpoint(boolean sync) throws IOException {
@ -60,6 +70,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @param context
* @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext) * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
*/ */
public void commitTransaction(ConnectionContext context) throws IOException { public void commitTransaction(ConnectionContext context) throws IOException {
@ -67,7 +79,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @param destination
* @return MessageStore * @return MessageStore
* @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
*/ */
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
@ -75,7 +89,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @param destination
* @return TopicMessageStore * @return TopicMessageStore
* @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
*/ */
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
@ -83,7 +99,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @return TrandactionStore * @return TransactionStore
* @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore() * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
*/ */
public TransactionStore createTransactionStore() throws IOException { public TransactionStore createTransactionStore() throws IOException {
@ -91,6 +108,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages() * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
*/ */
public void deleteAllMessages() throws IOException { public void deleteAllMessages() throws IOException {
@ -107,6 +125,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* @return lastMessageBrokerSequenceId * @return lastMessageBrokerSequenceId
* @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId() * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
*/ */
public long getLastMessageBrokerSequenceId() throws IOException { public long getLastMessageBrokerSequenceId() throws IOException {
@ -118,6 +137,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @param destination
* @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
*/ */
public void removeQueueMessageStore(ActiveMQQueue destination) { public void removeQueueMessageStore(ActiveMQQueue destination) {
@ -125,6 +145,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @param destination
* @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
*/ */
public void removeTopicMessageStore(ActiveMQTopic destination) { public void removeTopicMessageStore(ActiveMQTopic destination) {
@ -132,6 +153,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @param context
* @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext) * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
*/ */
public void rollbackTransaction(ConnectionContext context) throws IOException { public void rollbackTransaction(ConnectionContext context) throws IOException {
@ -139,6 +162,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @param brokerName
* @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String) * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
*/ */
public void setBrokerName(String brokerName) { public void setBrokerName(String brokerName) {
@ -146,6 +170,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @param usageManager
* @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage) * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
*/ */
public void setUsageManager(SystemUsage usageManager) { public void setUsageManager(SystemUsage usageManager) {
@ -161,6 +186,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @throws Exception
* @see org.apache.activemq.Service#start() * @see org.apache.activemq.Service#start()
*/ */
public void start() throws Exception { public void start() throws Exception {
@ -168,6 +194,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @throws Exception
* @see org.apache.activemq.Service#stop() * @see org.apache.activemq.Service#stop()
*/ */
public void stop() throws Exception { public void stop() throws Exception {
@ -176,7 +203,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Get the journalMaxFileLength * Get the journalMaxFileLength
* *
* @return the journalMaxFileLength * @return the journalMaxFileLength
*/ */
public int getJournalMaxFileLength() { public int getJournalMaxFileLength() {
@ -186,6 +213,8 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
* be used * be used
*
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
*/ */
public void setJournalMaxFileLength(int journalMaxFileLength) { public void setJournalMaxFileLength(int journalMaxFileLength) {
this.letter.setJournalMaxFileLength(journalMaxFileLength); this.letter.setJournalMaxFileLength(journalMaxFileLength);
@ -197,7 +226,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack); this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
} }
public int getMaxFailoverProducersToTrack() { public int getMaxFailoverProducersToTrack() {
return this.letter.getMaxFailoverProducersToTrack(); return this.letter.getMaxFailoverProducersToTrack();
} }
@ -209,14 +238,14 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth); this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
} }
public int getFailoverProducersAuditDepth() { public int getFailoverProducersAuditDepth() {
return this.getFailoverProducersAuditDepth(); return this.getFailoverProducersAuditDepth();
} }
/** /**
* Get the checkpointInterval * Get the checkpointInterval
* *
* @return the checkpointInterval * @return the checkpointInterval
*/ */
public long getCheckpointInterval() { public long getCheckpointInterval() {
@ -225,8 +254,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Set the checkpointInterval * Set the checkpointInterval
* *
* @param checkpointInterval the checkpointInterval to set * @param checkpointInterval
* the checkpointInterval to set
*/ */
public void setCheckpointInterval(long checkpointInterval) { public void setCheckpointInterval(long checkpointInterval) {
this.letter.setCheckpointInterval(checkpointInterval); this.letter.setCheckpointInterval(checkpointInterval);
@ -234,7 +264,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Get the cleanupInterval * Get the cleanupInterval
* *
* @return the cleanupInterval * @return the cleanupInterval
*/ */
public long getCleanupInterval() { public long getCleanupInterval() {
@ -243,8 +273,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Set the cleanupInterval * Set the cleanupInterval
* *
* @param cleanupInterval the cleanupInterval to set * @param cleanupInterval
* the cleanupInterval to set
*/ */
public void setCleanupInterval(long cleanupInterval) { public void setCleanupInterval(long cleanupInterval) {
this.letter.setCleanupInterval(cleanupInterval); this.letter.setCleanupInterval(cleanupInterval);
@ -252,7 +283,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Get the indexWriteBatchSize * Get the indexWriteBatchSize
* *
* @return the indexWriteBatchSize * @return the indexWriteBatchSize
*/ */
public int getIndexWriteBatchSize() { public int getIndexWriteBatchSize() {
@ -262,8 +293,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Set the indexWriteBatchSize * Set the indexWriteBatchSize
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
* * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
* @param indexWriteBatchSize the indexWriteBatchSize to set * @param indexWriteBatchSize
* the indexWriteBatchSize to set
*/ */
public void setIndexWriteBatchSize(int indexWriteBatchSize) { public void setIndexWriteBatchSize(int indexWriteBatchSize) {
this.letter.setIndexWriteBatchSize(indexWriteBatchSize); this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
@ -271,7 +303,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Get the journalMaxWriteBatchSize * Get the journalMaxWriteBatchSize
* *
* @return the journalMaxWriteBatchSize * @return the journalMaxWriteBatchSize
*/ */
public int getJournalMaxWriteBatchSize() { public int getJournalMaxWriteBatchSize() {
@ -280,9 +312,10 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Set the journalMaxWriteBatchSize * Set the journalMaxWriteBatchSize
* * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
* * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
* @param journalMaxWriteBatchSize the journalMaxWriteBatchSize to set * @param journalMaxWriteBatchSize
* the journalMaxWriteBatchSize to set
*/ */
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize); this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
@ -290,7 +323,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Get the enableIndexWriteAsync * Get the enableIndexWriteAsync
* *
* @return the enableIndexWriteAsync * @return the enableIndexWriteAsync
*/ */
public boolean isEnableIndexWriteAsync() { public boolean isEnableIndexWriteAsync() {
@ -299,8 +332,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Set the enableIndexWriteAsync * Set the enableIndexWriteAsync
* *
* @param enableIndexWriteAsync the enableIndexWriteAsync to set * @param enableIndexWriteAsync
* the enableIndexWriteAsync to set
*/ */
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync); this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
@ -308,7 +342,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Get the directory * Get the directory
* *
* @return the directory * @return the directory
*/ */
public File getDirectory() { public File getDirectory() {
@ -316,6 +350,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
} }
/** /**
* @param dir
* @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File) * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
*/ */
public void setDirectory(File dir) { public void setDirectory(File dir) {
@ -324,7 +359,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Get the enableJournalDiskSyncs * Get the enableJournalDiskSyncs
* *
* @return the enableJournalDiskSyncs * @return the enableJournalDiskSyncs
*/ */
public boolean isEnableJournalDiskSyncs() { public boolean isEnableJournalDiskSyncs() {
@ -333,8 +368,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Set the enableJournalDiskSyncs * Set the enableJournalDiskSyncs
* *
* @param enableJournalDiskSyncs the enableJournalDiskSyncs to set * @param enableJournalDiskSyncs
* the enableJournalDiskSyncs to set
*/ */
public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) { public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs); this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
@ -342,7 +378,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Get the indexCacheSize * Get the indexCacheSize
* *
* @return the indexCacheSize * @return the indexCacheSize
*/ */
public int getIndexCacheSize() { public int getIndexCacheSize() {
@ -352,8 +388,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Set the indexCacheSize * Set the indexCacheSize
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
* * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
* @param indexCacheSize the indexCacheSize to set * @param indexCacheSize
* the indexCacheSize to set
*/ */
public void setIndexCacheSize(int indexCacheSize) { public void setIndexCacheSize(int indexCacheSize) {
this.letter.setIndexCacheSize(indexCacheSize); this.letter.setIndexCacheSize(indexCacheSize);
@ -361,7 +398,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Get the ignoreMissingJournalfiles * Get the ignoreMissingJournalfiles
* *
* @return the ignoreMissingJournalfiles * @return the ignoreMissingJournalfiles
*/ */
public boolean isIgnoreMissingJournalfiles() { public boolean isIgnoreMissingJournalfiles() {
@ -370,8 +407,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/** /**
* Set the ignoreMissingJournalfiles * Set the ignoreMissingJournalfiles
* *
* @param ignoreMissingJournalfiles the ignoreMissingJournalfiles to set * @param ignoreMissingJournalfiles
* the ignoreMissingJournalfiles to set
*/ */
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles); this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
@ -432,14 +470,14 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
public int getMaxAsyncJobs() { public int getMaxAsyncJobs() {
return letter.getMaxAsyncJobs(); return letter.getMaxAsyncJobs();
} }
/** /**
* @param maxAsyncJobs the maxAsyncJobs to set * @param maxAsyncJobs
* the maxAsyncJobs to set
*/ */
public void setMaxAsyncJobs(int maxAsyncJobs) { public void setMaxAsyncJobs(int maxAsyncJobs) {
letter.setMaxAsyncJobs(maxAsyncJobs); letter.setMaxAsyncJobs(maxAsyncJobs);
} }
/** /**
* @return the databaseLockedWaitDelay * @return the databaseLockedWaitDelay
*/ */
@ -451,7 +489,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
* @param databaseLockedWaitDelay the databaseLockedWaitDelay to set * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
*/ */
public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) { public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay); letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
} }
public boolean getForceRecoverIndex() { public boolean getForceRecoverIndex() {
@ -462,19 +500,33 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
letter.setForceRecoverIndex(forceRecoverIndex); letter.setForceRecoverIndex(forceRecoverIndex);
} }
public boolean isJournalPerDestination() {
return letter.isJournalPerDestination();
}
public void setJournalPerDestination(boolean journalPerDestination) {
letter.setJournalPerDestination(journalPerDestination);
}
// for testing
public KahaDBStore getStore() { public KahaDBStore getStore() {
return letter; return letter;
} }
public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
if (txid == null) {
return null;
}
KahaTransactionInfo rc = new KahaTransactionInfo();
if (txid.isLocalTransaction()) {
LocalTransactionId t = (LocalTransactionId) txid;
KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
kahaTxId.setConnectionId(t.getConnectionId().getValue());
kahaTxId.setTransacitonId(t.getValue());
rc.setLocalTransacitonId(kahaTxId);
} else {
XATransactionId t = (XATransactionId) txid;
KahaXATransactionId kahaTxId = new KahaXATransactionId();
kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
kahaTxId.setFormatId(t.getFormatId());
rc.setXaTransacitonId(kahaTxId);
}
return rc;
}
@Override @Override
public String toString() { public String toString() {
String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";

View File

@ -26,22 +26,23 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.Map.Entry;
import java.util.concurrent.ExecutorService; import java.util.concurrent.*;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.*; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore; import org.apache.activemq.store.AbstractMessageStore;
@ -52,20 +53,20 @@ import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.store.kahadb.data.KahaLocation; import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
@ -76,7 +77,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10); PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
protected ExecutorService queueExecutor; protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor; protected ExecutorService topicExecutor;
@ -95,9 +96,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
private boolean concurrentStoreAndDispatchTransactions = false; private boolean concurrentStoreAndDispatchTransactions = false;
private int maxAsyncJobs = MAX_ASYNC_JOBS; private int maxAsyncJobs = MAX_ASYNC_JOBS;
private final KahaDBTransactionStore transactionStore; private final KahaDBTransactionStore transactionStore;
private TransactionIdTransformer transactionIdTransformer;
public KahaDBStore() { public KahaDBStore() {
this.transactionStore = new KahaDBTransactionStore(this); this.transactionStore = new KahaDBTransactionStore(this);
this.transactionIdTransformer = new TransactionIdTransformer() {
@Override
public KahaTransactionInfo transform(TransactionId txid) {
return TransactionIdConversion.convert(txid);
}
};
} }
@Override @Override
@ -124,7 +132,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
/** /**
* @param concurrentStoreAndDispatch the concurrentStoreAndDispatch to set * @param concurrentStoreAndDispatch
* the concurrentStoreAndDispatch to set
*/ */
public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
@ -138,7 +147,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
/** /**
* @param concurrentStoreAndDispatch the concurrentStoreAndDispatch to set * @param concurrentStoreAndDispatch
* the concurrentStoreAndDispatch to set
*/ */
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
@ -147,16 +157,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public boolean isConcurrentStoreAndDispatchTransactions() { public boolean isConcurrentStoreAndDispatchTransactions() {
return this.concurrentStoreAndDispatchTransactions; return this.concurrentStoreAndDispatchTransactions;
} }
/** /**
* @return the maxAsyncJobs * @return the maxAsyncJobs
*/ */
public int getMaxAsyncJobs() { public int getMaxAsyncJobs() {
return this.maxAsyncJobs; return this.maxAsyncJobs;
} }
/** /**
* @param maxAsyncJobs the maxAsyncJobs to set * @param maxAsyncJobs
* the maxAsyncJobs to set
*/ */
public void setMaxAsyncJobs(int maxAsyncJobs) { public void setMaxAsyncJobs(int maxAsyncJobs) {
this.maxAsyncJobs = maxAsyncJobs; this.maxAsyncJobs = maxAsyncJobs;
@ -171,20 +181,20 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
asyncQueueJobQueue, new ThreadFactory() { asyncQueueJobQueue, new ThreadFactory() {
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
thread.setDaemon(true); thread.setDaemon(true);
return thread; return thread;
} }
}); });
this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
asyncTopicJobQueue, new ThreadFactory() { asyncTopicJobQueue, new ThreadFactory() {
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
thread.setDaemon(true); thread.setDaemon(true);
return thread; return thread;
} }
}); });
} }
@Override @Override
@ -281,16 +291,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
protected KahaDestination dest; protected KahaDestination dest;
private final int maxAsyncJobs; private final int maxAsyncJobs;
private final Semaphore localDestinationSemaphore; private final Semaphore localDestinationSemaphore;
private final Journal journal;
double doneTasks, canceledTasks = 0; double doneTasks, canceledTasks = 0;
public KahaDBMessageStore(ActiveMQDestination destination) throws IOException { public KahaDBMessageStore(ActiveMQDestination destination) {
super(destination); super(destination);
this.dest = convert(destination); this.dest = convert(destination);
this.maxAsyncJobs = getMaxAsyncJobs(); this.maxAsyncJobs = getMaxAsyncJobs();
this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
this.journal = getJournalManager().getJournal(destination);
} }
@Override @Override
@ -347,30 +355,30 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
KahaAddMessageCommand command = new KahaAddMessageCommand(); KahaAddMessageCommand command = new KahaAddMessageCommand();
command.setDestination(dest); command.setDestination(dest);
command.setMessageId(message.getMessageId().toString()); command.setMessageId(message.getMessageId().toString());
command.setTransactionInfo(createTransactionInfo(message.getTransactionId())); command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
command.setPriority(message.getPriority()); command.setPriority(message.getPriority());
command.setPrioritySupported(isPrioritizedMessages()); command.setPrioritySupported(isPrioritizedMessages());
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(journal, command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null); store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
} }
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest); command.setDestination(dest);
command.setMessageId(ack.getLastMessageId().toString()); command.setMessageId(ack.getLastMessageId().toString());
command.setTransactionInfo(createTransactionInfo(ack.getTransactionId())); command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(journal, command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
} }
public void removeAllMessages(ConnectionContext context) throws IOException { public void removeAllMessages(ConnectionContext context) throws IOException {
KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
command.setDestination(dest); command.setDestination(dest);
store(journal, command, true, null, null); store(command, true, null, null);
} }
public Message getMessage(MessageId identity) throws IOException { public Message getMessage(MessageId identity) throws IOException {
@ -392,14 +400,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return sd.orderIndex.get(tx, sequence).location; return sd.orderIndex.get(tx, sequence).location;
} }
}); });
} finally { }finally {
indexLock.readLock().unlock(); indexLock.readLock().unlock();
} }
if (location == null) { if (location == null) {
return null; return null;
} }
return loadMessage(journal, location); return loadMessage(location);
} }
public int getMessageCount() throws IOException { public int getMessageCount() throws IOException {
@ -415,14 +423,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
StoredDestination sd = getStoredDestination(dest, tx); StoredDestination sd = getStoredDestination(dest, tx);
int rc = 0; int rc = 0;
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
.hasNext(); ) { .hasNext();) {
iterator.next(); iterator.next();
rc++; rc++;
} }
return rc; return rc;
} }
}); });
} finally { }finally {
indexLock.readLock().unlock(); indexLock.readLock().unlock();
} }
} finally { } finally {
@ -442,7 +450,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return sd.locationIndex.isEmpty(tx); return sd.locationIndex.isEmpty(tx);
} }
}); });
} finally { }finally {
indexLock.readLock().unlock(); indexLock.readLock().unlock();
} }
} }
@ -461,17 +469,17 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
if (ackedAndPrepared.contains(entry.getValue().messageId)) { if (ackedAndPrepared.contains(entry.getValue().messageId)) {
continue; continue;
} }
Message msg = loadMessage(journal, entry.getValue().location); Message msg = loadMessage(entry.getValue().location);
listener.recoverMessage(msg); listener.recoverMessage(msg);
} }
} }
}); });
} finally { }finally {
indexLock.writeLock().unlock(); indexLock.writeLock().unlock();
} }
} }
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
indexLock.readLock().lock(); indexLock.readLock().lock();
try { try {
@ -486,7 +494,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
if (ackedAndPrepared.contains(entry.getValue().messageId)) { if (ackedAndPrepared.contains(entry.getValue().messageId)) {
continue; continue;
} }
Message msg = loadMessage(journal, entry.getValue().location); Message msg = loadMessage(entry.getValue().location);
listener.recoverMessage(msg); listener.recoverMessage(msg);
counter++; counter++;
if (counter >= maxReturned) { if (counter >= maxReturned) {
@ -496,23 +504,24 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
sd.orderIndex.stoppedIterating(); sd.orderIndex.stoppedIterating();
} }
}); });
} finally { }finally {
indexLock.readLock().unlock(); indexLock.readLock().unlock();
} }
} }
public void resetBatching() { public void resetBatching() {
try { if (pageFile.isLoaded()) {
pageFile.tx().execute(new Transaction.Closure<Exception>() { try {
public void execute(Transaction tx) throws Exception { pageFile.tx().execute(new Transaction.Closure<Exception>() {
StoredDestination sd = getExistingStoredDestination(dest, tx); public void execute(Transaction tx) throws Exception {
if (sd != null) { StoredDestination sd = getExistingStoredDestination(dest, tx);
sd.orderIndex.resetCursorPosition(); if (sd != null) {
} sd.orderIndex.resetCursorPosition();}
} }
}); });
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to reset batching", e); LOG.error("Failed to reset batching",e);
}
} }
} }
@ -525,10 +534,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
// Hopefully one day the page file supports concurrent read // Hopefully one day the page file supports concurrent read
// operations... but for now we must // operations... but for now we must
// externally synchronize... // externally synchronize...
indexLock.writeLock().lock(); indexLock.writeLock().lock();
try { try {
pageFile.tx().execute(new Transaction.Closure<IOException>() { pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException { public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx); StoredDestination sd = getStoredDestination(dest, tx);
Long location = sd.messageIdIndex.get(tx, key); Long location = sd.messageIdIndex.get(tx, key);
@ -537,10 +546,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
} }
}); });
} finally { }finally {
indexLock.writeLock().unlock(); indexLock.writeLock().unlock();
} }
} finally { } finally {
unlockAsyncJobQueue(); unlockAsyncJobQueue();
} }
@ -550,21 +559,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override @Override
public void setMemoryUsage(MemoryUsage memoeyUSage) { public void setMemoryUsage(MemoryUsage memoeyUSage) {
} }
@Override @Override
public void start() throws Exception { public void start() throws Exception {
super.start(); super.start();
} }
@Override @Override
public void stop() throws Exception { public void stop() throws Exception {
super.stop(); super.stop();
} }
public Journal getJournal() {
return this.journal;
}
protected void lockAsyncJobQueue() { protected void lockAsyncJobQueue() {
try { try {
this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
@ -593,7 +596,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
private final AtomicInteger subscriptionCount = new AtomicInteger(); private final AtomicInteger subscriptionCount = new AtomicInteger();
public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
super(destination); super(destination);
this.subscriptionCount.set(getAllSubscriptions().length); this.subscriptionCount.set(getAllSubscriptions().length);
@ -646,11 +648,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
command.setDestination(dest); command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey); command.setSubscriptionKey(subscriptionKey);
command.setMessageId(messageId.toString()); command.setMessageId(messageId.toString());
command.setTransactionInfo(createTransactionInfo(ack.getTransactionId())); command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
if (ack != null && ack.isUnmatchedAck()) { if (ack != null && ack.isUnmatchedAck()) {
command.setAck(UNMATCHED); command.setAck(UNMATCHED);
} }
store(getJournal(), command, false, null, null); store(command, false, null, null);
} }
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
@ -662,7 +664,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
command.setRetroactive(retroactive); command.setRetroactive(retroactive);
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(getJournal(), command, isEnableJournalDiskSyncs() && true, null, null); store(command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.incrementAndGet(); this.subscriptionCount.incrementAndGet();
} }
@ -670,7 +672,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
KahaSubscriptionCommand command = new KahaSubscriptionCommand(); KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest); command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
store(getJournal(), command, isEnableJournalDiskSyncs() && true, null, null); store(command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.decrementAndGet(); this.subscriptionCount.decrementAndGet();
} }
@ -683,7 +685,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void execute(Transaction tx) throws IOException { public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx); StoredDestination sd = getStoredDestination(dest, tx);
for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
.hasNext(); ) { .hasNext();) {
Entry<String, KahaSubscriptionCommand> entry = iterator.next(); Entry<String, KahaSubscriptionCommand> entry = iterator.next();
SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
.getValue().getSubscriptionInfo().newInput())); .getValue().getSubscriptionInfo().newInput()));
@ -692,7 +694,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
} }
}); });
} finally { }finally {
indexLock.readLock().unlock(); indexLock.readLock().unlock();
} }
@ -716,7 +718,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
.getSubscriptionInfo().newInput())); .getSubscriptionInfo().newInput()));
} }
}); });
} finally { }finally {
indexLock.readLock().unlock(); indexLock.readLock().unlock();
} }
} }
@ -736,7 +738,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
int counter = 0; int counter = 0;
for (Iterator<Entry<Long, HashSet<String>>> iterator = for (Iterator<Entry<Long, HashSet<String>>> iterator =
sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext(); ) { sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) {
Entry<Long, HashSet<String>> entry = iterator.next(); Entry<Long, HashSet<String>> entry = iterator.next();
if (entry.getValue().contains(subscriptionKey)) { if (entry.getValue().contains(subscriptionKey)) {
counter++; counter++;
@ -745,7 +747,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return counter; return counter;
} }
}); });
} finally { }finally {
indexLock.writeLock().unlock(); indexLock.writeLock().unlock();
} }
} }
@ -762,20 +764,20 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
sd.orderIndex.setBatch(tx, cursorPos); sd.orderIndex.setBatch(tx, cursorPos);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext(); ) { .hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next(); Entry<Long, MessageKeys> entry = iterator.next();
listener.recoverMessage(loadMessage(getJournal(), entry.getValue().location)); listener.recoverMessage(loadMessage(entry.getValue().location));
} }
sd.orderIndex.resetCursorPosition(); sd.orderIndex.resetCursorPosition();
} }
}); });
} finally { }finally {
indexLock.writeLock().unlock(); indexLock.writeLock().unlock();
} }
} }
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
final MessageRecoveryListener listener) throws Exception { final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName); final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
indexLock.writeLock().lock(); indexLock.writeLock().lock();
@ -800,9 +802,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
Entry<Long, MessageKeys> entry = null; Entry<Long, MessageKeys> entry = null;
int counter = 0; int counter = 0;
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
.hasNext(); ) { .hasNext();) {
entry = iterator.next(); entry = iterator.next();
if (listener.recoverMessage(loadMessage(getJournal(), entry.getValue().location))) { if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
counter++; counter++;
} }
if (counter >= maxReturned || listener.hasSpace() == false) { if (counter >= maxReturned || listener.hasSpace() == false) {
@ -816,7 +818,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
} }
}); });
} finally { }finally {
indexLock.writeLock().unlock(); indexLock.writeLock().unlock();
} }
} }
@ -832,7 +834,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
sd.subscriptionCursors.remove(subscriptionKey); sd.subscriptionCursors.remove(subscriptionKey);
} }
}); });
} finally { }finally {
indexLock.writeLock().unlock(); indexLock.writeLock().unlock();
} }
} catch (IOException e) { } catch (IOException e) {
@ -856,8 +858,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
/** /**
* Cleanup method to remove any state associated with the given destination. * Cleanup method to remove any state associated with the given destination.
* This method does not stop the message store (it might not be cached). * This method does not stop the message store (it might not be cached).
* *
* @param destination Destination to forget * @param destination
* Destination to forget
*/ */
public void removeQueueMessageStore(ActiveMQQueue destination) { public void removeQueueMessageStore(ActiveMQQueue destination) {
} }
@ -865,8 +868,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
/** /**
* Cleanup method to remove any state associated with the given destination * Cleanup method to remove any state associated with the given destination
* This method does not stop the message store (it might not be cached). * This method does not stop the message store (it might not be cached).
* *
* @param destination Destination to forget * @param destination
* Destination to forget
*/ */
public void removeTopicMessageStore(ActiveMQTopic destination) { public void removeTopicMessageStore(ActiveMQTopic destination) {
} }
@ -883,7 +887,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
pageFile.tx().execute(new Transaction.Closure<IOException>() { pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException { public void execute(Transaction tx) throws IOException {
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
.hasNext(); ) { .hasNext();) {
Entry<String, StoredDestination> entry = iterator.next(); Entry<String, StoredDestination> entry = iterator.next();
if (!isEmptyTopic(entry, tx)) { if (!isEmptyTopic(entry, tx)) {
rc.add(convert(entry.getKey())); rc.add(convert(entry.getKey()));
@ -904,7 +908,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return isEmptyTopic; return isEmptyTopic;
} }
}); });
} finally { }finally {
indexLock.readLock().unlock(); indexLock.readLock().unlock();
} }
return rc; return rc;
@ -916,7 +920,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public long getLastMessageBrokerSequenceId() throws IOException { public long getLastMessageBrokerSequenceId() throws IOException {
return 0; return 0;
} }
public long getLastProducerSequenceId(ProducerId id) { public long getLastProducerSequenceId(ProducerId id) {
indexLock.readLock().lock(); indexLock.readLock().lock();
try { try {
@ -933,11 +937,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void beginTransaction(ConnectionContext context) throws IOException { public void beginTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented."); throw new IOException("Not yet implemented.");
} }
public void commitTransaction(ConnectionContext context) throws IOException { public void commitTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented."); throw new IOException("Not yet implemented.");
} }
public void rollbackTransaction(ConnectionContext context) throws IOException { public void rollbackTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented."); throw new IOException("Not yet implemented.");
} }
@ -955,8 +957,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
* @return * @return
* @throws IOException * @throws IOException
*/ */
Message loadMessage(Journal journal, Location location) throws IOException { Message loadMessage(Location location) throws IOException {
KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(journal, location); KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
return msg; return msg;
} }
@ -976,20 +978,20 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
KahaDestination rc = new KahaDestination(); KahaDestination rc = new KahaDestination();
rc.setName(dest.getPhysicalName()); rc.setName(dest.getPhysicalName());
switch (dest.getDestinationType()) { switch (dest.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:
rc.setType(DestinationType.QUEUE); rc.setType(DestinationType.QUEUE);
return rc; return rc;
case ActiveMQDestination.TOPIC_TYPE: case ActiveMQDestination.TOPIC_TYPE:
rc.setType(DestinationType.TOPIC); rc.setType(DestinationType.TOPIC);
return rc; return rc;
case ActiveMQDestination.TEMP_QUEUE_TYPE: case ActiveMQDestination.TEMP_QUEUE_TYPE:
rc.setType(DestinationType.TEMP_QUEUE); rc.setType(DestinationType.TEMP_QUEUE);
return rc; return rc;
case ActiveMQDestination.TEMP_TOPIC_TYPE: case ActiveMQDestination.TEMP_TOPIC_TYPE:
rc.setType(DestinationType.TEMP_TOPIC); rc.setType(DestinationType.TEMP_TOPIC);
return rc; return rc;
default: default:
return null; return null;
} }
} }
@ -1002,19 +1004,27 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
String name = dest.substring(p + 1); String name = dest.substring(p + 1);
switch (KahaDestination.DestinationType.valueOf(type)) { switch (KahaDestination.DestinationType.valueOf(type)) {
case QUEUE: case QUEUE:
return new ActiveMQQueue(name); return new ActiveMQQueue(name);
case TOPIC: case TOPIC:
return new ActiveMQTopic(name); return new ActiveMQTopic(name);
case TEMP_QUEUE: case TEMP_QUEUE:
return new ActiveMQTempQueue(name); return new ActiveMQTempQueue(name);
case TEMP_TOPIC: case TEMP_TOPIC:
return new ActiveMQTempTopic(name); return new ActiveMQTempTopic(name);
default: default:
throw new IllegalArgumentException("Not in the valid destination format"); throw new IllegalArgumentException("Not in the valid destination format");
} }
} }
public TransactionIdTransformer getTransactionIdTransformer() {
return transactionIdTransformer;
}
public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
this.transactionIdTransformer = transactionIdTransformer;
}
static class AsyncJobKey { static class AsyncJobKey {
MessageId id; MessageId id;
ActiveMQDestination destination; ActiveMQDestination destination;
@ -1141,9 +1151,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
private final int subscriptionCount; private final int subscriptionCount;
private final List<String> subscriptionKeys = new ArrayList<String>(1); private final List<String> subscriptionKeys = new ArrayList<String>(1);
private final KahaDBTopicMessageStore topicStore; private final KahaDBTopicMessageStore topicStore;
public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
int subscriptionCount) { int subscriptionCount) {
super(store, context, message); super(store, context, message);
this.topicStore = store; this.topicStore = store;
this.subscriptionCount = subscriptionCount; this.subscriptionCount = subscriptionCount;
@ -1175,7 +1184,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
/** /**
* add a key * add a key
* *
* @param key
* @return true if all acknowledgements received * @return true if all acknowledgements received
*/ */
public boolean addSubscriptionKey(String key) { public boolean addSubscriptionKey(String key) {
@ -1221,7 +1231,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
super.afterExecute(runnable, throwable); super.afterExecute(runnable, throwable);
if (runnable instanceof StoreTask) { if (runnable instanceof StoreTask) {
((StoreTask) runnable).releaseLocks(); ((StoreTask)runnable).releaseLocks();
} }
} }

View File

@ -19,7 +19,6 @@ package org.apache.activemq.store.kahadb;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -27,9 +26,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
@ -52,13 +49,14 @@ import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.journal.Journal;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* Provides a TransactionStore implementation that can create transaction aware * Provides a TransactionStore implementation that can create transaction aware
* MessageStore objects from non transaction aware MessageStore objects. * MessageStore objects from non transaction aware MessageStore objects.
*
*
*/ */
public class KahaDBTransactionStore implements TransactionStore { public class KahaDBTransactionStore implements TransactionStore {
static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class); static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
@ -72,23 +70,21 @@ public class KahaDBTransactionStore implements TransactionStore {
public class Tx { public class Tx {
private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>(); private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>(); private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
private final HashSet<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>();
public void add(AddMessageCommand msg) { public void add(AddMessageCommand msg) {
messages.add(msg); messages.add(msg);
destinations.add(msg.getMessage().getDestination());
} }
public void add(RemoveMessageCommand ack) { public void add(RemoveMessageCommand ack) {
acks.add(ack); acks.add(ack);
destinations.add(ack.getMessageAck().getDestination());
} }
public Message[] getMessages() { public Message[] getMessages() {
Message rc[] = new Message[messages.size()]; Message rc[] = new Message[messages.size()];
int count = 0; int count = 0;
for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext(); ) { for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = iter.next(); AddMessageCommand cmd = iter.next();
rc[count++] = cmd.getMessage(); rc[count++] = cmd.getMessage();
} }
@ -98,7 +94,7 @@ public class KahaDBTransactionStore implements TransactionStore {
public MessageAck[] getAcks() { public MessageAck[] getAcks() {
MessageAck rc[] = new MessageAck[acks.size()]; MessageAck rc[] = new MessageAck[acks.size()];
int count = 0; int count = 0;
for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext(); ) { for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
RemoveMessageCommand cmd = iter.next(); RemoveMessageCommand cmd = iter.next();
rc[count++] = cmd.getMessageAck(); rc[count++] = cmd.getMessageAck();
} }
@ -107,56 +103,49 @@ public class KahaDBTransactionStore implements TransactionStore {
/** /**
* @return true if something to commit * @return true if something to commit
* @throws IOException
*/ */
public List<Future<Object>> commit() throws IOException { public List<Future<Object>> commit() throws IOException {
List<Future<Object>> results = new ArrayList<Future<Object>>(); List<Future<Object>> results = new ArrayList<Future<Object>>();
// Do all the message adds. // Do all the message adds.
for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext(); ) { for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = iter.next(); AddMessageCommand cmd = iter.next();
results.add(cmd.run()); results.add(cmd.run());
} }
// And removes.. // And removes..
for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext(); ) { for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
RemoveMessageCommand cmd = iter.next(); RemoveMessageCommand cmd = iter.next();
cmd.run(); cmd.run();
results.add(cmd.run()); results.add(cmd.run());
} }
return results; return results;
} }
} }
public abstract class AddMessageCommand { public abstract class AddMessageCommand {
private final ConnectionContext ctx; private final ConnectionContext ctx;
AddMessageCommand(ConnectionContext ctx) { AddMessageCommand(ConnectionContext ctx) {
this.ctx = ctx; this.ctx = ctx;
} }
abstract Message getMessage(); abstract Message getMessage();
Future<Object> run() throws IOException { Future<Object> run() throws IOException {
return run(this.ctx); return run(this.ctx);
} }
abstract Future<Object> run(ConnectionContext ctx) throws IOException; abstract Future<Object> run(ConnectionContext ctx) throws IOException;
} }
public abstract class RemoveMessageCommand { public abstract class RemoveMessageCommand {
private final ConnectionContext ctx; private final ConnectionContext ctx;
RemoveMessageCommand(ConnectionContext ctx) { RemoveMessageCommand(ConnectionContext ctx) {
this.ctx = ctx; this.ctx = ctx;
} }
abstract MessageAck getMessageAck(); abstract MessageAck getMessageAck();
Future<Object> run() throws IOException { Future<Object> run() throws IOException {
return run(this.ctx); return run(this.ctx);
} }
abstract Future<Object> run(ConnectionContext context) throws IOException; abstract Future<Object> run(ConnectionContext context) throws IOException;
} }
@ -208,8 +197,8 @@ public class KahaDBTransactionStore implements TransactionStore {
@Override @Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) throws IOException { MessageId messageId, MessageAck ack) throws IOException {
KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore) getDelegate(), clientId, KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
subscriptionName, messageId, ack); subscriptionName, messageId, ack);
} }
@ -217,20 +206,17 @@ public class KahaDBTransactionStore implements TransactionStore {
} }
/** /**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/ */
public void prepare(TransactionId txid) throws IOException { public void prepare(TransactionId txid) throws IOException {
KahaTransactionInfo info = getTransactionInfo(txid); KahaTransactionInfo info = getTransactionInfo(txid);
if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
for (Journal journal : theStore.getJournalManager().getJournals()) { theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
theStore.store(journal, new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
}
} else { } else {
Tx tx = inflightTransactions.remove(txid); Tx tx = inflightTransactions.remove(txid);
if (tx != null) { if (tx != null) {
for (Journal journal : theStore.getJournalManager().getJournals(tx.destinations)) { theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
theStore.store(journal, new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
}
} }
} }
} }
@ -262,7 +248,7 @@ public class KahaDBTransactionStore implements TransactionStore {
theStore.brokerService.handleIOException(new IOException(e.getMessage())); theStore.brokerService.handleIOException(new IOException(e.getMessage()));
} catch (ExecutionException e) { } catch (ExecutionException e) {
theStore.brokerService.handleIOException(new IOException(e.getMessage())); theStore.brokerService.handleIOException(new IOException(e.getMessage()));
} catch (CancellationException e) { }catch(CancellationException e) {
} }
if (!result.isCancelled()) { if (!result.isCancelled()) {
doneSomething = true; doneSomething = true;
@ -273,11 +259,9 @@ public class KahaDBTransactionStore implements TransactionStore {
} }
if (doneSomething) { if (doneSomething) {
KahaTransactionInfo info = getTransactionInfo(txid); KahaTransactionInfo info = getTransactionInfo(txid);
for (Journal journal : theStore.getJournalManager().getJournals(tx.destinations)) { theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, null, null);
}
} }
} else { }else {
//The Tx will be null for failed over clients - lets run their post commits //The Tx will be null for failed over clients - lets run their post commits
if (postCommit != null) { if (postCommit != null) {
postCommit.run(); postCommit.run();
@ -286,25 +270,22 @@ public class KahaDBTransactionStore implements TransactionStore {
} else { } else {
KahaTransactionInfo info = getTransactionInfo(txid); KahaTransactionInfo info = getTransactionInfo(txid);
for (Journal journal : theStore.getJournalManager().getJournals()) { theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit); forgetRecoveredAcks(txid);
}
forgetRecoveredAcks(txid);
} }
} else { }else {
LOG.error("Null transaction passed on commit"); LOG.error("Null transaction passed on commit");
} }
} }
/** /**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/ */
public void rollback(TransactionId txid) throws IOException { public void rollback(TransactionId txid) throws IOException {
if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
KahaTransactionInfo info = getTransactionInfo(txid); KahaTransactionInfo info = getTransactionInfo(txid);
for (Journal journal : theStore.getJournalManager().getJournals()) { theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
theStore.store(journal, new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
}
forgetRecoveredAcks(txid); forgetRecoveredAcks(txid);
} else { } else {
inflightTransactions.remove(txid); inflightTransactions.remove(txid);
@ -371,7 +352,6 @@ public class KahaDBTransactionStore implements TransactionStore {
public Message getMessage() { public Message getMessage() {
return message; return message;
} }
@Override @Override
public Future<Object> run(ConnectionContext ctx) throws IOException { public Future<Object> run(ConnectionContext ctx) throws IOException {
destination.addMessage(ctx, message); destination.addMessage(ctx, message);
@ -399,7 +379,6 @@ public class KahaDBTransactionStore implements TransactionStore {
public Message getMessage() { public Message getMessage() {
return message; return message;
} }
@Override @Override
public Future<Object> run(ConnectionContext ctx) throws IOException { public Future<Object> run(ConnectionContext ctx) throws IOException {
return destination.asyncAddQueueMessage(ctx, message); return destination.asyncAddQueueMessage(ctx, message);
@ -417,7 +396,7 @@ public class KahaDBTransactionStore implements TransactionStore {
throws IOException { throws IOException {
if (message.getTransactionId() != null) { if (message.getTransactionId() != null) {
if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
destination.addMessage(context, message); destination.addMessage(context, message);
return AbstractMessageStore.FUTURE; return AbstractMessageStore.FUTURE;
} else { } else {
@ -427,7 +406,6 @@ public class KahaDBTransactionStore implements TransactionStore {
public Message getMessage() { public Message getMessage() {
return message; return message;
} }
@Override @Override
public Future run(ConnectionContext ctx) throws IOException { public Future run(ConnectionContext ctx) throws IOException {
return destination.asyncAddTopicMessage(ctx, message); return destination.asyncAddTopicMessage(ctx, message);
@ -449,7 +427,7 @@ public class KahaDBTransactionStore implements TransactionStore {
throws IOException { throws IOException {
if (ack.isInTransaction()) { if (ack.isInTransaction()) {
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
destination.removeMessage(context, ack); destination.removeMessage(context, ack);
} else { } else {
Tx tx = getTx(ack.getTransactionId()); Tx tx = getTx(ack.getTransactionId());
@ -475,7 +453,7 @@ public class KahaDBTransactionStore implements TransactionStore {
throws IOException { throws IOException {
if (ack.isInTransaction()) { if (ack.isInTransaction()) {
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
destination.removeAsyncMessage(context, ack); destination.removeAsyncMessage(context, ack);
} else { } else {
Tx tx = getTx(ack.getTransactionId()); Tx tx = getTx(ack.getTransactionId());
@ -501,7 +479,7 @@ public class KahaDBTransactionStore implements TransactionStore {
final MessageId messageId, final MessageAck ack) throws IOException { final MessageId messageId, final MessageAck ack) throws IOException {
if (ack.isInTransaction()) { if (ack.isInTransaction()) {
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
destination.acknowledge(context, clientId, subscriptionName, messageId, ack); destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
} else { } else {
Tx tx = getTx(ack.getTransactionId()); Tx tx = getTx(ack.getTransactionId());
@ -523,7 +501,6 @@ public class KahaDBTransactionStore implements TransactionStore {
private KahaTransactionInfo getTransactionInfo(TransactionId txid) { private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
return theStore.createTransactionInfo(txid); return theStore.getTransactionIdTransformer().transform(txid);
} }
} }

View File

@ -0,0 +1,295 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import javax.xml.bind.annotation.XmlAnyAttribute;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.filter.AnyDestination;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of {@link org.apache.activemq.store.PersistenceAdapter} that supports
* distribution of destinations across multiple kahaDB persistence adapters
*
* @org.apache.xbean.XBean element="mKahaDB"
*/
public class MultiKahaDBPersistenceAdapter extends DestinationMap implements PersistenceAdapter, BrokerServiceAware {
static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
BrokerService brokerService;
List<KahaDBPersistenceAdapter> adapters = new LinkedList<KahaDBPersistenceAdapter>();
private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
// all local store transactions are XA, 2pc if more than one adapter involved
TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
@Override
public KahaTransactionInfo transform(TransactionId txid) {
if (txid == null) {
return null;
}
KahaTransactionInfo rc = new KahaTransactionInfo();
KahaXATransactionId kahaTxId = new KahaXATransactionId();
if (txid.isLocalTransaction()) {
LocalTransactionId t = (LocalTransactionId) txid;
kahaTxId.setBranchQualifier(new Buffer(Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"))));
kahaTxId.setGlobalTransactionId(new Buffer(t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"))));
kahaTxId.setFormatId(LOCAL_FORMAT_ID_MAGIC);
} else {
XATransactionId t = (XATransactionId) txid;
kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
kahaTxId.setFormatId(t.getFormatId());
}
rc.setXaTransacitonId(kahaTxId);
return rc;
}
};
/**
* Sets the FilteredKahaDBPersistenceAdapter entries
*
* @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
*/
public void setFilteredPersistenceAdapters(List entries) {
for (Object entry : entries) {
FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
KahaDBPersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
if (filteredAdapter.getDestination() == null) {
filteredAdapter.setDestination(matchAll);
}
adapter.setDirectory(new File(getDirectory(), nameFromDestinationFilter(filteredAdapter.getDestination())));
// need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
adapters.add(adapter);
}
super.setEntries(entries);
}
private String nameFromDestinationFilter(ActiveMQDestination destination) {
return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
}
public boolean isLocalXid(TransactionId xid) {
return xid instanceof XATransactionId &&
((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
}
public void beginTransaction(ConnectionContext context) throws IOException {
throw new IllegalStateException();
}
public void checkpoint(final boolean sync) throws IOException {
for (PersistenceAdapter persistenceAdapter : adapters) {
persistenceAdapter.checkpoint(sync);
}
}
public void commitTransaction(ConnectionContext context) throws IOException {
throw new IllegalStateException();
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
}
private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) {
Object result = this.chooseValue(destination);
if (result == null) {
throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
}
return ((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter();
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
}
public TransactionStore createTransactionStore() throws IOException {
return transactionStore;
}
public void deleteAllMessages() throws IOException {
for (PersistenceAdapter persistenceAdapter : adapters) {
persistenceAdapter.deleteAllMessages();
}
transactionStore.deleteAllMessages();
}
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
for (PersistenceAdapter persistenceAdapter : adapters) {
results.addAll(persistenceAdapter.getDestinations());
}
return results;
}
public long getLastMessageBrokerSequenceId() throws IOException {
long maxId = -1;
for (PersistenceAdapter persistenceAdapter : adapters) {
maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
}
return maxId;
}
public long getLastProducerSequenceId(ProducerId id) throws IOException {
long maxId = -1;
for (PersistenceAdapter persistenceAdapter : adapters) {
maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
}
return maxId;
}
public void removeQueueMessageStore(ActiveMQQueue destination) {
getMatchingPersistenceAdapter(destination).removeQueueMessageStore(destination);
}
public void removeTopicMessageStore(ActiveMQTopic destination) {
getMatchingPersistenceAdapter(destination).removeTopicMessageStore(destination);
}
public void rollbackTransaction(ConnectionContext context) throws IOException {
throw new IllegalStateException();
}
public void setBrokerName(String brokerName) {
for (PersistenceAdapter persistenceAdapter : adapters) {
persistenceAdapter.setBrokerName(brokerName);
}
}
public void setUsageManager(SystemUsage usageManager) {
for (PersistenceAdapter persistenceAdapter : adapters) {
persistenceAdapter.setUsageManager(usageManager);
}
}
public long size() {
long size = 0;
for (PersistenceAdapter persistenceAdapter : adapters) {
size += persistenceAdapter.size();
}
return size;
}
public void start() throws Exception {
for (PersistenceAdapter persistenceAdapter : adapters) {
persistenceAdapter.start();
}
}
public void stop() throws Exception {
for (PersistenceAdapter persistenceAdapter : adapters) {
persistenceAdapter.stop();
}
}
public File getDirectory() {
return this.directory;
}
@Override
public void setDirectory(File dir) {
this.directory = directory;
}
public void setBrokerService(BrokerService brokerService) {
for (KahaDBPersistenceAdapter persistenceAdapter : adapters) {
persistenceAdapter.setBrokerService(brokerService);
}
this.brokerService = brokerService;
}
public BrokerService getBrokerService() {
return brokerService;
}
public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
this.transactionStore = transactionStore;
}
/**
* Set the max file length of the transaction journal
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
* be used
*
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
*/
public void setJournalMaxFileLength(int maxFileLength) {
transactionStore.setJournalMaxFileLength(maxFileLength);
}
public int getJournalMaxFileLength() {
return transactionStore.getJournalMaxFileLength();
}
/**
* Set the max write batch size of the transaction journal
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
* be used
*
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
*/
public void setJournalWriteBatchSize(int journalWriteBatchSize) {
transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
}
public int getJournalMaxWriteBatchSize() {
return transactionStore.getJournalMaxWriteBatchSize();
}
@Override
public String toString() {
String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
}
}

View File

@ -0,0 +1,419 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.util.IOHelper;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MultiKahaDBTransactionStore implements TransactionStore {
static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
final ConcurrentHashMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
private Journal journal;
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
}
public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
return new ProxyMessageStore(messageStore) {
@Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
}
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
}
};
}
public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
return new ProxyTopicMessageStore(messageStore) {
@Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
}
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
}
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) throws IOException {
MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
subscriptionName, messageId, ack);
}
};
}
public void deleteAllMessages() {
IOHelper.deleteChildren(getDirectory());
}
public int getJournalMaxFileLength() {
return journalMaxFileLength;
}
public void setJournalMaxFileLength(int journalMaxFileLength) {
this.journalMaxFileLength = journalMaxFileLength;
}
public int getJournalMaxWriteBatchSize() {
return journalWriteBatchSize;
}
public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
this.journalWriteBatchSize = journalWriteBatchSize;
}
public class Tx {
private final Set<TransactionStore> stores = new HashSet<TransactionStore>();
private int prepareLocationId = 0;
public void trackStore(TransactionStore store) {
stores.add(store);
}
public Set<TransactionStore> getStores() {
return stores;
}
public void trackPrepareLocation(Location location) {
this.prepareLocationId = location.getDataFileId();
}
public int getPreparedLocationId() {
return prepareLocationId;
}
}
public Tx getTx(TransactionId txid) {
Tx tx = inflightTransactions.get(txid);
if (tx == null) {
tx = new Tx();
inflightTransactions.put(txid, tx);
}
return tx;
}
public Tx removeTx(TransactionId txid) {
return inflightTransactions.remove(txid);
}
public void prepare(TransactionId txid) throws IOException {
Tx tx = getTx(txid);
for (TransactionStore store : tx.getStores()) {
store.prepare(txid);
}
}
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
throws IOException {
if (preCommit != null) {
preCommit.run();
}
Tx tx = getTx(txid);
if (wasPrepared) {
for (TransactionStore store : tx.getStores()) {
store.commit(txid, true, null, null);
}
} else {
// can only do 1pc on a single store
if (tx.getStores().size() == 1) {
for (TransactionStore store : tx.getStores()) {
store.commit(txid, false, null, null);
}
} else {
// need to do local 2pc
for (TransactionStore store : tx.getStores()) {
store.prepare(txid);
}
persistOutcome(tx, txid);
for (TransactionStore store : tx.getStores()) {
store.commit(txid, true, null, null);
}
persistCompletion(txid);
}
}
removeTx(txid);
if (postCommit != null) {
postCommit.run();
}
}
public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
}
public void persistCompletion(TransactionId txid) throws IOException {
store(new KahaCommitCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)));
}
private Location store(JournalCommand<?> data) throws IOException {
int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
os.writeByte(data.type().getNumber());
data.writeFramed(os);
Location location = journal.write(os.toByteSequence(), true);
journal.setLastAppendLocation(location);
return location;
}
public void rollback(TransactionId txid) throws IOException {
Tx tx = removeTx(txid);
if (tx != null) {
for (TransactionStore store : tx.getStores()) {
store.rollback(txid);
}
}
}
public void start() throws Exception {
journal = new Journal() {
@Override
protected void cleanup() {
super.cleanup();
txStoreCleanup();
}
};
journal.setDirectory(getDirectory());
journal.setMaxFileLength(journalMaxFileLength);
journal.setWriteBatchSize(journalWriteBatchSize);
IOHelper.mkdirs(journal.getDirectory());
journal.start();
recoverPendingLocalTransactions();
store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
}
private void txStoreCleanup() {
Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
for (Tx tx : inflightTransactions.values()) {
knownDataFileIds.remove(tx.getPreparedLocationId());
}
try {
journal.removeDataFiles(knownDataFileIds);
} catch (Exception e) {
LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds);
}
}
private File getDirectory() {
return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
}
public void stop() throws Exception {
journal.close();
journal = null;
}
private void recoverPendingLocalTransactions() throws IOException {
Location location = journal.getNextLocation(null);
while (location != null) {
process(load(location));
location = journal.getNextLocation(location);
}
recoveredPendingCommit.addAll(inflightTransactions.keySet());
LOG.info("pending local transactions: " + recoveredPendingCommit);
}
public JournalCommand<?> load(Location location) throws IOException {
DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
byte readByte = is.readByte();
KahaEntryType type = KahaEntryType.valueOf(readByte);
if (type == null) {
throw new IOException("Could not load journal record. Invalid location: " + location);
}
JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
message.mergeFramed(is);
return message;
}
public void process(JournalCommand<?> command) throws IOException {
switch (command.type()) {
case KAHA_PREPARE_COMMAND:
KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
break;
case KAHA_COMMIT_COMMAND:
KahaCommitCommand commitCommand = (KahaCommitCommand) command;
removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
break;
case KAHA_TRACE_COMMAND:
break;
default:
throw new IOException("Unexpected command in transaction journal: " + command);
}
}
public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
for (final KahaDBPersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
@Override
public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
try {
getTx(xid).trackStore(adapter.createTransactionStore());
} catch (IOException e) {
LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
}
listener.recover(xid, addedMessages, acks);
}
});
}
try {
Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
// force completion of local xa
for (TransactionId txid : broker.getPreparedTransactions(null)) {
if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
try {
if (recoveredPendingCommit.contains(txid)) {
LOG.info("delivering pending commit outcome for tid: " + txid);
broker.commitTransaction(null, txid, false);
} else {
LOG.info("delivering rollback outcome to store for tid: " + txid);
broker.forgetTransaction(null, txid);
}
persistCompletion(txid);
} catch (Exception ex) {
LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
}
}
}
} catch (Exception e) {
LOG.error("failed to resolve pending local transactions", e);
}
}
void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {
getTx(message.getTransactionId()).trackStore(transactionStore);
}
destination.addMessage(context, message);
}
Future<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {
getTx(message.getTransactionId()).trackStore(transactionStore);
destination.addMessage(context, message);
return AbstractMessageStore.FUTURE;
} else {
return destination.asyncAddQueueMessage(context, message);
}
}
Future<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {
getTx(message.getTransactionId()).trackStore(transactionStore);
destination.addMessage(context, message);
return AbstractMessageStore.FUTURE;
} else {
return destination.asyncAddTopicMessage(context, message);
}
}
final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
throws IOException {
if (ack.getTransactionId() != null) {
getTx(ack.getTransactionId()).trackStore(transactionStore);
}
destination.removeMessage(context, ack);
}
final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
throws IOException {
if (ack.getTransactionId() != null) {
getTx(ack.getTransactionId()).trackStore(transactionStore);
}
destination.removeAsyncMessage(context, ack);
}
final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
final String clientId, final String subscriptionName,
final MessageId messageId, final MessageAck ack) throws IOException {
if (ack.getTransactionId() != null) {
getTx(ack.getTransactionId()).trackStore(transactionStore);
}
destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
public class TransactionIdConversion {
static KahaTransactionInfo convertToLocal(TransactionId tx) {
KahaTransactionInfo rc = new KahaTransactionInfo();
LocalTransactionId t = (LocalTransactionId) tx;
KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
kahaTxId.setConnectionId(t.getConnectionId().getValue());
kahaTxId.setTransacitonId(t.getValue());
rc.setLocalTransacitonId(kahaTxId);
return rc;
}
static KahaTransactionInfo convert(TransactionId txid) {
if (txid == null) {
return null;
}
KahaTransactionInfo rc;
if (txid.isLocalTransaction()) {
rc = convertToLocal(txid);
} else {
rc = new KahaTransactionInfo();
XATransactionId t = (XATransactionId) txid;
KahaXATransactionId kahaTxId = new KahaXATransactionId();
kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
kahaTxId.setFormatId(t.getFormatId());
rc.setXaTransacitonId(kahaTxId);
}
return rc;
}
static TransactionId convert(KahaTransactionInfo transactionInfo) {
if (transactionInfo.hasLocalTransacitonId()) {
KahaLocalTransactionId tx = transactionInfo.getLocalTransacitonId();
LocalTransactionId rc = new LocalTransactionId();
rc.setConnectionId(new ConnectionId(tx.getConnectionId()));
rc.setValue(tx.getTransacitonId());
return rc;
} else {
KahaXATransactionId tx = transactionInfo.getXaTransacitonId();
XATransactionId rc = new XATransactionId();
rc.setBranchQualifier(tx.getBranchQualifier().toByteArray());
rc.setGlobalTransactionId(tx.getGlobalTransactionId().toByteArray());
rc.setFormatId(tx.getFormatId());
return rc;
}
}
}

View File

@ -0,0 +1,8 @@
package org.apache.activemq.store.kahadb;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
public interface TransactionIdTransformer {
KahaTransactionInfo transform(TransactionId txid);
}

View File

@ -37,6 +37,8 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId; import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.util.JMXSupport; import org.apache.activemq.util.JMXSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Used to simulate the recovery that occurs when a broker shuts down. * Used to simulate the recovery that occurs when a broker shuts down.
@ -44,7 +46,7 @@ import org.apache.activemq.util.JMXSupport;
* *
*/ */
public class XARecoveryBrokerTest extends BrokerRestartTestSupport { public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class);
public void testPreparedJmxView() throws Exception { public void testPreparedJmxView() throws Exception {
ActiveMQDestination destination = createDestination(); ActiveMQDestination destination = createDestination();
@ -202,7 +204,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
} }
// We should get the committed transactions. // We should get the committed transactions.
for (int i = 0; i < 4; i++) { for (int i = 0; i < expectedMessageCount(4, destination); i++) {
Message m = receiveMessage(connection); Message m = receiveMessage(connection);
assertNotNull(m); assertNotNull(m);
} }
@ -249,7 +251,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo); connection.send(consumerInfo);
for (int i = 0; i < 4; i++) { for (int i = 0; i < expectedMessageCount(4, destination); i++) {
Message m = receiveMessage(connection); Message m = receiveMessage(connection);
assertNotNull(m); assertNotNull(m);
} }
@ -276,22 +278,26 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
connection.send(message); connection.send(message);
} }
// Setup the consumer and receive the message.
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Begin the transaction. // Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo); XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid)); connection.send(createBeginTransaction(connectionInfo, txid));
Message m = null;
for (int i = 0; i < 4; i++) {
m = receiveMessage(connection);
assertNotNull(m);
}
MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE); ConsumerInfo consumerInfo;
ack.setTransactionId(txid); Message m = null;
connection.send(ack); for (ActiveMQDestination dest : destinationList(destination)) {
// Setup the consumer and receive the message.
consumerInfo = createConsumerInfo(sessionInfo, dest);
connection.send(consumerInfo);
for (int i = 0; i < 4; i++) {
m = receiveMessage(connection);
assertNotNull(m);
}
MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
// Commit // Commit
connection.request(createCommitTransaction1Phase(connectionInfo, txid)); connection.request(createCommitTransaction1Phase(connectionInfo, txid));
@ -334,23 +340,27 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
connection.send(message); connection.send(message);
} }
// Setup the consumer and receive the message.
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Begin the transaction. // Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo); XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid)); connection.send(createBeginTransaction(connectionInfo, txid));
Message m = null;
for (int i = 0; i < 4; i++) {
m = receiveMessage(connection);
assertNotNull(m);
}
// one ack with last received, mimic a beforeEnd synchronization ConsumerInfo consumerInfo;
MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE); Message m = null;
ack.setTransactionId(txid); for (ActiveMQDestination dest : destinationList(destination)) {
connection.send(ack); // Setup the consumer and receive the message.
consumerInfo = createConsumerInfo(sessionInfo, dest);
connection.send(consumerInfo);
for (int i = 0; i < 4; i++) {
m = receiveMessage(connection);
assertNotNull(m);
}
// one ack with last received, mimic a beforeEnd synchronization
MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
connection.request(createPrepareTransaction(connectionInfo, txid)); connection.request(createPrepareTransaction(connectionInfo, txid));
@ -404,23 +414,27 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
connection.send(message); connection.send(message);
} }
// Setup the consumer and receive the message.
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Begin the transaction. // Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo); XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid)); connection.send(createBeginTransaction(connectionInfo, txid));
Message message = null;
for (int i = 0; i < 4; i++) {
message = receiveMessage(connection);
assertNotNull(message);
}
// one ack with last received, mimic a beforeEnd synchronization ConsumerInfo consumerInfo;
MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE); Message message = null;
ack.setTransactionId(txid); for (ActiveMQDestination dest : destinationList(destination)) {
connection.send(ack); // Setup the consumer and receive the message.
consumerInfo = createConsumerInfo(sessionInfo, dest);
connection.send(consumerInfo);
for (int i = 0; i < 4; i++) {
message = receiveMessage(connection);
assertNotNull(message);
}
// one ack with last received, mimic a beforeEnd synchronization
MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
connection.request(createPrepareTransaction(connectionInfo, txid)); connection.request(createPrepareTransaction(connectionInfo, txid));
@ -454,13 +468,20 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
// Begin new transaction for redelivery // Begin new transaction for redelivery
txid = createXATransaction(sessionInfo); txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid)); connection.send(createBeginTransaction(connectionInfo, txid));
for (int i = 0; i < 4; i++) {
message = receiveMessage(connection); for (ActiveMQDestination dest : destinationList(destination)) {
assertNotNull(message); // Setup the consumer and receive the message.
consumerInfo = createConsumerInfo(sessionInfo, dest);
connection.send(consumerInfo);
for (int i = 0; i < 4; i++) {
message = receiveMessage(connection);
assertNotNull(message);
}
MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
} }
ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
// Commit // Commit
connection.request(createCommitTransaction1Phase(connectionInfo, txid)); connection.request(createCommitTransaction1Phase(connectionInfo, txid));
@ -470,6 +491,14 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
} }
private ActiveMQDestination[] destinationList(ActiveMQDestination dest) {
return dest.isComposite() ? dest.getCompositeDestinations() : new ActiveMQDestination[]{dest};
}
private int expectedMessageCount(int i, ActiveMQDestination destination) {
return i * (destination.isComposite() ? destination.getCompositeDestinations().length : 1);
}
public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception { public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception {
ActiveMQDestination destination = createDestination(); ActiveMQDestination destination = createDestination();

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import java.util.LinkedList;
import java.util.List;
import javax.jms.JMSException;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
import org.apache.activemq.util.JMXSupport;
public class mKahaDBXARecoveryBrokerTest extends XARecoveryBrokerTest {
@Override
protected void configureBroker(BrokerService broker) throws Exception {
super.configureBroker(broker);
MultiKahaDBPersistenceAdapter mKahaDB = new MultiKahaDBPersistenceAdapter();
List adapters = new LinkedList<FilteredKahaDBPersistenceAdapter>();
FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter();
defaultEntry.setPersistenceAdapter(new KahaDBPersistenceAdapter());
adapters.add(defaultEntry);
FilteredKahaDBPersistenceAdapter special = new FilteredKahaDBPersistenceAdapter();
special.setDestination(new ActiveMQQueue("special"));
special.setPersistenceAdapter(new KahaDBPersistenceAdapter());
adapters.add(special);
mKahaDB.setFilteredPersistenceAdapters(adapters);
broker.setPersistenceAdapter(mKahaDB);
}
public static Test suite() {
return suite(mKahaDBXARecoveryBrokerTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
protected ActiveMQDestination createDestination() {
return new ActiveMQQueue("test,special");
}
}

View File

@ -27,6 +27,8 @@ import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.DefaultIOExceptionHandler; import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
@ -56,7 +58,7 @@ public class AMQ2736Test {
// test hack, close the journal to ensure no further journal updates when broker stops // test hack, close the journal to ensure no further journal updates when broker stops
// mimic kill -9 in terms of no normal shutdown sequence // mimic kill -9 in terms of no normal shutdown sequence
store.getJournalManager().close(); store.getJournal().close();
try { try {
store.close(); store.close();
} catch (Exception expectedLotsAsJournalBorked) { } catch (Exception expectedLotsAsJournalBorked) {

View File

@ -16,7 +16,11 @@
*/ */
package org.apache.activemq.bugs; package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
@ -28,6 +32,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -37,8 +42,6 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class AMQ2982Test { public class AMQ2982Test {
@ -62,7 +65,7 @@ public class AMQ2982Test {
// ensure save memory publishing, use the right lock // ensure save memory publishing, use the right lock
indexLock.readLock().lock(); indexLock.readLock().lock();
try { try {
return getJournalManager().getFileMap().size(); return getJournal().getFileMap().size();
} finally { } finally {
indexLock.readLock().unlock(); indexLock.readLock().unlock();
} }

View File

@ -16,6 +16,10 @@
*/ */
package org.apache.activemq.bugs; package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -30,6 +34,7 @@ import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.store.kahadb.KahaDBStore;
@ -37,7 +42,6 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*;
public class AMQ2983Test { public class AMQ2983Test {
@ -63,7 +67,7 @@ public class AMQ2983Test {
// ensure save memory publishing, use the right lock // ensure save memory publishing, use the right lock
indexLock.readLock().lock(); indexLock.readLock().lock();
try { try {
return getJournalManager().getFileMap().size(); return getJournal().getFileMap().size();
} finally { } finally {
indexLock.readLock().unlock(); indexLock.readLock().unlock();
} }

View File

@ -31,9 +31,9 @@ public class SimpleDurableTopicTest extends SimpleTopicTest {
protected long initialConsumerDelay = 0; protected long initialConsumerDelay = 0;
@Override @Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
numberOfDestinations=10; numberOfDestinations=1;
numberOfConsumers = 1; numberOfConsumers = 1;
numberofProducers = Integer.parseInt(System.getProperty("SimpleDurableTopicTest.numberofProducers", "1")); numberofProducers = Integer.parseInt(System.getProperty("SimpleDurableTopicTest.numberofProducers", "20"), 20);
sampleCount= Integer.parseInt(System.getProperty("SimpleDurableTopicTest.sampleCount", "1000"), 10); sampleCount= Integer.parseInt(System.getProperty("SimpleDurableTopicTest.sampleCount", "1000"), 10);
playloadSize = 1024; playloadSize = 1024;
super.setUp(); super.setUp();

View File

@ -0,0 +1,282 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class StorePerDestinationTest {
static final Logger LOG = LoggerFactory.getLogger(StorePerDestinationTest.class);
final static int maxFileLength = 1024*100;
final static int numToSend = 10000;
final Vector<Throwable> exceptions = new Vector<Throwable>();
BrokerService brokerService;
protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
BrokerService broker = new BrokerService();
broker.setUseJmx(false);
broker.setPersistenceAdapter(kaha);
return broker;
}
private KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
kaha.setJournalMaxFileLength(maxFileLength);
kaha.setCleanupInterval(5000);
if (delete) {
kaha.deleteAllMessages();
}
return kaha;
}
@Before
public void prepareCleanBrokerWithMultiStore() throws Exception {
prepareBrokerWithMultiStore(true);
}
public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
if (deleteAllMessages) {
multiKahaDBPersistenceAdapter.deleteAllMessages();
}
ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
FilteredKahaDBPersistenceAdapter theRest = new FilteredKahaDBPersistenceAdapter();
theRest.setPersistenceAdapter(createStore(deleteAllMessages));
// default destination when not set is a match for all
adapters.add(theRest);
// separate store for FastQ
FilteredKahaDBPersistenceAdapter fastQStore = new FilteredKahaDBPersistenceAdapter();
fastQStore.setPersistenceAdapter(createStore(deleteAllMessages));
fastQStore.setDestination(new ActiveMQQueue("FastQ"));
adapters.add(fastQStore);
multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
brokerService = createBroker(multiKahaDBPersistenceAdapter);
}
@After
public void tearDown() throws Exception {
brokerService.stop();
}
@Test
public void testTransactedSendReceive() throws Exception {
brokerService.start();
sendMessages(true, "SlowQ", 1, 0);
assertEquals("got one", 1, receiveMessages(true, "SlowQ", 1));
}
@Test
public void testTransactedSendReceiveAcrossStores() throws Exception {
brokerService.start();
sendMessages(true, "SlowQ,FastQ", 1, 0);
assertEquals("got one", 2, receiveMessages(true, "SlowQ,FastQ", 2));
}
@Test
public void testCommitRecovery() throws Exception {
doTestRecovery(true);
}
@Test
public void testRollbackRecovery() throws Exception {
doTestRecovery(false);
}
public void doTestRecovery(final boolean haveOutcome) throws Exception {
final MultiKahaDBPersistenceAdapter persistenceAdapter =
(MultiKahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
MultiKahaDBTransactionStore transactionStore =
new MultiKahaDBTransactionStore(persistenceAdapter) {
@Override
public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
if (haveOutcome) {
super.persistOutcome(tx, txid);
}
try {
// IOExceptions will stop the broker
persistenceAdapter.stop();
} catch (Exception e) {
LOG.error("ex on stop ", e);
exceptions.add(e);
}
}
};
persistenceAdapter.setTransactionStore(transactionStore);
brokerService.start();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new Runnable() {
@Override
public void run() {
try {
// commit will block
sendMessages(true, "SlowQ,FastQ", 1, 0);
} catch(Exception expected) {
LOG.info("expected", expected);
}
}
});
brokerService.waitUntilStopped();
// interrupt the send thread
executorService.shutdownNow();
// verify auto recovery
prepareBrokerWithMultiStore(false);
brokerService.start();
assertEquals("expect to get the recovered message", haveOutcome ? 2 : 0, receiveMessages(false, "SlowQ,FastQ", 2));
assertEquals("all transactions are complete", 0, brokerService.getBroker().getPreparedTransactions(null).length);
}
@Test
public void testSlowFastDestinationsStoreUsage() throws Exception {
brokerService.start();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendMessages(false, "SlowQ", 50, 500);
} catch (Exception e) {
exceptions.add(e);
}
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendMessages(false, "FastQ", numToSend, 0);
} catch (Exception e) {
exceptions.add(e);
}
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
try {
assertEquals("Got all sent", numToSend, receiveMessages(false, "FastQ", numToSend));
} catch (Exception e) {
exceptions.add(e);
}
}
});
executorService.shutdown();
assertTrue("consumers executor finished on time", executorService.awaitTermination(60, TimeUnit.SECONDS));
final SystemUsage usage = brokerService.getSystemUsage();
assertTrue("Store is not hogged", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
long storeUsage = usage.getStoreUsage().getUsage();
LOG.info("Store Usage: " + storeUsage);
return storeUsage < 5 * maxFileLength;
}
}));
assertTrue("no exceptions", exceptions.isEmpty());
}
private void sendMessages(boolean transacted, String destName, int count, long sleep) throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = cf.createConnection();
try {
Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(new ActiveMQQueue(destName));
for (int i = 0; i < count; i++) {
if (sleep > 0) {
TimeUnit.MILLISECONDS.sleep(sleep);
}
producer.send(session.createTextMessage(createContent(i)));
}
if (transacted) {
session.commit();
}
} finally {
connection.close();
}
}
private int receiveMessages(boolean transacted, String destName, int max) throws JMSException {
int rc = 0;
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = cf.createConnection();
try {
connection.start();
Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue(destName));
while (rc < max && messageConsumer.receive(4000) != null) {
rc++;
if (transacted && rc % 200 == 0) {
session.commit();
}
}
if (transacted) {
session.commit();
}
return rc;
} finally {
connection.close();
}
}
private String createContent(int i) {
StringBuilder sb = new StringBuilder(i + ":");
while (sb.length() < 1024) {
sb.append("*");
}
return sb.toString();
}
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.usecases; package org.apache.activemq.usecases;
import java.util.Vector; import java.util.Vector;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
@ -73,7 +72,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
public static Test suite() { public static Test suite() {
return suite(DurableSubscriptionOfflineTest.class); return suite(DurableSubscriptionOfflineTest.class);
} }
protected void setUp() throws Exception { protected void setUp() throws Exception {
exceptions.clear(); exceptions.clear();
topic = (ActiveMQTopic) createDestination(); topic = (ActiveMQTopic) createDestination();
@ -89,9 +88,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
private void createBroker() throws Exception { private void createBroker() throws Exception {
createBroker(true); createBroker(true);
} }
private void createBroker(boolean deleteAllMessages) throws Exception { private void createBroker(boolean deleteAllMessages) throws Exception {
broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) + ")"); broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
broker.setBrokerName(getName(true)); broker.setBrokerName(getName(true));
broker.setDeleteAllMessagesOnStartup(deleteAllMessages); broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
broker.getManagementContext().setCreateConnector(false); broker.getManagementContext().setCreateConnector(false);
@ -105,14 +104,14 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
policyMap.setDefaultEntry(policy); policyMap.setDefaultEntry(policy);
broker.setDestinationPolicy(policyMap); broker.setDestinationPolicy(policyMap);
} }
setDefaultPersistenceAdapter(broker); setDefaultPersistenceAdapter(broker);
if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) { if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
// ensure it kicks in during tests // ensure it kicks in during tests
((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).setCleanupPeriod(2 * 1000); ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
} else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
// have lots of journal files // have lots of journal files
((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength); ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
} }
broker.start(); broker.start();
} }
@ -124,9 +123,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception { public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter", this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
this.addCombinationValues("usePrioritySupport", this.addCombinationValues("usePrioritySupport",
new Object[]{Boolean.TRUE, Boolean.FALSE}); new Object[]{ Boolean.TRUE, Boolean.FALSE});
} }
public void testConsumeOnlyMatchedMessages() throws Exception { public void testConsumeOnlyMatchedMessages() throws Exception {
@ -171,110 +170,110 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(sent, listener.count); assertEquals(sent, listener.count);
} }
public void testConsumeAllMatchedMessages() throws Exception { public void testConsumeAllMatchedMessages() throws Exception {
// create durable subscription // create durable subscription
Connection con = createConnection(); Connection con = createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
session.close(); session.close();
con.close(); con.close();
// send messages // send messages
con = createConnection(); con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null); MessageProducer producer = session.createProducer(null);
int sent = 0; int sent = 0;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
sent++; sent++;
Message message = session.createMessage(); Message message = session.createMessage();
message.setStringProperty("filter", "true"); message.setStringProperty("filter", "true");
producer.send(topic, message); producer.send(topic, message);
} }
Thread.sleep(1 * 1000); Thread.sleep(1 * 1000);
session.close(); session.close();
con.close(); con.close();
// consume messages // consume messages
con = createConnection(); con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Listener listener = new Listener(); Listener listener = new Listener();
consumer.setMessageListener(listener); consumer.setMessageListener(listener);
Thread.sleep(3 * 1000); Thread.sleep(3 * 1000);
session.close(); session.close();
con.close(); con.close();
assertEquals(sent, listener.count);
}
assertEquals(sent, listener.count);
}
public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception { public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter", this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
this.addCombinationValues("usePrioritySupport", this.addCombinationValues("usePrioritySupport",
new Object[]{Boolean.TRUE, Boolean.FALSE}); new Object[]{ Boolean.TRUE, Boolean.FALSE});
} }
public void testVerifyAllConsumedAreAcked() throws Exception { public void testVerifyAllConsumedAreAcked() throws Exception {
// create durable subscription // create durable subscription
Connection con = createConnection(); Connection con = createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
session.close(); session.close();
con.close(); con.close();
// send messages // send messages
con = createConnection(); con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null); MessageProducer producer = session.createProducer(null);
int sent = 0; int sent = 0;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
sent++; sent++;
Message message = session.createMessage(); Message message = session.createMessage();
message.setStringProperty("filter", "true"); message.setStringProperty("filter", "true");
producer.send(topic, message); producer.send(topic, message);
} }
Thread.sleep(1 * 1000); Thread.sleep(1 * 1000);
session.close(); session.close();
con.close(); con.close();
// consume messages // consume messages
con = createConnection(); con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Listener listener = new Listener(); Listener listener = new Listener();
consumer.setMessageListener(listener); consumer.setMessageListener(listener);
Thread.sleep(3 * 1000); Thread.sleep(3 * 1000);
session.close(); session.close();
con.close(); con.close();
LOG.info("Consumed: " + listener.count); LOG.info("Consumed: " + listener.count);
assertEquals(sent, listener.count); assertEquals(sent, listener.count);
// consume messages again, should not get any // consume messages again, should not get any
con = createConnection(); con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
listener = new Listener(); listener = new Listener();
consumer.setMessageListener(listener); consumer.setMessageListener(listener);
Thread.sleep(3 * 1000); Thread.sleep(3 * 1000);
session.close(); session.close();
con.close(); con.close();
assertEquals(0, listener.count); assertEquals(0, listener.count);
} }
public void testTwoOfflineSubscriptionCanConsume() throws Exception { public void testTwoOfflineSubscriptionCanConsume() throws Exception {
// create durable subscription 1 // create durable subscription 1
@ -445,9 +444,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception { public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter", this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
this.addCombinationValues("usePrioritySupport", this.addCombinationValues("usePrioritySupport",
new Object[]{Boolean.TRUE, Boolean.FALSE}); new Object[]{ Boolean.TRUE, Boolean.FALSE});
} }
public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception { public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
@ -596,15 +595,14 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
con.close(); con.close();
assertEquals("offline consumer got all", sent, listener.count); assertEquals("offline consumer got all", sent, listener.count);
} }
public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception { public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter", this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
} }
private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))"; private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception { public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
// create offline subs 1 // create offline subs 1
Connection con = createConnection("offCli1"); Connection con = createConnection("offCli1");
@ -752,9 +750,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception { public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter", this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
} }
public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception { public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
// create offline subs 1 // create offline subs 1
Connection con = createConnection("offCli1"); Connection con = createConnection("offCli1");
@ -795,7 +793,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
Thread.sleep(3 * 1000); Thread.sleep(3 * 1000);
broker.stop(); broker.stop();
createBroker(false /*deleteAllMessages*/); createBroker(false /*deleteAllMessages*/);
// send more messages // send more messages
con = createConnection(); con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -842,7 +840,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
public void initCombosForTestOfflineAfterRestart() throws Exception { public void initCombosForTestOfflineAfterRestart() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter", this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
} }
public void testOfflineSubscriptionAfterRestart() throws Exception { public void testOfflineSubscriptionAfterRestart() throws Exception {
@ -978,7 +976,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
int filtered = 0; int filtered = 0;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
boolean filter = (i % 2 == 0); //(int) (Math.random() * 2) >= 1; boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
if (filter) if (filter)
filtered++; filtered++;
@ -1076,7 +1074,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
sent = 0; sent = 0;
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
Message message = session.createMessage(); Message message = session.createMessage();
message.setStringProperty("filter", i == 1 ? "true" : "false"); message.setStringProperty("filter", i==1 ? "true" : "false");
producer.send(topic, message); producer.send(topic, message);
sent++; sent++;
} }
@ -1084,7 +1082,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
Thread.sleep(1 * 1000); Thread.sleep(1 * 1000);
session.close(); session.close();
con.close(); con.close();
LOG.info("cli1 again, should get 1 new ones"); LOG.info("cli1 again, should get 1 new ones");
con = createConnection("cli1"); con = createConnection("cli1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -1206,7 +1204,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
MessageProducer producer = session.createProducer(null); MessageProducer producer = session.createProducer(null);
final int toSend = 500; final int toSend = 500;
final String payload = new byte[40 * 1024].toString(); final String payload = new byte[40*1024].toString();
int sent = 0; int sent = 0;
for (int i = sent; i < toSend; i++) { for (int i = sent; i < toSend; i++) {
Message message = session.createTextMessage(payload); Message message = session.createTextMessage(payload);
@ -1233,7 +1231,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
consumer.setMessageListener(listener); consumer.setMessageListener(listener);
assertTrue("got all sent", Wait.waitFor(new Wait.Condition() { assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("Want: " + toSend + ", current: " + listener.count); LOG.info("Want: " + toSend + ", current: " + listener.count);
return listener.count == toSend; return listener.count == toSend;
} }
})); }));
@ -1243,7 +1241,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
destroyBroker(); destroyBroker();
createBroker(false); createBroker(false);
KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
assertEquals("only one journal file left after restart", 1, pa.getStore().getJournalManager().getFileMap().size()); assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size());
} }
public static class Listener implements MessageListener { public static class Listener implements MessageListener {
@ -1252,23 +1250,20 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
Listener() { Listener() {
} }
Listener(String id) { Listener(String id) {
this.id = id; this.id = id;
} }
public void onMessage(Message message) { public void onMessage(Message message) {
count++; count++;
if (id != null) { if (id != null) {
try { try {
LOG.info(id + ", " + message.getJMSMessageID()); LOG.info(id + ", " + message.getJMSMessageID());
} catch (Exception ignored) { } catch (Exception ignored) {}
}
} }
} }
} }
public class FilterCheckListener extends Listener { public class FilterCheckListener extends Listener {
public void onMessage(Message message) { public void onMessage(Message message) {
count++; count++;
@ -1278,11 +1273,13 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
if (b != null) { if (b != null) {
boolean c = message.getBooleanProperty("$c"); boolean c = message.getBooleanProperty("$c");
assertTrue("", c); assertTrue("", c);
} else { }
else {
String d = message.getStringProperty("$d"); String d = message.getStringProperty("$d");
assertTrue("", "D1".equals(d) || "D2".equals(d)); assertTrue("", "D1".equals(d) || "D2".equals(d));
} }
} catch (JMSException e) { }
catch (JMSException e) {
exceptions.add(e); exceptions.add(e);
} }
} }

View File

@ -382,7 +382,7 @@ public class Journal {
started = false; started = false;
} }
synchronized void cleanup() { protected synchronized void cleanup() {
if (accessorPool != null) { if (accessorPool != null) {
accessorPool.disposeUnused(); accessorPool.disposeUnused();
} }