mirror of https://github.com/apache/activemq.git
Added a new TempKahaDBStore which is an optimized KahaDB based message for temporary messages.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@741769 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f73b622a60
commit
b7c2b678bb
|
@ -0,0 +1,564 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTempQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTempTopic;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.activemq.command.MessageAck;
|
||||||
|
import org.apache.activemq.command.MessageId;
|
||||||
|
import org.apache.activemq.command.SubscriptionInfo;
|
||||||
|
import org.apache.activemq.command.TransactionId;
|
||||||
|
import org.apache.activemq.command.XATransactionId;
|
||||||
|
import org.apache.activemq.openwire.OpenWireFormat;
|
||||||
|
import org.apache.activemq.protobuf.Buffer;
|
||||||
|
import org.apache.activemq.store.AbstractMessageStore;
|
||||||
|
import org.apache.activemq.store.MessageRecoveryListener;
|
||||||
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
import org.apache.activemq.store.PersistenceAdapter;
|
||||||
|
import org.apache.activemq.store.TopicMessageStore;
|
||||||
|
import org.apache.activemq.store.TransactionRecoveryListener;
|
||||||
|
import org.apache.activemq.store.TransactionStore;
|
||||||
|
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
|
||||||
|
import org.apache.activemq.store.kahadb.data.KahaDestination;
|
||||||
|
import org.apache.activemq.store.kahadb.data.KahaLocation;
|
||||||
|
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
|
||||||
|
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
|
||||||
|
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
|
||||||
|
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
|
||||||
|
import org.apache.activemq.usage.MemoryUsage;
|
||||||
|
import org.apache.activemq.usage.SystemUsage;
|
||||||
|
import org.apache.activemq.util.ByteSequence;
|
||||||
|
import org.apache.activemq.wireformat.WireFormat;
|
||||||
|
import org.apache.kahadb.journal.Location;
|
||||||
|
import org.apache.kahadb.page.Transaction;
|
||||||
|
|
||||||
|
public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter {
|
||||||
|
|
||||||
|
private WireFormat wireFormat = new OpenWireFormat();
|
||||||
|
|
||||||
|
public void setBrokerName(String brokerName) {
|
||||||
|
}
|
||||||
|
public void setUsageManager(SystemUsage usageManager) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public TransactionStore createTransactionStore() throws IOException {
|
||||||
|
return new TransactionStore(){
|
||||||
|
|
||||||
|
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
|
||||||
|
processCommit(txid);
|
||||||
|
}
|
||||||
|
public void prepare(TransactionId txid) throws IOException {
|
||||||
|
processPrepare(txid);
|
||||||
|
}
|
||||||
|
public void rollback(TransactionId txid) throws IOException {
|
||||||
|
processRollback(txid);
|
||||||
|
}
|
||||||
|
public void recover(TransactionRecoveryListener listener) throws IOException {
|
||||||
|
for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
|
||||||
|
XATransactionId xid = (XATransactionId)entry.getKey();
|
||||||
|
ArrayList<Message> messageList = new ArrayList<Message>();
|
||||||
|
ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
|
||||||
|
|
||||||
|
for (Operation op : entry.getValue()) {
|
||||||
|
if( op.getClass() == AddOpperation.class ) {
|
||||||
|
AddOpperation addOp = (AddOpperation)op;
|
||||||
|
Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) );
|
||||||
|
messageList.add(msg);
|
||||||
|
} else {
|
||||||
|
RemoveOpperation rmOp = (RemoveOpperation)op;
|
||||||
|
MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) );
|
||||||
|
ackList.add(ack);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Message[] addedMessages = new Message[messageList.size()];
|
||||||
|
MessageAck[] acks = new MessageAck[ackList.size()];
|
||||||
|
messageList.toArray(addedMessages);
|
||||||
|
ackList.toArray(acks);
|
||||||
|
listener.recover(xid, addedMessages, acks);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public void start() throws Exception {
|
||||||
|
}
|
||||||
|
public void stop() throws Exception {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public class KahaDBMessageStore extends AbstractMessageStore {
|
||||||
|
protected KahaDestination dest;
|
||||||
|
|
||||||
|
public KahaDBMessageStore(ActiveMQDestination destination) {
|
||||||
|
super(destination);
|
||||||
|
this.dest = convert( destination );
|
||||||
|
}
|
||||||
|
|
||||||
|
public ActiveMQDestination getDestination() {
|
||||||
|
return destination;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addMessage(ConnectionContext context, Message message) throws IOException {
|
||||||
|
KahaAddMessageCommand command = new KahaAddMessageCommand();
|
||||||
|
command.setDestination(dest);
|
||||||
|
command.setMessageId(message.getMessageId().toString());
|
||||||
|
processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
|
||||||
|
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
|
||||||
|
command.setDestination(dest);
|
||||||
|
command.setMessageId(ack.getLastMessageId().toString());
|
||||||
|
processRemove(command, ack.getTransactionId());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeAllMessages(ConnectionContext context) throws IOException {
|
||||||
|
KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
|
||||||
|
command.setDestination(dest);
|
||||||
|
process(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Message getMessage(MessageId identity) throws IOException {
|
||||||
|
final String key = identity.toString();
|
||||||
|
|
||||||
|
// Hopefully one day the page file supports concurrent read operations... but for now we must
|
||||||
|
// externally synchronize...
|
||||||
|
ByteSequence data;
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){
|
||||||
|
public ByteSequence execute(Transaction tx) throws IOException {
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
Long sequence = sd.messageIdIndex.get(tx, key);
|
||||||
|
if( sequence ==null ) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return sd.orderIndex.get(tx, sequence).data;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if( data == null ) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Message msg = (Message)wireFormat.unmarshal( data );
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMessageCount() throws IOException {
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
|
||||||
|
public Integer execute(Transaction tx) throws IOException {
|
||||||
|
// Iterate through all index entries to get a count of messages in the destination.
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
int rc=0;
|
||||||
|
for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
|
||||||
|
iterator.next();
|
||||||
|
rc++;
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void recover(final MessageRecoveryListener listener) throws Exception {
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<Exception>(){
|
||||||
|
public void execute(Transaction tx) throws Exception {
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
|
||||||
|
Entry<Long, MessageRecord> entry = iterator.next();
|
||||||
|
listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data) );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
long cursorPos=0;
|
||||||
|
|
||||||
|
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<Exception>(){
|
||||||
|
public void execute(Transaction tx) throws Exception {
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
Entry<Long, MessageRecord> entry=null;
|
||||||
|
int counter = 0;
|
||||||
|
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
|
||||||
|
entry = iterator.next();
|
||||||
|
listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
|
||||||
|
counter++;
|
||||||
|
if( counter >= maxReturned ) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if( entry!=null ) {
|
||||||
|
cursorPos = entry.getKey()+1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void resetBatching() {
|
||||||
|
cursorPos=0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setBatch(MessageId identity) throws IOException {
|
||||||
|
final String key = identity.toString();
|
||||||
|
|
||||||
|
// Hopefully one day the page file supports concurrent read operations... but for now we must
|
||||||
|
// externally synchronize...
|
||||||
|
Long location;
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
|
||||||
|
public Long execute(Transaction tx) throws IOException {
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
return sd.messageIdIndex.get(tx, key);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if( location!=null ) {
|
||||||
|
cursorPos=location+1;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMemoryUsage(MemoryUsage memoeyUSage) {
|
||||||
|
}
|
||||||
|
public void start() throws Exception {
|
||||||
|
}
|
||||||
|
public void stop() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
|
||||||
|
public KahaDBTopicMessageStore(ActiveMQTopic destination) {
|
||||||
|
super(destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
|
||||||
|
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
|
||||||
|
command.setDestination(dest);
|
||||||
|
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
|
||||||
|
command.setMessageId(messageId.toString());
|
||||||
|
// We are not passed a transaction info.. so we can't participate in a transaction.
|
||||||
|
// Looks like a design issue with the TopicMessageStore interface. Also we can't recover the original ack
|
||||||
|
// to pass back to the XA recover method.
|
||||||
|
// command.setTransactionInfo();
|
||||||
|
processRemove(command, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
|
||||||
|
String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
|
||||||
|
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
|
||||||
|
command.setDestination(dest);
|
||||||
|
command.setSubscriptionKey(subscriptionKey);
|
||||||
|
command.setRetroactive(retroactive);
|
||||||
|
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
|
||||||
|
command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
|
||||||
|
process(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
|
||||||
|
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
|
||||||
|
command.setDestination(dest);
|
||||||
|
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
|
||||||
|
process(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
|
||||||
|
|
||||||
|
final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<IOException>(){
|
||||||
|
public void execute(Transaction tx) throws IOException {
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
|
||||||
|
Entry<String, KahaSubscriptionCommand> entry = iterator.next();
|
||||||
|
SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
|
||||||
|
subscriptions.add(info);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
|
||||||
|
subscriptions.toArray(rc);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
|
||||||
|
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
|
||||||
|
public SubscriptionInfo execute(Transaction tx) throws IOException {
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
|
||||||
|
if( command ==null ) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) );
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
|
||||||
|
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
|
||||||
|
public Integer execute(Transaction tx) throws IOException {
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||||
|
if ( cursorPos==null ) {
|
||||||
|
// The subscription might not exist.
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
cursorPos += 1;
|
||||||
|
|
||||||
|
int counter = 0;
|
||||||
|
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
|
||||||
|
iterator.next();
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
return counter;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
|
||||||
|
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<Exception>(){
|
||||||
|
public void execute(Transaction tx) throws Exception {
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||||
|
cursorPos += 1;
|
||||||
|
|
||||||
|
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
|
||||||
|
Entry<Long, MessageRecord> entry = iterator.next();
|
||||||
|
listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
|
||||||
|
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<Exception>(){
|
||||||
|
public void execute(Transaction tx) throws Exception {
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
|
||||||
|
if( cursorPos == null ) {
|
||||||
|
cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||||
|
cursorPos += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
Entry<Long, MessageRecord> entry=null;
|
||||||
|
int counter = 0;
|
||||||
|
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
|
||||||
|
entry = iterator.next();
|
||||||
|
listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
|
||||||
|
counter++;
|
||||||
|
if( counter >= maxReturned ) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if( entry!=null ) {
|
||||||
|
sd.subscriptionCursors.put(subscriptionKey, cursorPos+1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void resetBatching(String clientId, String subscriptionName) {
|
||||||
|
try {
|
||||||
|
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<IOException>(){
|
||||||
|
public void execute(Transaction tx) throws IOException {
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
sd.subscriptionCursors.remove(subscriptionKey);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String subscriptionKey(String clientId, String subscriptionName){
|
||||||
|
return clientId+":"+subscriptionName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
|
||||||
|
return new KahaDBMessageStore(destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
|
||||||
|
return new KahaDBTopicMessageStore(destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination.
|
||||||
|
* This method does not stop the message store (it might not be cached).
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeQueueMessageStore(ActiveMQQueue destination) {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
* This method does not stop the message store (it might not be cached).
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeTopicMessageStore(ActiveMQTopic destination) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void deleteAllMessages() throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public Set<ActiveMQDestination> getDestinations() {
|
||||||
|
try {
|
||||||
|
final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<IOException>(){
|
||||||
|
public void execute(Transaction tx) throws IOException {
|
||||||
|
for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) {
|
||||||
|
Entry<String, StoredDestination> entry = iterator.next();
|
||||||
|
rc.add(convert(entry.getKey()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLastMessageBrokerSequenceId() throws IOException {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long size() {
|
||||||
|
if ( !started.get() ) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return pageFile.getDiskSize();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void beginTransaction(ConnectionContext context) throws IOException {
|
||||||
|
throw new IOException("Not yet implemented.");
|
||||||
|
}
|
||||||
|
public void commitTransaction(ConnectionContext context) throws IOException {
|
||||||
|
throw new IOException("Not yet implemented.");
|
||||||
|
}
|
||||||
|
public void rollbackTransaction(ConnectionContext context) throws IOException {
|
||||||
|
throw new IOException("Not yet implemented.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void checkpoint(boolean sync) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////
|
||||||
|
// Internal conversion methods.
|
||||||
|
///////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
KahaLocation convert(Location location) {
|
||||||
|
KahaLocation rc = new KahaLocation();
|
||||||
|
rc.setLogId(location.getDataFileId());
|
||||||
|
rc.setOffset(location.getOffset());
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
KahaDestination convert(ActiveMQDestination dest) {
|
||||||
|
KahaDestination rc = new KahaDestination();
|
||||||
|
rc.setName(dest.getPhysicalName());
|
||||||
|
switch( dest.getDestinationType() ) {
|
||||||
|
case ActiveMQDestination.QUEUE_TYPE:
|
||||||
|
rc.setType(DestinationType.QUEUE);
|
||||||
|
return rc;
|
||||||
|
case ActiveMQDestination.TOPIC_TYPE:
|
||||||
|
rc.setType(DestinationType.TOPIC);
|
||||||
|
return rc;
|
||||||
|
case ActiveMQDestination.TEMP_QUEUE_TYPE:
|
||||||
|
rc.setType(DestinationType.TEMP_QUEUE);
|
||||||
|
return rc;
|
||||||
|
case ActiveMQDestination.TEMP_TOPIC_TYPE:
|
||||||
|
rc.setType(DestinationType.TEMP_TOPIC);
|
||||||
|
return rc;
|
||||||
|
default:
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ActiveMQDestination convert(String dest) {
|
||||||
|
int p = dest.indexOf(":");
|
||||||
|
if( p<0 ) {
|
||||||
|
throw new IllegalArgumentException("Not in the valid destination format");
|
||||||
|
}
|
||||||
|
int type = Integer.parseInt(dest.substring(0, p));
|
||||||
|
String name = dest.substring(p+1);
|
||||||
|
|
||||||
|
switch( KahaDestination.DestinationType.valueOf(type) ) {
|
||||||
|
case QUEUE:
|
||||||
|
return new ActiveMQQueue(name);
|
||||||
|
case TOPIC:
|
||||||
|
return new ActiveMQTopic(name);
|
||||||
|
case TEMP_QUEUE:
|
||||||
|
return new ActiveMQTempQueue(name);
|
||||||
|
case TEMP_TOPIC:
|
||||||
|
return new ActiveMQTempTopic(name);
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("Not in the valid destination format");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,703 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.activemq.command.SubscriptionInfo;
|
||||||
|
import org.apache.activemq.command.TransactionId;
|
||||||
|
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
|
||||||
|
import org.apache.activemq.store.kahadb.data.KahaDestination;
|
||||||
|
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
|
||||||
|
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
|
||||||
|
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
|
||||||
|
import org.apache.activemq.util.ByteSequence;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.kahadb.index.BTreeIndex;
|
||||||
|
import org.apache.kahadb.page.PageFile;
|
||||||
|
import org.apache.kahadb.page.Transaction;
|
||||||
|
import org.apache.kahadb.util.LongMarshaller;
|
||||||
|
import org.apache.kahadb.util.Marshaller;
|
||||||
|
import org.apache.kahadb.util.StringMarshaller;
|
||||||
|
|
||||||
|
public class TempMessageDatabase {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(TempMessageDatabase.class);
|
||||||
|
|
||||||
|
public static final int CLOSED_STATE = 1;
|
||||||
|
public static final int OPEN_STATE = 2;
|
||||||
|
|
||||||
|
protected BTreeIndex<String, StoredDestination> destinations;
|
||||||
|
protected PageFile pageFile;
|
||||||
|
|
||||||
|
protected File directory;
|
||||||
|
|
||||||
|
boolean enableIndexWriteAsync = true;
|
||||||
|
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
|
||||||
|
|
||||||
|
protected AtomicBoolean started = new AtomicBoolean();
|
||||||
|
protected AtomicBoolean opened = new AtomicBoolean();
|
||||||
|
|
||||||
|
public TempMessageDatabase() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() throws Exception {
|
||||||
|
if (started.compareAndSet(false, true)) {
|
||||||
|
load();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() throws Exception {
|
||||||
|
if (started.compareAndSet(true, false)) {
|
||||||
|
unload();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void loadPageFile() throws IOException {
|
||||||
|
synchronized (indexMutex) {
|
||||||
|
final PageFile pageFile = getPageFile();
|
||||||
|
pageFile.load();
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<IOException>() {
|
||||||
|
public void execute(Transaction tx) throws IOException {
|
||||||
|
destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
|
||||||
|
destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
|
||||||
|
destinations.setValueMarshaller(new StoredDestinationMarshaller());
|
||||||
|
destinations.load(tx);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
pageFile.flush();
|
||||||
|
storedDestinations.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void open() throws IOException {
|
||||||
|
if( opened.compareAndSet(false, true) ) {
|
||||||
|
loadPageFile();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void load() throws IOException {
|
||||||
|
synchronized (indexMutex) {
|
||||||
|
open();
|
||||||
|
pageFile.unload();
|
||||||
|
pageFile.delete();
|
||||||
|
loadPageFile();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void close() throws IOException, InterruptedException {
|
||||||
|
if( opened.compareAndSet(true, false)) {
|
||||||
|
synchronized (indexMutex) {
|
||||||
|
pageFile.unload();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unload() throws IOException, InterruptedException {
|
||||||
|
synchronized (indexMutex) {
|
||||||
|
if( pageFile.isLoaded() ) {
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processAdd(final KahaAddMessageCommand command, TransactionId txid, final ByteSequence data) throws IOException {
|
||||||
|
if (txid!=null) {
|
||||||
|
synchronized (indexMutex) {
|
||||||
|
ArrayList<Operation> inflightTx = getInflightTx(txid);
|
||||||
|
inflightTx.add(new AddOpperation(command, data));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
synchronized (indexMutex) {
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<IOException>() {
|
||||||
|
public void execute(Transaction tx) throws IOException {
|
||||||
|
upadateIndex(tx, command, data);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processRemove(final KahaRemoveMessageCommand command, TransactionId txid) throws IOException {
|
||||||
|
if (txid!=null) {
|
||||||
|
synchronized (indexMutex) {
|
||||||
|
ArrayList<Operation> inflightTx = getInflightTx(txid);
|
||||||
|
inflightTx.add(new RemoveOpperation(command));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
synchronized (indexMutex) {
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<IOException>() {
|
||||||
|
public void execute(Transaction tx) throws IOException {
|
||||||
|
updateIndex(tx, command);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void process(final KahaRemoveDestinationCommand command) throws IOException {
|
||||||
|
synchronized (indexMutex) {
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<IOException>() {
|
||||||
|
public void execute(Transaction tx) throws IOException {
|
||||||
|
updateIndex(tx, command);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void process(final KahaSubscriptionCommand command) throws IOException {
|
||||||
|
synchronized (indexMutex) {
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<IOException>() {
|
||||||
|
public void execute(Transaction tx) throws IOException {
|
||||||
|
updateIndex(tx, command);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processCommit(TransactionId key) throws IOException {
|
||||||
|
synchronized (indexMutex) {
|
||||||
|
ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
|
||||||
|
if (inflightTx == null) {
|
||||||
|
inflightTx = preparedTransactions.remove(key);
|
||||||
|
}
|
||||||
|
if (inflightTx == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final ArrayList<Operation> messagingTx = inflightTx;
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<IOException>() {
|
||||||
|
public void execute(Transaction tx) throws IOException {
|
||||||
|
for (Operation op : messagingTx) {
|
||||||
|
op.execute(tx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processPrepare(TransactionId key) {
|
||||||
|
synchronized (indexMutex) {
|
||||||
|
ArrayList<Operation> tx = inflightTransactions.remove(key);
|
||||||
|
if (tx != null) {
|
||||||
|
preparedTransactions.put(key, tx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processRollback(TransactionId key) {
|
||||||
|
synchronized (indexMutex) {
|
||||||
|
ArrayList<Operation> tx = inflightTransactions.remove(key);
|
||||||
|
if (tx == null) {
|
||||||
|
preparedTransactions.remove(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// /////////////////////////////////////////////////////////////////
|
||||||
|
// These methods do the actual index updates.
|
||||||
|
// /////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
protected final Object indexMutex = new Object();
|
||||||
|
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
|
||||||
|
|
||||||
|
private void upadateIndex(Transaction tx, KahaAddMessageCommand command, ByteSequence data) throws IOException {
|
||||||
|
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
|
||||||
|
|
||||||
|
// Skip adding the message to the index if this is a topic and there are
|
||||||
|
// no subscriptions.
|
||||||
|
if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the message.
|
||||||
|
long id = sd.nextMessageId++;
|
||||||
|
Long previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
|
||||||
|
if( previous == null ) {
|
||||||
|
sd.orderIndex.put(tx, id, new MessageRecord(command.getMessageId(), data));
|
||||||
|
} else {
|
||||||
|
// restore the previous value.. Looks like this was a redo of a previously
|
||||||
|
// added message. We don't want to assing it a new id as the other indexes would
|
||||||
|
// be wrong..
|
||||||
|
sd.messageIdIndex.put(tx, command.getMessageId(), previous);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateIndex(Transaction tx, KahaRemoveMessageCommand command) throws IOException {
|
||||||
|
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
|
||||||
|
if (!command.hasSubscriptionKey()) {
|
||||||
|
|
||||||
|
// In the queue case we just remove the message from the index..
|
||||||
|
Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
|
||||||
|
if (sequenceId != null) {
|
||||||
|
sd.orderIndex.remove(tx, sequenceId);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// In the topic case we need remove the message once it's been acked
|
||||||
|
// by all the subs
|
||||||
|
Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
|
||||||
|
|
||||||
|
// Make sure it's a valid message id...
|
||||||
|
if (sequence != null) {
|
||||||
|
String subscriptionKey = command.getSubscriptionKey();
|
||||||
|
Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
|
||||||
|
|
||||||
|
// The following method handles deleting un-referenced messages.
|
||||||
|
removeAckByteSequence(tx, sd, subscriptionKey, prev);
|
||||||
|
|
||||||
|
// Add it to the new location set.
|
||||||
|
addAckByteSequence(sd, sequence, subscriptionKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command) throws IOException {
|
||||||
|
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
|
||||||
|
sd.orderIndex.clear(tx);
|
||||||
|
sd.orderIndex.unload(tx);
|
||||||
|
tx.free(sd.orderIndex.getPageId());
|
||||||
|
|
||||||
|
sd.messageIdIndex.clear(tx);
|
||||||
|
sd.messageIdIndex.unload(tx);
|
||||||
|
tx.free(sd.messageIdIndex.getPageId());
|
||||||
|
|
||||||
|
if (sd.subscriptions != null) {
|
||||||
|
sd.subscriptions.clear(tx);
|
||||||
|
sd.subscriptions.unload(tx);
|
||||||
|
tx.free(sd.subscriptions.getPageId());
|
||||||
|
|
||||||
|
sd.subscriptionAcks.clear(tx);
|
||||||
|
sd.subscriptionAcks.unload(tx);
|
||||||
|
tx.free(sd.subscriptionAcks.getPageId());
|
||||||
|
}
|
||||||
|
|
||||||
|
String key = key(command.getDestination());
|
||||||
|
storedDestinations.remove(key);
|
||||||
|
destinations.remove(tx, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateIndex(Transaction tx, KahaSubscriptionCommand command) throws IOException {
|
||||||
|
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
|
||||||
|
|
||||||
|
// If set then we are creating it.. otherwise we are destroying the sub
|
||||||
|
if (command.hasSubscriptionInfo()) {
|
||||||
|
String subscriptionKey = command.getSubscriptionKey();
|
||||||
|
sd.subscriptions.put(tx, subscriptionKey, command);
|
||||||
|
long ackByteSequence=-1;
|
||||||
|
if (!command.getRetroactive()) {
|
||||||
|
ackByteSequence = sd.nextMessageId-1;
|
||||||
|
}
|
||||||
|
|
||||||
|
sd.subscriptionAcks.put(tx, subscriptionKey, ackByteSequence);
|
||||||
|
addAckByteSequence(sd, ackByteSequence, subscriptionKey);
|
||||||
|
} else {
|
||||||
|
// delete the sub...
|
||||||
|
String subscriptionKey = command.getSubscriptionKey();
|
||||||
|
sd.subscriptions.remove(tx, subscriptionKey);
|
||||||
|
Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
|
||||||
|
if( prev!=null ) {
|
||||||
|
removeAckByteSequence(tx, sd, subscriptionKey, prev);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public HashSet<Integer> getJournalFilesBeingReplicated() {
|
||||||
|
return journalFilesBeingReplicated;
|
||||||
|
}
|
||||||
|
|
||||||
|
// /////////////////////////////////////////////////////////////////
|
||||||
|
// StoredDestination related implementation methods.
|
||||||
|
// /////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
|
||||||
|
private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
|
||||||
|
|
||||||
|
class StoredSubscription {
|
||||||
|
SubscriptionInfo subscriptionInfo;
|
||||||
|
String lastAckId;
|
||||||
|
ByteSequence lastAckByteSequence;
|
||||||
|
ByteSequence cursor;
|
||||||
|
}
|
||||||
|
|
||||||
|
static class MessageRecord {
|
||||||
|
final String messageId;
|
||||||
|
final ByteSequence data;
|
||||||
|
|
||||||
|
public MessageRecord(String messageId, ByteSequence location) {
|
||||||
|
this.messageId=messageId;
|
||||||
|
this.data=location;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "["+messageId+","+data+"]";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static protected class MessageKeysMarshaller implements Marshaller<MessageRecord> {
|
||||||
|
static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
|
||||||
|
|
||||||
|
public Class<MessageRecord> getType() {
|
||||||
|
return MessageRecord.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageRecord readPayload(DataInput dataIn) throws IOException {
|
||||||
|
return new MessageRecord(dataIn.readUTF(), ByteSequenceMarshaller.INSTANCE.readPayload(dataIn));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void writePayload(MessageRecord object, DataOutput dataOut) throws IOException {
|
||||||
|
dataOut.writeUTF(object.messageId);
|
||||||
|
ByteSequenceMarshaller.INSTANCE.writePayload(object.data, dataOut);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class StoredDestination {
|
||||||
|
long nextMessageId;
|
||||||
|
BTreeIndex<Long, MessageRecord> orderIndex;
|
||||||
|
BTreeIndex<String, Long> messageIdIndex;
|
||||||
|
|
||||||
|
// These bits are only set for Topics
|
||||||
|
BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
|
||||||
|
BTreeIndex<String, Long> subscriptionAcks;
|
||||||
|
HashMap<String, Long> subscriptionCursors;
|
||||||
|
TreeMap<Long, HashSet<String>> ackPositions;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected class StoredDestinationMarshaller implements Marshaller<StoredDestination> {
|
||||||
|
public Class<StoredDestination> getType() {
|
||||||
|
return StoredDestination.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
public StoredDestination readPayload(DataInput dataIn) throws IOException {
|
||||||
|
StoredDestination value = new StoredDestination();
|
||||||
|
value.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, dataIn.readLong());
|
||||||
|
value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
|
||||||
|
|
||||||
|
if (dataIn.readBoolean()) {
|
||||||
|
value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
|
||||||
|
value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
|
||||||
|
dataOut.writeLong(value.orderIndex.getPageId());
|
||||||
|
dataOut.writeLong(value.messageIdIndex.getPageId());
|
||||||
|
if (value.subscriptions != null) {
|
||||||
|
dataOut.writeBoolean(true);
|
||||||
|
dataOut.writeLong(value.subscriptions.getPageId());
|
||||||
|
dataOut.writeLong(value.subscriptionAcks.getPageId());
|
||||||
|
} else {
|
||||||
|
dataOut.writeBoolean(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class ByteSequenceMarshaller implements Marshaller<ByteSequence> {
|
||||||
|
final static ByteSequenceMarshaller INSTANCE = new ByteSequenceMarshaller();
|
||||||
|
|
||||||
|
public Class<ByteSequence> getType() {
|
||||||
|
return ByteSequence.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ByteSequence readPayload(DataInput dataIn) throws IOException {
|
||||||
|
byte data[] = new byte[dataIn.readInt()];
|
||||||
|
dataIn.readFully(data);
|
||||||
|
return new ByteSequence(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void writePayload(ByteSequence object, DataOutput dataOut) throws IOException {
|
||||||
|
dataOut.writeInt(object.getLength());
|
||||||
|
dataOut.write(object.getData(), object.getOffset(), object.getLength());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class KahaSubscriptionCommandMarshaller implements Marshaller<KahaSubscriptionCommand> {
|
||||||
|
final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
|
||||||
|
|
||||||
|
public Class<KahaSubscriptionCommand> getType() {
|
||||||
|
return KahaSubscriptionCommand.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
|
||||||
|
KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
|
||||||
|
rc.mergeFramed((InputStream)dataIn);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
|
||||||
|
object.writeFramed((OutputStream)dataOut);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
|
||||||
|
String key = key(destination);
|
||||||
|
StoredDestination rc = storedDestinations.get(key);
|
||||||
|
if (rc == null) {
|
||||||
|
boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
|
||||||
|
rc = loadStoredDestination(tx, key, topic);
|
||||||
|
// Cache it. We may want to remove/unload destinations from the
|
||||||
|
// cache that are not used for a while
|
||||||
|
// to reduce memory usage.
|
||||||
|
storedDestinations.put(key, rc);
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param tx
|
||||||
|
* @param key
|
||||||
|
* @param topic
|
||||||
|
* @return
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
|
||||||
|
// Try to load the existing indexes..
|
||||||
|
StoredDestination rc = destinations.get(tx, key);
|
||||||
|
if (rc == null) {
|
||||||
|
// Brand new destination.. allocate indexes for it.
|
||||||
|
rc = new StoredDestination();
|
||||||
|
rc.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, tx.allocate());
|
||||||
|
rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
|
||||||
|
|
||||||
|
if (topic) {
|
||||||
|
rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
|
||||||
|
rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
|
||||||
|
}
|
||||||
|
destinations.put(tx, key, rc);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Configure the marshalers and load.
|
||||||
|
rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||||
|
rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
|
||||||
|
rc.orderIndex.load(tx);
|
||||||
|
|
||||||
|
// Figure out the next key using the last entry in the destination.
|
||||||
|
Entry<Long, MessageRecord> lastEntry = rc.orderIndex.getLast(tx);
|
||||||
|
if( lastEntry!=null ) {
|
||||||
|
rc.nextMessageId = lastEntry.getKey()+1;
|
||||||
|
}
|
||||||
|
|
||||||
|
rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
|
||||||
|
rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
|
||||||
|
rc.messageIdIndex.load(tx);
|
||||||
|
|
||||||
|
// If it was a topic...
|
||||||
|
if (topic) {
|
||||||
|
|
||||||
|
rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
|
||||||
|
rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
|
||||||
|
rc.subscriptions.load(tx);
|
||||||
|
|
||||||
|
rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
|
||||||
|
rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
|
||||||
|
rc.subscriptionAcks.load(tx);
|
||||||
|
|
||||||
|
rc.ackPositions = new TreeMap<Long, HashSet<String>>();
|
||||||
|
rc.subscriptionCursors = new HashMap<String, Long>();
|
||||||
|
|
||||||
|
for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
|
||||||
|
Entry<String, Long> entry = iterator.next();
|
||||||
|
addAckByteSequence(rc, entry.getValue(), entry.getKey());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param sd
|
||||||
|
* @param messageSequence
|
||||||
|
* @param subscriptionKey
|
||||||
|
*/
|
||||||
|
private void addAckByteSequence(StoredDestination sd, Long messageSequence, String subscriptionKey) {
|
||||||
|
HashSet<String> hs = sd.ackPositions.get(messageSequence);
|
||||||
|
if (hs == null) {
|
||||||
|
hs = new HashSet<String>();
|
||||||
|
sd.ackPositions.put(messageSequence, hs);
|
||||||
|
}
|
||||||
|
hs.add(subscriptionKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param tx
|
||||||
|
* @param sd
|
||||||
|
* @param subscriptionKey
|
||||||
|
* @param sequenceId
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void removeAckByteSequence(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
|
||||||
|
// Remove the sub from the previous location set..
|
||||||
|
if (sequenceId != null) {
|
||||||
|
HashSet<String> hs = sd.ackPositions.get(sequenceId);
|
||||||
|
if (hs != null) {
|
||||||
|
hs.remove(subscriptionKey);
|
||||||
|
if (hs.isEmpty()) {
|
||||||
|
HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
|
||||||
|
sd.ackPositions.remove(sequenceId);
|
||||||
|
|
||||||
|
// Did we just empty out the first set in the
|
||||||
|
// ordered list of ack locations? Then it's time to
|
||||||
|
// delete some messages.
|
||||||
|
if (hs == firstSet) {
|
||||||
|
|
||||||
|
// Find all the entries that need to get deleted.
|
||||||
|
ArrayList<Entry<Long, MessageRecord>> deletes = new ArrayList<Entry<Long, MessageRecord>>();
|
||||||
|
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
|
||||||
|
Entry<Long, MessageRecord> entry = iterator.next();
|
||||||
|
if (entry.getKey().compareTo(sequenceId) <= 0) {
|
||||||
|
// We don't do the actually delete while we are
|
||||||
|
// iterating the BTree since
|
||||||
|
// iterating would fail.
|
||||||
|
deletes.add(entry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do the actual deletes.
|
||||||
|
for (Entry<Long, MessageRecord> entry : deletes) {
|
||||||
|
sd.messageIdIndex.remove(tx,entry.getValue().messageId);
|
||||||
|
sd.orderIndex.remove(tx,entry.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String key(KahaDestination destination) {
|
||||||
|
return destination.getType().getNumber() + ":" + destination.getName();
|
||||||
|
}
|
||||||
|
|
||||||
|
// /////////////////////////////////////////////////////////////////
|
||||||
|
// Transaction related implementation methods.
|
||||||
|
// /////////////////////////////////////////////////////////////////
|
||||||
|
protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
|
||||||
|
protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
|
||||||
|
|
||||||
|
private ArrayList<Operation> getInflightTx(TransactionId key) {
|
||||||
|
ArrayList<Operation> tx = inflightTransactions.get(key);
|
||||||
|
if (tx == null) {
|
||||||
|
tx = new ArrayList<Operation>();
|
||||||
|
inflightTransactions.put(key, tx);
|
||||||
|
}
|
||||||
|
return tx;
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract class Operation {
|
||||||
|
abstract public void execute(Transaction tx) throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
class AddOpperation extends Operation {
|
||||||
|
final KahaAddMessageCommand command;
|
||||||
|
private final ByteSequence data;
|
||||||
|
|
||||||
|
public AddOpperation(KahaAddMessageCommand command, ByteSequence location) {
|
||||||
|
this.command = command;
|
||||||
|
this.data = location;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void execute(Transaction tx) throws IOException {
|
||||||
|
upadateIndex(tx, command, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
public KahaAddMessageCommand getCommand() {
|
||||||
|
return command;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class RemoveOpperation extends Operation {
|
||||||
|
final KahaRemoveMessageCommand command;
|
||||||
|
|
||||||
|
public RemoveOpperation(KahaRemoveMessageCommand command) {
|
||||||
|
this.command = command;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void execute(Transaction tx) throws IOException {
|
||||||
|
updateIndex(tx, command);
|
||||||
|
}
|
||||||
|
|
||||||
|
public KahaRemoveMessageCommand getCommand() {
|
||||||
|
return command;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// /////////////////////////////////////////////////////////////////
|
||||||
|
// Initialization related implementation methods.
|
||||||
|
// /////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
private PageFile createPageFile() {
|
||||||
|
PageFile index = new PageFile(directory, "temp-db");
|
||||||
|
index.setEnableWriteThread(isEnableIndexWriteAsync());
|
||||||
|
index.setWriteBatchSize(getIndexWriteBatchSize());
|
||||||
|
index.setEnableDiskSyncs(false);
|
||||||
|
index.setEnableRecoveryFile(false);
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
|
||||||
|
public File getDirectory() {
|
||||||
|
return directory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDirectory(File directory) {
|
||||||
|
this.directory = directory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
|
||||||
|
this.setIndexWriteBatchSize = setIndexWriteBatchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getIndexWriteBatchSize() {
|
||||||
|
return setIndexWriteBatchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
|
||||||
|
this.enableIndexWriteAsync = enableIndexWriteAsync;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isEnableIndexWriteAsync() {
|
||||||
|
return enableIndexWriteAsync;
|
||||||
|
}
|
||||||
|
|
||||||
|
public PageFile getPageFile() {
|
||||||
|
if (pageFile == null) {
|
||||||
|
pageFile = createPageFile();
|
||||||
|
}
|
||||||
|
return pageFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
|
import junit.framework.Test;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.BrokerTest;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Once the wire format is completed we can test against real persistence storage.
|
||||||
|
*
|
||||||
|
* @version $Revision: 712224 $
|
||||||
|
*/
|
||||||
|
public class TempKahaDBStoreBrokerTest extends BrokerTest {
|
||||||
|
|
||||||
|
protected BrokerService createBroker() throws Exception {
|
||||||
|
BrokerService broker = new BrokerService();
|
||||||
|
KahaDBStore kaha = new KahaDBStore();
|
||||||
|
kaha.setDirectory(new File("target/activemq-data/kahadb"));
|
||||||
|
kaha.deleteAllMessages();
|
||||||
|
broker.setPersistenceAdapter(kaha);
|
||||||
|
return broker;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerService createRestartedBroker() throws Exception {
|
||||||
|
BrokerService broker = new BrokerService();
|
||||||
|
TempKahaDBStore kaha = new TempKahaDBStore();
|
||||||
|
kaha.setDirectory(new File("target/activemq-data/kahadb"));
|
||||||
|
broker.setPersistenceAdapter(kaha);
|
||||||
|
return broker;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static Test suite() {
|
||||||
|
return suite(TempKahaDBStoreBrokerTest.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
junit.textui.TestRunner.run(suite());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.kahadb.perf;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.perf.SimpleQueueTest;
|
||||||
|
import org.apache.activemq.store.kahadb.TempKahaDBStore;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @version $Revision: 712224 $
|
||||||
|
*/
|
||||||
|
public class TempKahaStoreQueueTest extends SimpleQueueTest {
|
||||||
|
|
||||||
|
protected void configureBroker(BrokerService answer,String uri) throws Exception {
|
||||||
|
File dataFileDir = new File("target/test-amq-data/perfTest/temp-amqdb");
|
||||||
|
dataFileDir.mkdirs();
|
||||||
|
answer.setDeleteAllMessagesOnStartup(true);
|
||||||
|
|
||||||
|
TempKahaDBStore adaptor = new TempKahaDBStore();
|
||||||
|
adaptor.setDirectory(dataFileDir);
|
||||||
|
|
||||||
|
|
||||||
|
answer.setDataDirectoryFile(dataFileDir);
|
||||||
|
answer.setPersistenceAdapter(adaptor);
|
||||||
|
answer.addConnector(uri);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue