- Big refactor of the QuickJournal:

- Move it to it's own package org.apache.activemq.store.quick
  - Brought in all the latest JournalPersistenceAdaptor enhancements
  - It now uses the AsyncDataManager as the Journal implemenation which has better read performance
  - Instead of forcing all PersistenceAdaptors to support external references, we now move all the message reference methods to a new set of interface class (MesageReferenceAdaptor)
  - Enhanced a few Kaha container classes so that they take advantage of Generics
  - Added a Kaha based MesageReferenceAdaptor impementation
  - Strategy for deleting old journal log files is now in place so that disk space can be reclaimed.


git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@492380 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-01-04 01:48:20 +00:00
parent 0afb7f934f
commit 481fc1ee3a
20 changed files with 2447 additions and 14 deletions

View File

@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.util.ByteSequence;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -89,12 +90,14 @@ public final class AsyncDataManager {
protected final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
private Runnable cleanupTask;
@SuppressWarnings("unchecked")
public synchronized void start() throws IOException {
if( started ) {
return;
}
started=true;
directory.mkdirs();
@ -158,6 +161,12 @@ public final class AsyncDataManager {
}
storeState(false);
cleanupTask = new Runnable(){
public void run() {
cleanup();
}};
Scheduler.executePeriodically(cleanupTask, 1000*30);
}
private Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
@ -257,14 +266,24 @@ public final class AsyncDataManager {
}
public synchronized void close() throws IOException{
if( !started ) {
return;
}
Scheduler.cancel(cleanupTask);
accessorPool.close();
storeState(false);
appender.close();
fileMap.clear();
controlFile.unlock();
controlFile.dispose();
started=false;
}
private synchronized void cleanup() {
if( accessorPool!=null ) {
accessorPool.disposeUnused();
}
}
public synchronized boolean delete() throws IOException{
// Close all open file handles...
@ -362,6 +381,7 @@ public final class AsyncDataManager {
private void removeDataFile(DataFile dataFile) throws IOException{
fileMap.remove(dataFile.getDataFileId());
dataFile.unlink();
accessorPool.disposeDataFileAccessors(dataFile);
boolean result=dataFile.delete();
log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
}

View File

@ -102,6 +102,21 @@ public class DataFileAccessorPool {
}
}
synchronized void disposeDataFileAccessors(DataFile dataFile) throws IOException {
if( closed ) {
throw new IOException("Closed.");
}
Pool pool = pools.get(dataFile.getDataFileId());
if( pool != null ) {
if( !pool.isUsed() ) {
pool.dispose();
pools.remove(dataFile.getDataFileId());
} else {
throw new IOException("The data file is still in use: "+dataFile);
}
}
}
synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
if( closed ) {
throw new IOException("Closed.");

View File

@ -0,0 +1,70 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import java.io.IOException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.MessageId;
/**
* Represents a message store which is used by the persistent
* implementations
*
* @version $Revision: 1.5 $
*/
public interface ReferenceStore extends MessageStore {
public class ReferenceData {
long expiration;
int fileId;
int offset;
public long getExpiration() {
return expiration;
}
public void setExpiration(long expiration) {
this.expiration = expiration;
}
public int getFileId() {
return fileId;
}
public void setFileId(int file) {
this.fileId = file;
}
public int getOffset() {
return offset;
}
public void setOffset(int offset) {
this.offset = offset;
}
}
/**
* Adds a message reference to the message store
*/
public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException;
/**
* Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill
* in the missing key if its easy to do so.
*/
public ReferenceData getMessageReference(MessageId identity) throws IOException;
}

View File

@ -0,0 +1,45 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import java.io.IOException;
import java.util.Set;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
/**
* Adapter to the actual persistence mechanism used with ActiveMQ
*
* @version $Revision: 1.3 $
*/
public interface ReferenceStoreAdapter extends PersistenceAdapter {
/**
* Factory method to create a new queue message store with the given destination name
*/
public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException;
/**
* Factory method to create a new topic message store with the given destination name
*/
public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException;
public Set<Integer> getReferenceFileIdsInUse() throws IOException;
}

View File

@ -0,0 +1,137 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
/**
* A MessageStore for durable topic subscriptions
*
* @version $Revision: 1.4 $
*/
public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
/**
* Stores the last acknowledged messgeID for the given subscription so that we can recover and commence dispatching
* messages from the last checkpoint
*
* @param context
* @param clientId
* @param subscriptionName
* @param messageId
* @param subscriptionPersistentId
* @throws IOException
*/
public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId)
throws IOException;
/**
* @param clientId
* @param subscriptionName
* @param sub
* @throws IOException
* @throws JMSException
*/
public void deleteSubscription(String clientId,String subscriptionName) throws IOException;
/**
* For the new subscription find the last acknowledged message ID and then find any new messages since then and
* dispatch them to the subscription. <p/> e.g. if we dispatched some messages to a new durable topic subscriber,
* then went down before acknowledging any messages, we need to know the correct point from which to recover from.
*
* @param clientId
* @param subscriptionName
* @param listener
* @param subscription
*
* @throws Exception
*/
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception;
/**
* For an active subscription - retrieve messages from the store for the subscriber after the lastMessageId
* messageId <p/>
*
* @param clientId
* @param subscriptionName
* @param maxReturned
* @param listener
*
* @throws Exception
*/
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
MessageRecoveryListener listener) throws Exception;
/**
* A hint to the Store to reset any batching state for a durable subsriber
* @param clientId
* @param subscriptionName
*
*/
public void resetBatching(String clientId,String subscriptionName);
/**
* Get the number of messages ready to deliver from the store to a durable subscriber
* @param clientId
* @param subscriberName
* @return the outstanding message count
* @throws IOException
*/
public int getMessageCount(String clientId,String subscriberName) throws IOException;
/**
* Finds the subscriber entry for the given consumer info
*
* @param clientId
* @param subscriptionName
* @return the SubscriptionInfo
* @throws IOException
*/
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException;
/**
* Lists all the durable subscirptions for a given destination.
*
* @return an array SubscriptionInfos
* @throws IOException
*/
public SubscriptionInfo[] getAllSubscriptions() throws IOException;
/**
* Inserts the subscriber info due to a subscription change <p/> If this is a new subscription and the retroactive
* is false, then the last message sent to the topic should be set as the last message acknowledged by they new
* subscription. Otherwise, if retroactive is true, then create the subscription without it having an acknowledged
* message so that on recovery, all message recorded for the topic get replayed.
*
* @param clientId
* @param subscriptionName
* @param selector
* @param retroactive
* @throws IOException
*
*/
public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
throws IOException;
}

View File

@ -0,0 +1,40 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
/**
* Marshall an Integer
* @version $Revision: 1.10 $
*/
public class IntegerMarshaller implements Marshaller<Integer> {
public void writePayload(Integer object,DataOutput dataOut) throws IOException{
dataOut.writeInt(object.intValue());
}
public Integer readPayload(DataInput dataIn) throws IOException{
return dataIn.readInt();
}
}

View File

@ -114,8 +114,8 @@ public class KahaMessageStore implements MessageStore, UsageListener{
messageContainer.remove(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
Message msg=(Message)messageContainer.get(entry);
if(msg.getMessageId().equals(msgId)){
MessageId id=getMessageId(messageContainer.get(entry));
if(id.equals(msgId)){
messageContainer.remove(entry);
break;
}

View File

@ -176,7 +176,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
Store store=getStore();
MapContainer<String, Object> container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(new StringMarshaller());
container.setValueMarshaller(createMessageMarshaller());
container.setValueMarshaller(new CommandMarshaller(wireFormat));
container.load();
return container;
}

View File

@ -0,0 +1,100 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter.ReferenceRecord;
public class KahaReferenceStore extends KahaMessageStore implements ReferenceStore {
private final MapContainer<Integer, Integer> fileReferences;
public KahaReferenceStore(ListContainer container, ActiveMQDestination destination, int maximumCacheSize, MapContainer<Integer, Integer> fileReferences) throws IOException {
super(container, destination, maximumCacheSize);
this.fileReferences = fileReferences;
}
@Override
protected MessageId getMessageId(Object object) {
return new MessageId(((ReferenceRecord)object).messageId);
}
@Override
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
@Override
public synchronized Message getMessage(MessageId identity) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
@Override
protected void recover(MessageRecoveryListener listener, Object msg) throws Exception {
ReferenceRecord record = (ReferenceRecord) msg;
listener.recoverMessageReference(new MessageId(record.messageId));
}
public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
StoreEntry item=messageContainer.placeLast(record);
cache.put(messageId,item);
}
public ReferenceData getMessageReference(MessageId identity) throws IOException {
ReferenceRecord result=null;
StoreEntry entry=cache.get(identity);
if(entry!=null){
entry = messageContainer.refresh(entry);
result = (ReferenceRecord)messageContainer.get(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
if(msg.messageId.equals(identity)){
result=msg;
cache.put(identity,entry);
break;
}
}
}
if( result == null )
return null;
return result.data;
}
public void addReferenceFileIdsInUse(Set<Integer> rc) {
for (StoreEntry entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
rc.add(msg.data.getFileId());
}
}
}

View File

@ -0,0 +1,153 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
private MapContainer<Integer, Integer> fileReferences;
public KahaReferenceStoreAdapter(File dir) throws IOException {
super(dir);
}
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
throw new RuntimeException("Use createQueueReferenceStore instead");
}
public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
throw new RuntimeException("Use createTopicReferenceStore instead");
}
@Override
public void start() throws Exception {
super.start();
Store store=getStore();
fileReferences=store.getMapContainer("file-references");
fileReferences.setKeyMarshaller(new IntegerMarshaller());
fileReferences.setValueMarshaller(new IntegerMarshaller());
fileReferences.load();
}
public static class ReferenceRecord {
public String messageId;
public ReferenceData data;
public ReferenceRecord() {
}
public ReferenceRecord(String messageId, ReferenceData data) {
this.messageId = messageId;
this.data = data;
}
}
protected Marshaller<Object> createMessageMarshaller() {
return new Marshaller<Object>() {
public void writePayload(Object object,DataOutput dataOut) throws IOException{
ReferenceRecord rr = (ReferenceRecord) object;
dataOut.writeUTF(rr.messageId);
dataOut.writeInt(rr.data.getFileId());
dataOut.writeInt(rr.data.getOffset());
dataOut.writeLong(rr.data.getExpiration());
}
public Object readPayload(DataInput dataIn) throws IOException{
ReferenceRecord rr = new ReferenceRecord();
rr.messageId = dataIn.readUTF();
rr.data = new ReferenceData();
rr.data.setFileId(dataIn.readInt());
rr.data.setOffset(dataIn.readInt());
rr.data.setExpiration(dataIn.readLong());
return rr;
}
};
}
public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
ReferenceStore rc=(ReferenceStore)queues.get(destination);
if(rc==null){
rc=new KahaReferenceStore(getListContainer(destination,"queue-data"),destination,maximumDestinationCacheSize, fileReferences);
messageStores.put(destination,rc);
// if(transactionStore!=null){
// rc=transactionStore.proxy(rc);
// }
queues.put(destination,rc);
}
return rc;
}
public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination);
if(rc==null){
Store store=getStore();
ListContainer messageContainer=getListContainer(destination,"topic-data");
MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
ackContainer.setMarshaller(new TopicSubAckMarshaller());
rc=new KahaTopicReferenceStore(store,messageContainer,ackContainer,subsContainer,destination,maximumDestinationCacheSize, fileReferences);
messageStores.put(destination,rc);
// if(transactionStore!=null){
// rc=transactionStore.proxy(rc);
// }
topics.put(destination,rc);
}
return rc;
}
public Set<Integer> getReferenceFileIdsInUse() throws IOException {
Set<Integer> rc = new HashSet<Integer>();
Set<ActiveMQDestination> destinations = getDestinations();
for (ActiveMQDestination destination : destinations) {
if( destination.isQueue() ) {
KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination);
store.addReferenceFileIdsInUse(rc);
} else {
KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination);
store.addReferenceFileIdsInUse(rc);
}
}
return rc;
}
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter.ReferenceRecord;
/**
* @version $Revision: 1.5 $
@ -54,6 +55,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
}
}
@Override
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
int subscriberCount=subscriberMessages.size();
if(subscriberCount>0){

View File

@ -0,0 +1,121 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter.ReferenceRecord;
public class KahaTopicReferenceStore extends KahaTopicMessageStore implements TopicReferenceStore {
private final MapContainer<Integer, Integer> fileReferences;
public KahaTopicReferenceStore(Store store, ListContainer messageContainer, ListContainer ackContainer, MapContainer subsContainer, ActiveMQDestination destination, int maximumCacheSize, MapContainer<Integer, Integer> fileReferences) throws IOException {
super(store, messageContainer, ackContainer, subsContainer, destination, maximumCacheSize);
this.fileReferences = fileReferences;
}
@Override
protected MessageId getMessageId(Object object) {
return new MessageId(((ReferenceRecord)object).messageId);
}
@Override
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
@Override
public synchronized Message getMessage(MessageId identity) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
@Override
protected void recover(MessageRecoveryListener listener, Object msg) throws Exception {
ReferenceRecord record = (ReferenceRecord) msg;
listener.recoverMessageReference(new MessageId(record.messageId));
}
public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
int subscriberCount=subscriberMessages.size();
if(subscriberCount>0){
StoreEntry messageEntry=messageContainer.placeLast(record);
TopicSubAck tsa=new TopicSubAck();
tsa.setCount(subscriberCount);
tsa.setMessageEntry(messageEntry);
StoreEntry ackEntry=ackContainer.placeLast(tsa);
for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
TopicSubContainer container=(TopicSubContainer)i.next();
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(ackEntry);
ref.setMessageEntry(messageEntry);
container.add(ref);
}
}
}
public ReferenceData getMessageReference(MessageId identity) throws IOException {
ReferenceRecord result=null;
StoreEntry entry=(StoreEntry)cache.get(identity);
if(entry!=null){
entry = messageContainer.refresh(entry);
result = (ReferenceRecord)messageContainer.get(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
if(msg.messageId.equals(identity)){
result=msg;
cache.put(identity,entry);
break;
}
}
}
if( result == null )
return null;
return result.data;
}
public void addReferenceFileIdsInUse(Set<Integer> rc) {
for (StoreEntry entry = ackContainer.getFirst();entry != null; entry = ackContainer.getNext(entry)) {
TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
if( subAck.getCount() > 0 ) {
ReferenceRecord rr = (ReferenceRecord)messageContainer.get(subAck.getMessageEntry());
rc.add(rr.data.getFileId());
}
}
}
}

View File

@ -0,0 +1,411 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.quick;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A MessageStore that uses a Journal to store it's messages.
*
* @version $Revision: 1.14 $
*/
public class QuickMessageStore implements MessageStore {
private static final Log log = LogFactory.getLog(QuickMessageStore.class);
protected final QuickPersistenceAdapter peristenceAdapter;
protected final QuickTransactionStore transactionStore;
protected final ReferenceStore referenceStore;
protected final ActiveMQDestination destination;
protected final TransactionTemplate transactionTemplate;
private LinkedHashMap<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
private ArrayList<MessageAck> messageAcks = new ArrayList<MessageAck>();
/** A MessageStore that we can use to retrieve messages quickly. */
private LinkedHashMap<MessageId, ReferenceData> cpAddedMessageIds;
protected Location lastLocation;
protected HashSet<Location> inFlightTxLocations = new HashSet<Location>();
public QuickMessageStore(QuickPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore();
this.referenceStore = referenceStore;
this.destination = destination;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
}
public void setUsageManager(UsageManager usageManager) {
referenceStore.setUsageManager(usageManager);
}
/**
* Not synchronized since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing.
*/
public void addMessage(ConnectionContext context, final Message message) throws IOException {
final MessageId id = message.getMessageId();
final boolean debug = log.isDebugEnabled();
final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
if( !context.isInTransaction() ) {
if( debug )
log.debug("Journalled message add for: "+id+", at: "+location);
addMessage(message, location);
} else {
if( debug )
log.debug("Journalled transacted message add for: "+id+", at: "+location);
synchronized( this ) {
inFlightTxLocations.add(location);
}
transactionStore.addMessage(this, message, location);
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception {
if( debug )
log.debug("Transacted message add commit for: "+id+", at: "+location);
synchronized( QuickMessageStore.this ) {
inFlightTxLocations.remove(location);
addMessage(message, location);
}
}
public void afterRollback() throws Exception {
if( debug )
log.debug("Transacted message add rollback for: "+id+", at: "+location);
synchronized( QuickMessageStore.this ) {
inFlightTxLocations.remove(location);
}
}
});
}
}
private void addMessage(final Message message, final Location location) {
ReferenceData data = new ReferenceData();
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
synchronized (this) {
lastLocation = location;
messages.put(message.getMessageId(), data);
}
}
public void replayAddMessage(ConnectionContext context, Message message, Location location) {
MessageId id = message.getMessageId();
try {
// Only add the message if it has not already been added.
ReferenceData data = referenceStore.getMessageReference(id);
if( data==null ) {
data = new ReferenceData();
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
referenceStore.addMessageReference(context, id, data);
}
}
catch (Throwable e) {
log.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: " + e,e);
}
}
/**
*/
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
final boolean debug = log.isDebugEnabled();
JournalQueueAck remove = new JournalQueueAck();
remove.setDestination(destination);
remove.setMessageAck(ack);
final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
if( !context.isInTransaction() ) {
if( debug )
log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
removeMessage(ack, location);
} else {
if( debug )
log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
synchronized( this ) {
inFlightTxLocations.add(location);
}
transactionStore.removeMessage(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception {
if( debug )
log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
synchronized( QuickMessageStore.this ) {
inFlightTxLocations.remove(location);
removeMessage(ack, location);
}
}
public void afterRollback() throws Exception {
if( debug )
log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
synchronized( QuickMessageStore.this ) {
inFlightTxLocations.remove(location);
}
}
});
}
}
private void removeMessage(final MessageAck ack, final Location location) {
synchronized (this) {
lastLocation = location;
MessageId id = ack.getLastMessageId();
ReferenceData data = messages.remove(id);
if (data == null) {
messageAcks.add(ack);
}
}
}
public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
try {
// Only remove the message if it has not already been removed.
ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId());
if( t!=null ) {
referenceStore.removeMessage(context, messageAck);
}
}
catch (Throwable e) {
log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
}
}
/**
* @return
* @throws IOException
*/
public Location checkpoint() throws IOException {
return checkpoint(null);
}
/**
* @return
* @throws IOException
*/
public Location checkpoint(final Callback postCheckpointTest) throws IOException {
final ArrayList<MessageAck> cpRemovedMessageLocations;
final ArrayList<Location> cpActiveJournalLocations;
final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
// swap out the message hash maps..
synchronized (this) {
cpAddedMessageIds = this.messages;
cpRemovedMessageLocations = this.messageAcks;
cpActiveJournalLocations=new ArrayList<Location>(inFlightTxLocations);
this.messages = new LinkedHashMap<MessageId, ReferenceData>();
this.messageAcks = new ArrayList<MessageAck>();
}
transactionTemplate.run(new Callback() {
public void execute() throws Exception {
int size = 0;
PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
ConnectionContext context = transactionTemplate.getContext();
// Checkpoint the added messages.
Iterator<Entry<MessageId, ReferenceData>> iterator = cpAddedMessageIds.entrySet().iterator();
while (iterator.hasNext()) {
Entry<MessageId, ReferenceData> entry = iterator.next();
try {
referenceStore.addMessageReference(context, entry.getKey(), entry.getValue() );
} catch (Throwable e) {
log.warn("Message could not be added to long term store: " + e.getMessage(), e);
}
size ++;
// Commit the batch if it's getting too big
if( size >= maxCheckpointMessageAddSize ) {
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
size=0;
}
}
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
// Checkpoint the removed messages.
for (MessageAck ack : cpRemovedMessageLocations) {
try {
referenceStore.removeMessage(transactionTemplate.getContext(), ack);
} catch (Throwable e) {
log.debug("Message could not be removed from long term store: " + e.getMessage(), e);
}
}
if( postCheckpointTest!= null ) {
postCheckpointTest.execute();
}
}
});
synchronized (this) {
cpAddedMessageIds = null;
}
if( cpActiveJournalLocations.size() > 0 ) {
Collections.sort(cpActiveJournalLocations);
return cpActiveJournalLocations.get(0);
} else {
return lastLocation;
}
}
/**
*
*/
public Message getMessage(MessageId identity) throws IOException {
ReferenceData data=null;
synchronized (this) {
// Is it still in flight???
data = messages.get(identity);
if( data==null && cpAddedMessageIds!=null ) {
data = cpAddedMessageIds.get(identity);
}
}
if( data==null ) {
data = referenceStore.getMessageReference(identity);
}
if( data==null ) {
return null;
}
Message answer = null;
if (answer != null ) {
return answer;
}
Location location = new Location();
location.setDataFileId(data.getFileId());
location.setOffset(data.getOffset());
return (Message) peristenceAdapter.readCommand(location);
}
/**
* Replays the checkpointStore first as those messages are the oldest ones,
* then messages are replayed from the transaction log and then the cache is
* updated.
*
* @param listener
* @throws Exception
*/
public void recover(final MessageRecoveryListener listener) throws Exception {
peristenceAdapter.checkpoint(true);
referenceStore.recover(new RecoveryListenerAdapter(this, listener));
}
public void start() throws Exception {
referenceStore.start();
}
public void stop() throws Exception {
referenceStore.stop();
}
/**
* @return Returns the longTermStore.
*/
public ReferenceStore getReferenceStore() {
return referenceStore;
}
/**
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/
public void removeAllMessages(ConnectionContext context) throws IOException {
peristenceAdapter.checkpoint(true);
referenceStore.removeAllMessages(context);
}
public ActiveMQDestination getDestination() {
return destination;
}
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
throw new IOException("The journal does not support message references.");
}
public String getMessageReference(MessageId identity) throws IOException {
throw new IOException("The journal does not support message references.");
}
/**
* @return
* @throws IOException
* @see org.apache.activemq.store.MessageStore#getMessageCount()
*/
public int getMessageCount() throws IOException{
peristenceAdapter.checkpoint(true);
return referenceStore.getMessageCount();
}
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
peristenceAdapter.checkpoint(true);
referenceStore.recoverNextMessages(maxReturned,new RecoveryListenerAdapter(this, listener));
}
public void resetBatching(){
referenceStore.resetBatching();
}
}

View File

@ -0,0 +1,699 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.quick;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
import org.apache.activemq.store.quick.QuickTransactionStore.Tx;
import org.apache.activemq.store.quick.QuickTransactionStore.TxOperation;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
* other long term persistent storage.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.17 $
*/
public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListener {
private static final Log log = LogFactory.getLog(QuickPersistenceAdapter.class);
private final ConcurrentHashMap<ActiveMQQueue, QuickMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, QuickMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, QuickMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, QuickMessageStore>();
private AsyncDataManager asyncDataManager;
private ReferenceStoreAdapter referenceStoreAdapter;
private TaskRunnerFactory taskRunnerFactory;
private WireFormat wireFormat = new OpenWireFormat();
private UsageManager usageManager;
private long checkpointInterval = 1000 * 30;
private int maxCheckpointWorkers = 1;
private int maxCheckpointMessageAddSize = 1024*4;
private QuickTransactionStore transactionStore = new QuickTransactionStore(this);
private ThreadPoolExecutor checkpointExecutor;
private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
private final AtomicBoolean started = new AtomicBoolean(false);
private Runnable periodicCheckpointTask;
private Runnable periodicCleanupTask;
private boolean deleteAllMessages;
private File directory = new File("activemq-data/quick");
public synchronized void start() throws Exception {
if( !started.compareAndSet(false, true) )
return;
this.usageManager.addUsageListener(this);
if( asyncDataManager == null ) {
asyncDataManager = createAsyncDataManager();
}
if( referenceStoreAdapter==null ) {
referenceStoreAdapter = createReferenceStoreAdapter();
}
referenceStoreAdapter.setUsageManager(usageManager);
if( taskRunnerFactory==null ) {
taskRunnerFactory = createTaskRunnerFactory();
}
asyncDataManager.start();
if( deleteAllMessages ) {
asyncDataManager.delete();
try {
JournalTrace trace = new JournalTrace();
trace.setMessage("DELETED "+new Date());
Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
asyncDataManager.setMark(location, true);
log.info("Journal deleted: ");
deleteAllMessages=false;
} catch (IOException e) {
throw e;
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
referenceStoreAdapter.deleteAllMessages();
}
referenceStoreAdapter.start();
Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
for (Integer fileId : files) {
asyncDataManager.addInterestInFile(fileId);
}
checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
public boolean iterate() {
doCheckpoint();
return false;
}
}, "ActiveMQ Journal Checkpoint Worker");
checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable, "Journal checkpoint worker");
t.setPriority(7);
return t;
}
});
createTransactionStore();
recover();
// Do a checkpoint periodically.
periodicCheckpointTask = new Runnable() {
public void run() {
checkpoint(false);
}
};
Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
periodicCleanupTask = new Runnable() {
public void run() {
cleanup();
}
};
Scheduler.executePeriodically(periodicCleanupTask, checkpointInterval);
}
public void stop() throws Exception {
if( !started.compareAndSet(true, false) )
return;
this.usageManager.removeUsageListener(this);
Scheduler.cancel(periodicCheckpointTask);
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true);
checkpointTask.shutdown();
log.debug("Checkpoint task shutdown");
checkpointExecutor.shutdown();
queues.clear();
topics.clear();
IOException firstException = null;
referenceStoreAdapter.stop();
try {
log.debug("Journal close");
asyncDataManager.close();
} catch (Exception e) {
firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
}
if (firstException != null) {
throw firstException;
}
}
/**
* When we checkpoint we move all the journalled data to long term storage.
* @param stopping
*
* @param b
*/
public void checkpoint(boolean sync) {
try {
if (asyncDataManager == null )
throw new IllegalStateException("Journal is closed.");
CountDownLatch latch = null;
synchronized(this) {
latch = nextCheckpointCountDownLatch;
}
checkpointTask.wakeup();
if (sync) {
log.debug("Waitng for checkpoint to complete.");
latch.await();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Request to start checkpoint failed: " + e, e);
}
}
/**
* This does the actual checkpoint.
* @return
*/
public boolean doCheckpoint() {
CountDownLatch latch = null;
synchronized(this) {
latch = nextCheckpointCountDownLatch;
nextCheckpointCountDownLatch = new CountDownLatch(1);
}
try {
log.debug("Checkpoint started.");
Location newMark = null;
ArrayList<FutureTask> futureTasks = new ArrayList<FutureTask>(queues.size()+topics.size());
//
Iterator<QuickMessageStore> iterator = queues.values().iterator();
while (iterator.hasNext()) {
try {
final QuickMessageStore ms = iterator.next();
FutureTask<Location> task = new FutureTask<Location>(new Callable<Location>() {
public Location call() throws Exception {
return ms.checkpoint();
}});
futureTasks.add(task);
checkpointExecutor.execute(task);
}
catch (Exception e) {
log.error("Failed to checkpoint a message store: " + e, e);
}
}
iterator = topics.values().iterator();
while (iterator.hasNext()) {
try {
final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next();
FutureTask<Location> task = new FutureTask<Location>(new Callable<Location>() {
public Location call() throws Exception {
return ms.checkpoint();
}});
futureTasks.add(task);
checkpointExecutor.execute(task);
}
catch (Exception e) {
log.error("Failed to checkpoint a message store: " + e, e);
}
}
try {
for (Iterator<FutureTask> iter = futureTasks.iterator(); iter.hasNext();) {
FutureTask ft = iter.next();
Location mark = (Location) ft.get();
// We only set a newMark on full checkpoints.
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
newMark = mark;
}
}
} catch (Throwable e) {
log.error("Failed to checkpoint a message store: " + e, e);
}
try {
if (newMark != null) {
log.debug("Marking journal at: " + newMark);
asyncDataManager.setMark(newMark, false);
writeTraceMessage("CHECKPOINT "+new Date(), true);
}
}
catch (Exception e) {
log.error("Failed to mark the Journal: " + e, e);
}
// if (referenceStoreAdapter instanceof JDBCReferenceStoreAdapter) {
// // We may be check pointing more often than the checkpointInterval if under high use
// // But we don't want to clean up the db that often.
// long now = System.currentTimeMillis();
// if( now > lastCleanup+checkpointInterval ) {
// lastCleanup = now;
// ((JDBCReferenceStoreAdapter) referenceStoreAdapter).cleanup();
// }
// }
log.debug("Checkpoint done.");
}
finally {
latch.countDown();
}
return true;
}
/**
* Cleans up the data files
* @return
* @throws IOException
*/
public void cleanup() {
try {
Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
asyncDataManager.consolidateDataFilesNotIn(inUse);
} catch (IOException e) {
log.error("Could not cleanup data files: "+e, e);
}
}
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
destinations.addAll(queues.keySet());
destinations.addAll(topics.keySet());
return destinations;
}
private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
if (destination.isQueue()) {
return createQueueMessageStore((ActiveMQQueue) destination);
}
else {
return createTopicMessageStore((ActiveMQTopic) destination);
}
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
QuickMessageStore store = queues.get(destination);
if (store == null) {
ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
store = new QuickMessageStore(this, checkpointStore, destination);
queues.put(destination, store);
}
return store;
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
QuickTopicMessageStore store = (QuickTopicMessageStore) topics.get(destinationName);
if (store == null) {
TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
store = new QuickTopicMessageStore(this, checkpointStore, destinationName);
topics.put(destinationName, store);
}
return store;
}
public TransactionStore createTransactionStore() throws IOException {
return transactionStore;
}
public long getLastMessageBrokerSequenceId() throws IOException {
return referenceStoreAdapter.getLastMessageBrokerSequenceId();
}
public void beginTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.beginTransaction(context);
}
public void commitTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.commitTransaction(context);
}
public void rollbackTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.rollbackTransaction(context);
}
/**
* @param location
* @return
* @throws IOException
*/
public DataStructure readCommand(Location location) throws IOException {
try {
ByteSequence packet = asyncDataManager.read(location);
return (DataStructure) wireFormat.unmarshal(packet);
} catch (IOException e) {
throw createReadException(location, e);
}
}
/**
* Move all the messages that were in the journal into long term storage. We
* just replay and do a checkpoint.
*
* @throws IOException
* @throws IOException
* @throws InvalidLocationException
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, IOException, IOException {
Location pos = null;
int transactionCounter = 0;
log.info("Journal Recovery Started from: " + asyncDataManager);
ConnectionContext context = new ConnectionContext();
// While we have records in the journal.
while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
ByteSequence data = asyncDataManager.read(pos);
DataStructure c = (DataStructure) wireFormat.unmarshal(data);
if (c instanceof Message ) {
Message message = (Message) c;
QuickMessageStore store = (QuickMessageStore) createMessageStore(message.getDestination());
if ( message.isInTransaction()) {
transactionStore.addMessage(store, message, pos);
}
else {
store.replayAddMessage(context, message, pos);
transactionCounter++;
}
} else {
switch (c.getDataStructureType()) {
case JournalQueueAck.DATA_STRUCTURE_TYPE:
{
JournalQueueAck command = (JournalQueueAck) c;
QuickMessageStore store = (QuickMessageStore) createMessageStore(command.getDestination());
if (command.getMessageAck().isInTransaction()) {
transactionStore.removeMessage(store, command.getMessageAck(), pos);
}
else {
store.replayRemoveMessage(context, command.getMessageAck());
transactionCounter++;
}
}
break;
case JournalTopicAck.DATA_STRUCTURE_TYPE:
{
JournalTopicAck command = (JournalTopicAck) c;
QuickTopicMessageStore store = (QuickTopicMessageStore) createMessageStore(command.getDestination());
if (command.getTransactionId() != null) {
transactionStore.acknowledge(store, command, pos);
}
else {
store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
transactionCounter++;
}
}
break;
case JournalTransaction.DATA_STRUCTURE_TYPE:
{
JournalTransaction command = (JournalTransaction) c;
try {
// Try to replay the packet.
switch (command.getType()) {
case JournalTransaction.XA_PREPARE:
transactionStore.replayPrepare(command.getTransactionId());
break;
case JournalTransaction.XA_COMMIT:
case JournalTransaction.LOCAL_COMMIT:
Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
if (tx == null)
break; // We may be trying to replay a commit that
// was already committed.
// Replay the committed operations.
tx.getOperations();
for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
TxOperation op = (TxOperation) iter.next();
if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
op.store.replayAddMessage(context, (Message)op.data, pos);
}
if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
op.store.replayRemoveMessage(context, (MessageAck) op.data);
}
if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
JournalTopicAck ack = (JournalTopicAck) op.data;
((QuickTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack
.getMessageId());
}
}
transactionCounter++;
break;
case JournalTransaction.LOCAL_ROLLBACK:
case JournalTransaction.XA_ROLLBACK:
transactionStore.replayRollback(command.getTransactionId());
break;
}
}
catch (IOException e) {
log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
}
}
break;
case JournalTrace.DATA_STRUCTURE_TYPE:
JournalTrace trace = (JournalTrace) c;
log.debug("TRACE Entry: " + trace.getMessage());
break;
default:
log.error("Unknown type of record in transaction log which will be discarded: " + c);
}
}
}
Location location = writeTraceMessage("RECOVERED "+new Date(), true);
asyncDataManager.setMark(location, true);
log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
}
private IOException createReadException(Location location, Exception e) {
return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
}
protected IOException createWriteException(DataStructure packet, Exception e) {
return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
}
protected IOException createWriteException(String command, Exception e) {
return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
}
protected IOException createRecoveryFailedException(Exception e) {
return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
}
/**
*
* @param command
* @param sync
* @return
* @throws IOException
*/
public Location writeCommand(DataStructure command, boolean sync) throws IOException {
return asyncDataManager.write(wireFormat.marshal(command), sync);
}
private Location writeTraceMessage(String message, boolean sync) throws IOException {
JournalTrace trace = new JournalTrace();
trace.setMessage(message);
return writeCommand(trace, sync);
}
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
newPercentUsage = ((newPercentUsage)/10)*10;
oldPercentUsage = ((oldPercentUsage)/10)*10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
boolean sync = newPercentUsage >= 90;
checkpoint(sync);
}
}
public QuickTransactionStore getTransactionStore() {
return transactionStore;
}
public void deleteAllMessages() throws IOException {
deleteAllMessages=true;
}
public String toString(){
return "JournalPersistenceAdapator(" + referenceStoreAdapter + ")";
}
///////////////////////////////////////////////////////////////////
// Subclass overridables
///////////////////////////////////////////////////////////////////
protected AsyncDataManager createAsyncDataManager() {
AsyncDataManager manager = new AsyncDataManager();
manager.setDirectory(new File(directory, "journal"));
return manager;
}
protected ReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(directory);
return adaptor;
}
protected TaskRunnerFactory createTaskRunnerFactory() {
return DefaultThreadPools.getDefaultTaskRunnerFactory();
}
///////////////////////////////////////////////////////////////////
// Property Accessors
///////////////////////////////////////////////////////////////////
public AsyncDataManager getAsyncDataManager() {
return asyncDataManager;
}
public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
this.asyncDataManager = asyncDataManager;
}
public ReferenceStoreAdapter getReferenceStoreAdapter() {
return referenceStoreAdapter;
}
public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
this.referenceStoreAdapter = referenceStoreAdapter;
}
public TaskRunnerFactory getTaskRunnerFactory() {
return taskRunnerFactory;
}
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
this.taskRunnerFactory = taskRunnerFactory;
}
/**
* @return Returns the wireFormat.
*/
public WireFormat getWireFormat() {
return wireFormat;
}
public void setWireFormat(WireFormat wireFormat) {
this.wireFormat = wireFormat;
}
public UsageManager getUsageManager() {
return usageManager;
}
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
}
public int getMaxCheckpointMessageAddSize() {
return maxCheckpointMessageAddSize;
}
public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
}
public int getMaxCheckpointWorkers() {
return maxCheckpointWorkers;
}
public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
this.maxCheckpointWorkers = maxCheckpointWorkers;
}
public File getDirectory() {
return directory;
}
public void setDirectory(File directory) {
this.directory = directory;
}
}

View File

@ -0,0 +1,205 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.quick;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A MessageStore that uses a Journal to store it's messages.
*
* @version $Revision: 1.13 $
*/
public class QuickTopicMessageStore extends QuickMessageStore implements TopicMessageStore {
private static final Log log = LogFactory.getLog(QuickTopicMessageStore.class);
private TopicReferenceStore topicReferenceStore;
private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
public QuickTopicMessageStore(QuickPersistenceAdapter adapter, TopicReferenceStore checkpointStore, ActiveMQTopic destinationName) {
super(adapter, checkpointStore, destinationName);
this.topicReferenceStore = checkpointStore;
}
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
this.peristenceAdapter.checkpoint(true);
topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, final MessageRecoveryListener listener) throws Exception{
this.peristenceAdapter.checkpoint(true);
topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, new RecoveryListenerAdapter(this, listener));
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return topicReferenceStore.lookupSubscription(clientId, subscriptionName);
}
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
this.peristenceAdapter.checkpoint(true);
topicReferenceStore.addSubsciption(clientId, subscriptionName, selector, retroactive);
}
public void addMessage(ConnectionContext context, Message message) throws IOException {
super.addMessage(context, message);
}
/**
*/
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
final boolean debug = log.isDebugEnabled();
JournalTopicAck ack = new JournalTopicAck();
ack.setDestination(destination);
ack.setMessageId(messageId);
ack.setMessageSequenceId(messageId.getBrokerSequenceId());
ack.setSubscritionName(subscriptionName);
ack.setClientId(clientId);
ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null);
final Location location = peristenceAdapter.writeCommand(ack, false);
final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
if( !context.isInTransaction() ) {
if( debug )
log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
acknowledge(messageId, location, key);
} else {
if( debug )
log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
synchronized (this) {
inFlightTxLocations.add(location);
}
transactionStore.acknowledge(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception {
if( debug )
log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
synchronized (QuickTopicMessageStore.this) {
inFlightTxLocations.remove(location);
acknowledge(messageId, location, key);
}
}
public void afterRollback() throws Exception {
if( debug )
log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
synchronized (QuickTopicMessageStore.this) {
inFlightTxLocations.remove(location);
}
}
});
}
}
public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
try {
SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
if( sub != null ) {
topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId);
}
}
catch (Throwable e) {
log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
}
}
/**
* @param messageId
* @param location
* @param key
*/
private void acknowledge(MessageId messageId, Location location, SubscriptionKey key) {
synchronized(this) {
lastLocation = location;
ackedLastAckLocations.put(key, messageId);
}
}
public Location checkpoint() throws IOException {
final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
// swap out the hash maps..
synchronized (this) {
cpAckedLastAckLocations = this.ackedLastAckLocations;
this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
}
return super.checkpoint( new Callback() {
public void execute() throws Exception {
// Checkpoint the acknowledged messages.
Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
while (iterator.hasNext()) {
SubscriptionKey subscriptionKey = iterator.next();
MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity);
}
}
});
}
/**
* @return Returns the longTermStore.
*/
public TopicReferenceStore getTopicReferenceStore() {
return topicReferenceStore;
}
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
topicReferenceStore.deleteSubscription(clientId, subscriptionName);
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return topicReferenceStore.getAllSubscriptions();
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
this.peristenceAdapter.checkpoint(true);
return topicReferenceStore.getMessageCount(clientId,subscriberName);
}
public void resetBatching(String clientId,String subscriptionName) {
topicReferenceStore.resetBatching(clientId,subscriptionName);
}
}

View File

@ -0,0 +1,338 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.quick;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.transaction.xa.XAException;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
/**
*/
public class QuickTransactionStore implements TransactionStore {
private final QuickPersistenceAdapter peristenceAdapter;
Map<TransactionId, Tx> inflightTransactions = new LinkedHashMap<TransactionId, Tx>();
Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
private boolean doingRecover;
public static class TxOperation {
static final byte ADD_OPERATION_TYPE = 0;
static final byte REMOVE_OPERATION_TYPE = 1;
static final byte ACK_OPERATION_TYPE = 3;
public byte operationType;
public QuickMessageStore store;
public Object data;
public TxOperation(byte operationType, QuickMessageStore store, Object data) {
this.operationType=operationType;
this.store=store;
this.data=data;
}
}
/**
* Operations
* @version $Revision: 1.6 $
*/
public static class Tx {
private final Location location;
private ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
public Tx(Location location) {
this.location=location;
}
public void add(QuickMessageStore store, Message msg) {
operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
}
public void add(QuickMessageStore store, MessageAck ack) {
operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
}
public void add(QuickTopicMessageStore store, JournalTopicAck ack) {
operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
}
public Message[] getMessages() {
ArrayList<Object> list = new ArrayList<Object>();
for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
TxOperation op = iter.next();
if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
list.add(op.data);
}
}
Message rc[] = new Message[list.size()];
list.toArray(rc);
return rc;
}
public MessageAck[] getAcks() {
ArrayList<Object> list = new ArrayList<Object>();
for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
TxOperation op = iter.next();
if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
list.add(op.data);
}
}
MessageAck rc[] = new MessageAck[list.size()];
list.toArray(rc);
return rc;
}
public ArrayList<TxOperation> getOperations() {
return operations;
}
}
public QuickTransactionStore(QuickPersistenceAdapter adapter) {
this.peristenceAdapter = adapter;
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) throws IOException{
Tx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
if(tx==null)
return;
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
synchronized(preparedTransactions){
preparedTransactions.put(txid,tx);
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void replayPrepare(TransactionId txid) throws IOException{
Tx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
if(tx==null)
return;
synchronized(preparedTransactions){
preparedTransactions.put(txid,tx);
}
}
public Tx getTx(TransactionId txid,Location location){
Tx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.get(txid);
}
if(tx==null){
tx=new Tx(location);
inflightTransactions.put(txid,tx);
}
return tx;
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
Tx tx;
if(wasPrepared){
synchronized(preparedTransactions){
tx=preparedTransactions.remove(txid);
}
}else{
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
}
if(tx==null)
return;
if(txid.isXATransaction()){
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
}else{
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
true);
}
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
if(wasPrepared){
synchronized(preparedTransactions){
return preparedTransactions.remove(txid);
}
}else{
synchronized(inflightTransactions){
return inflightTransactions.remove(txid);
}
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) throws IOException{
Tx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
if(tx!=null)
synchronized(preparedTransactions){
tx=preparedTransactions.remove(txid);
}
if(tx!=null){
if(txid.isXATransaction()){
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
}else{
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
true);
}
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void replayRollback(TransactionId txid) throws IOException{
boolean inflight=false;
synchronized(inflightTransactions){
inflight=inflightTransactions.remove(txid)!=null;
}
if(inflight){
synchronized(preparedTransactions){
preparedTransactions.remove(txid);
}
}
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
// All the in-flight transactions get rolled back..
synchronized(inflightTransactions){
inflightTransactions.clear();
}
this.doingRecover=true;
try{
Map<TransactionId, Tx> txs=null;
synchronized(preparedTransactions){
txs=new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
}
for(Iterator<TransactionId> iter=txs.keySet().iterator();iter.hasNext();){
Object txid=iter.next();
Tx tx=txs.get(txid);
listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
}
}finally{
this.doingRecover=false;
}
}
/**
* @param message
* @throws IOException
*/
void addMessage(QuickMessageStore store, Message message, Location location) throws IOException {
Tx tx = getTx(message.getTransactionId(), location);
tx.add(store, message);
}
/**
* @param ack
* @throws IOException
*/
public void removeMessage(QuickMessageStore store, MessageAck ack, Location location) throws IOException {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack);
}
public void acknowledge(QuickTopicMessageStore store, JournalTopicAck ack, Location location) {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack);
}
public Location checkpoint() throws IOException{
// Nothing really to checkpoint.. since, we don't
// checkpoint tx operations in to long term store until they are committed.
// But we keep track of the first location of an operation
// that was associated with an active tx. The journal can not
// roll over active tx records.
Location rc=null;
synchronized(inflightTransactions){
for(Iterator<Tx> iter=inflightTransactions.values().iterator();iter.hasNext();){
Tx tx=iter.next();
Location location=tx.location;
if(rc==null||rc.compareTo(location)<0){
rc=location;
}
}
}
synchronized(preparedTransactions){
for(Iterator<Tx> iter=preparedTransactions.values().iterator();iter.hasNext();){
Tx tx=iter.next();
Location location=tx.location;
if(rc==null||rc.compareTo(location)<0){
rc=location;
}
}
return rc;
}
}
public boolean isDoingRecover() {
return doingRecover;
}
}

View File

@ -0,0 +1,50 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.quick;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
final class RecoveryListenerAdapter implements MessageRecoveryListener {
private final MessageStore store;
private final MessageRecoveryListener listener;
RecoveryListenerAdapter(MessageStore store, MessageRecoveryListener listener) {
this.store = store;
this.listener = listener;
}
public void finished() {
listener.finished();
}
public boolean hasSpace() {
return listener.hasSpace();
}
public void recoverMessage(Message message) throws Exception {
listener.recoverMessage(message);
}
public void recoverMessageReference(MessageId ref) throws Exception {
listener.recoverMessage( this.store.getMessage(ref) );
}
}

View File

@ -0,0 +1,27 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
<p>
</p>
</body>
</html>

View File

@ -54,14 +54,14 @@ public class QuickJournalRecoveryBrokerTest extends RecoveryBrokerTest {
}
@Override
public void testTopicDurableConsumerHoldsPersistentMessageAfterRestart() throws Exception {
// TODO: this test is currently failing in base class.. overriden to avoid failure
}
@Override
public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception {
// TODO: this test is currently failing in base class.. overriden to avoid failure
}
// @Override
// public void testTopicDurableConsumerHoldsPersistentMessageAfterRestart() throws Exception {
// // TODO: this test is currently failing in base class.. overriden to avoid failure
// }
//
// @Override
// public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception {
// // TODO: this test is currently failing in base class.. overriden to avoid failure
// }
}

View File

@ -18,7 +18,7 @@
#
# The logging properties used during tests..
#
log4j.rootLogger=INFO, out
log4j.rootLogger=DEBUG, stdout
log4j.logger.org.apache.activemq.spring=WARN