mirror of https://github.com/apache/activemq.git
Implementing a Rapid store which is a mix of the QuickJournal and the Kaha store.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@405807 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7947655e5f
commit
a522d3abf7
|
@ -0,0 +1,46 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.store.rapid;
|
||||||
|
|
||||||
|
import org.apache.activeio.journal.RecordLocation;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.activemq.command.MessageId;
|
||||||
|
|
||||||
|
public class RapidMessageReference {
|
||||||
|
public final MessageId messageId;
|
||||||
|
public final long expiration;
|
||||||
|
public final RecordLocation location;
|
||||||
|
|
||||||
|
public RapidMessageReference(Message message, RecordLocation location) {
|
||||||
|
this.messageId = message.getMessageId();
|
||||||
|
this.expiration = message.getExpiration();
|
||||||
|
this.location=location;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getExpiration() {
|
||||||
|
return expiration;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageId getMessageId() {
|
||||||
|
return messageId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RecordLocation getLocation() {
|
||||||
|
return location;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,289 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.rapid;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
import org.apache.activeio.journal.RecordLocation;
|
||||||
|
import org.apache.activeio.journal.active.Location;
|
||||||
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.JournalQueueAck;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.activemq.command.MessageAck;
|
||||||
|
import org.apache.activemq.command.MessageId;
|
||||||
|
import org.apache.activemq.kaha.MapContainer;
|
||||||
|
import org.apache.activemq.memory.UsageManager;
|
||||||
|
import org.apache.activemq.store.MessageRecoveryListener;
|
||||||
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
import org.apache.activemq.transaction.Synchronization;
|
||||||
|
import org.apache.activemq.util.TransactionTemplate;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A MessageStore that uses a Journal to store it's messages.
|
||||||
|
*
|
||||||
|
* @version $Revision: 1.14 $
|
||||||
|
*/
|
||||||
|
public class RapidMessageStore implements MessageStore {
|
||||||
|
|
||||||
|
private static final Log log = LogFactory.getLog(RapidMessageStore.class);
|
||||||
|
|
||||||
|
protected final RapidPersistenceAdapter peristenceAdapter;
|
||||||
|
protected final RapidTransactionStore transactionStore;
|
||||||
|
protected final MapContainer messageContainer;
|
||||||
|
protected final ActiveMQDestination destination;
|
||||||
|
protected final TransactionTemplate transactionTemplate;
|
||||||
|
|
||||||
|
// private LinkedHashMap messages = new LinkedHashMap();
|
||||||
|
// private ArrayList messageAcks = new ArrayList();
|
||||||
|
|
||||||
|
// /** A MessageStore that we can use to retrieve messages quickly. */
|
||||||
|
// private LinkedHashMap cpAddedMessageIds;
|
||||||
|
|
||||||
|
protected RecordLocation lastLocation;
|
||||||
|
protected HashSet inFlightTxLocations = new HashSet();
|
||||||
|
|
||||||
|
public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, MapContainer container) {
|
||||||
|
this.peristenceAdapter = adapter;
|
||||||
|
this.transactionStore = adapter.getTransactionStore();
|
||||||
|
this.messageContainer = container;
|
||||||
|
this.destination = destination;
|
||||||
|
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Not synchronized since the Journal has better throughput if you increase
|
||||||
|
* the number of concurrent writes that it is doing.
|
||||||
|
*/
|
||||||
|
public void addMessage(ConnectionContext context, final Message message) throws IOException {
|
||||||
|
|
||||||
|
final MessageId id = message.getMessageId();
|
||||||
|
|
||||||
|
final boolean debug = log.isDebugEnabled();
|
||||||
|
final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
|
||||||
|
final RapidMessageReference md = new RapidMessageReference(message, location);
|
||||||
|
|
||||||
|
if( !context.isInTransaction() ) {
|
||||||
|
if( debug )
|
||||||
|
log.debug("Journalled message add for: "+id+", at: "+location);
|
||||||
|
addMessage(md);
|
||||||
|
} else {
|
||||||
|
message.incrementReferenceCount();
|
||||||
|
if( debug )
|
||||||
|
log.debug("Journalled transacted message add for: "+id+", at: "+location);
|
||||||
|
synchronized( this ) {
|
||||||
|
inFlightTxLocations.add(location);
|
||||||
|
}
|
||||||
|
transactionStore.addMessage(this, message, location);
|
||||||
|
context.getTransaction().addSynchronization(new Synchronization(){
|
||||||
|
public void afterCommit() throws Exception {
|
||||||
|
if( debug )
|
||||||
|
log.debug("Transacted message add commit for: "+id+", at: "+location);
|
||||||
|
message.decrementReferenceCount();
|
||||||
|
synchronized( RapidMessageStore.this ) {
|
||||||
|
inFlightTxLocations.remove(location);
|
||||||
|
addMessage(md);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public void afterRollback() throws Exception {
|
||||||
|
if( debug )
|
||||||
|
log.debug("Transacted message add rollback for: "+id+", at: "+location);
|
||||||
|
message.decrementReferenceCount();
|
||||||
|
synchronized( RapidMessageStore.this ) {
|
||||||
|
inFlightTxLocations.remove(location);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addMessage(final RapidMessageReference messageReference) {
|
||||||
|
synchronized (this) {
|
||||||
|
lastLocation = messageReference.getLocation();
|
||||||
|
MessageId id = messageReference.getMessageId();
|
||||||
|
messageContainer.put(id.toString(), messageReference);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static protected String toString(RecordLocation location) {
|
||||||
|
Location l = (Location) location;
|
||||||
|
return l.getLogFileId()+":"+l.getLogFileOffset();
|
||||||
|
}
|
||||||
|
|
||||||
|
static protected RecordLocation toRecordLocation(String t) {
|
||||||
|
String[] strings = t.split(":");
|
||||||
|
if( strings.length!=2 )
|
||||||
|
throw new IllegalArgumentException("Invalid location: "+t);
|
||||||
|
return new Location(Integer.parseInt(strings[0]),Integer.parseInt(strings[1]));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void replayAddMessage(ConnectionContext context, Message message, RecordLocation location) {
|
||||||
|
try {
|
||||||
|
RapidMessageReference messageReference = new RapidMessageReference(message, location);
|
||||||
|
messageContainer.put(message.getMessageId().toString(), messageReference);
|
||||||
|
}
|
||||||
|
catch (Throwable e) {
|
||||||
|
log.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
|
||||||
|
final boolean debug = log.isDebugEnabled();
|
||||||
|
JournalQueueAck remove = new JournalQueueAck();
|
||||||
|
remove.setDestination(destination);
|
||||||
|
remove.setMessageAck(ack);
|
||||||
|
|
||||||
|
final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
|
||||||
|
if( !context.isInTransaction() ) {
|
||||||
|
if( debug )
|
||||||
|
log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
|
||||||
|
removeMessage(ack, location);
|
||||||
|
} else {
|
||||||
|
if( debug )
|
||||||
|
log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
|
||||||
|
synchronized( this ) {
|
||||||
|
inFlightTxLocations.add(location);
|
||||||
|
}
|
||||||
|
transactionStore.removeMessage(this, ack, location);
|
||||||
|
context.getTransaction().addSynchronization(new Synchronization(){
|
||||||
|
public void afterCommit() throws Exception {
|
||||||
|
if( debug )
|
||||||
|
log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
|
||||||
|
synchronized( RapidMessageStore.this ) {
|
||||||
|
inFlightTxLocations.remove(location);
|
||||||
|
removeMessage(ack, location);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public void afterRollback() throws Exception {
|
||||||
|
if( debug )
|
||||||
|
log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
|
||||||
|
synchronized( RapidMessageStore.this ) {
|
||||||
|
inFlightTxLocations.remove(location);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeMessage(final MessageAck ack, final RecordLocation location) {
|
||||||
|
synchronized (this) {
|
||||||
|
lastLocation = location;
|
||||||
|
MessageId id = ack.getLastMessageId();
|
||||||
|
messageContainer.remove(id.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void replayRemoveMessage(ConnectionContext context, MessageAck ack) {
|
||||||
|
try {
|
||||||
|
MessageId id = ack.getLastMessageId();
|
||||||
|
messageContainer.remove(id.toString());
|
||||||
|
}
|
||||||
|
catch (Throwable e) {
|
||||||
|
log.warn("Could not replay acknowledge for message '" + ack.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public Message getMessage(MessageId id) throws IOException {
|
||||||
|
RapidMessageReference messageReference = (RapidMessageReference) messageContainer.get(id.toString());
|
||||||
|
if (messageReference == null )
|
||||||
|
return null;
|
||||||
|
return (Message) peristenceAdapter.readCommand(messageReference.getLocation());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Replays the checkpointStore first as those messages are the oldest ones,
|
||||||
|
* then messages are replayed from the transaction log and then the cache is
|
||||||
|
* updated.
|
||||||
|
*
|
||||||
|
* @param listener
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public void recover(final MessageRecoveryListener listener) throws Exception {
|
||||||
|
|
||||||
|
for(Iterator iter=messageContainer.values().iterator();iter.hasNext();){
|
||||||
|
RapidMessageReference messageReference=(RapidMessageReference) iter.next();
|
||||||
|
Message m = (Message) peristenceAdapter.readCommand(messageReference.getLocation());
|
||||||
|
listener.recoverMessage(m);
|
||||||
|
}
|
||||||
|
listener.finished();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
|
||||||
|
*/
|
||||||
|
public void removeAllMessages(ConnectionContext context) throws IOException {
|
||||||
|
messageContainer.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ActiveMQDestination getDestination() {
|
||||||
|
return destination;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
|
||||||
|
throw new IOException("The journal does not support message references.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMessageReference(MessageId identity) throws IOException {
|
||||||
|
throw new IOException("The journal does not support message references.");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void setUsageManager(UsageManager usageManager) {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public RecordLocation checkpoint() throws IOException {
|
||||||
|
|
||||||
|
ArrayList cpActiveJournalLocations;
|
||||||
|
|
||||||
|
// swap out the message hash maps..
|
||||||
|
synchronized (this) {
|
||||||
|
cpActiveJournalLocations=new ArrayList(inFlightTxLocations);
|
||||||
|
}
|
||||||
|
|
||||||
|
if( cpActiveJournalLocations.size() > 0 ) {
|
||||||
|
Collections.sort(cpActiveJournalLocations);
|
||||||
|
return (RecordLocation) cpActiveJournalLocations.get(0);
|
||||||
|
} else {
|
||||||
|
return lastLocation;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,672 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.rapid;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.activeio.command.WireFormat;
|
||||||
|
import org.apache.activeio.journal.InvalidRecordLocationException;
|
||||||
|
import org.apache.activeio.journal.Journal;
|
||||||
|
import org.apache.activeio.journal.JournalEventListener;
|
||||||
|
import org.apache.activeio.journal.RecordLocation;
|
||||||
|
import org.apache.activeio.journal.active.JournalImpl;
|
||||||
|
import org.apache.activeio.packet.Packet;
|
||||||
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.activemq.command.DataStructure;
|
||||||
|
import org.apache.activemq.command.JournalQueueAck;
|
||||||
|
import org.apache.activemq.command.JournalTopicAck;
|
||||||
|
import org.apache.activemq.command.JournalTrace;
|
||||||
|
import org.apache.activemq.command.JournalTransaction;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.activemq.command.MessageAck;
|
||||||
|
import org.apache.activemq.kaha.MapContainer;
|
||||||
|
import org.apache.activemq.kaha.Store;
|
||||||
|
import org.apache.activemq.kaha.StoreFactory;
|
||||||
|
import org.apache.activemq.kaha.StringMarshaller;
|
||||||
|
import org.apache.activemq.memory.UsageListener;
|
||||||
|
import org.apache.activemq.memory.UsageManager;
|
||||||
|
import org.apache.activemq.openwire.OpenWireFormat;
|
||||||
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
import org.apache.activemq.store.PersistenceAdapter;
|
||||||
|
import org.apache.activemq.store.TopicMessageStore;
|
||||||
|
import org.apache.activemq.store.TransactionStore;
|
||||||
|
import org.apache.activemq.store.kahadaptor.AtomicIntegerMarshaller;
|
||||||
|
import org.apache.activemq.store.kahadaptor.CommandMarshaller;
|
||||||
|
import org.apache.activemq.store.rapid.RapidTransactionStore.Tx;
|
||||||
|
import org.apache.activemq.store.rapid.RapidTransactionStore.TxOperation;
|
||||||
|
import org.apache.activemq.thread.Scheduler;
|
||||||
|
import org.apache.activemq.thread.Task;
|
||||||
|
import org.apache.activemq.thread.TaskRunner;
|
||||||
|
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||||
|
import org.apache.activemq.util.IOExceptionSupport;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.Callable;
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An implementation of {@link PersistenceAdapter} designed for use with a
|
||||||
|
* {@link Journal} and then check pointing asynchronously on a timeout with some
|
||||||
|
* other long term persistent storage.
|
||||||
|
*
|
||||||
|
* @org.apache.xbean.XBean
|
||||||
|
*
|
||||||
|
* @version $Revision: 1.17 $
|
||||||
|
*/
|
||||||
|
public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener {
|
||||||
|
|
||||||
|
private static final Log log = LogFactory.getLog(RapidPersistenceAdapter.class);
|
||||||
|
private final Journal journal;
|
||||||
|
|
||||||
|
private final WireFormat wireFormat = new OpenWireFormat();
|
||||||
|
|
||||||
|
private final ConcurrentHashMap queues = new ConcurrentHashMap();
|
||||||
|
private final ConcurrentHashMap topics = new ConcurrentHashMap();
|
||||||
|
|
||||||
|
private long checkpointInterval = 1000 * 60 * 5;
|
||||||
|
private long lastCheckpointRequest = System.currentTimeMillis();
|
||||||
|
private long lastCleanup = System.currentTimeMillis();
|
||||||
|
private int maxCheckpointWorkers = 10;
|
||||||
|
private int maxCheckpointMessageAddSize = 5000;
|
||||||
|
|
||||||
|
private RapidTransactionStore transactionStore = new RapidTransactionStore(this);
|
||||||
|
private ThreadPoolExecutor checkpointExecutor;
|
||||||
|
|
||||||
|
private TaskRunner checkpointTask;
|
||||||
|
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
|
||||||
|
private boolean fullCheckPoint;
|
||||||
|
|
||||||
|
private AtomicBoolean started = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
Store store;
|
||||||
|
private boolean useExternalMessageReferences;
|
||||||
|
|
||||||
|
|
||||||
|
private final Runnable periodicCheckpointTask = new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
|
||||||
|
checkpoint(false, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public RapidPersistenceAdapter(Journal journal, TaskRunnerFactory taskRunnerFactory) throws IOException {
|
||||||
|
|
||||||
|
this.journal = journal;
|
||||||
|
journal.setJournalEventListener(this);
|
||||||
|
|
||||||
|
File dir = ((JournalImpl)journal).getLogDirectory();
|
||||||
|
String name=dir.getAbsolutePath()+File.separator+"kaha.db";
|
||||||
|
store=StoreFactory.open(name,"rw");
|
||||||
|
|
||||||
|
checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
|
||||||
|
public boolean iterate() {
|
||||||
|
return doCheckpoint();
|
||||||
|
}
|
||||||
|
}, "ActiveMQ Checkpoint Worker");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set getDestinations() {
|
||||||
|
Set rc=new HashSet();
|
||||||
|
try {
|
||||||
|
for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
|
||||||
|
Object obj=i.next();
|
||||||
|
if(obj instanceof ActiveMQDestination){
|
||||||
|
rc.add(obj);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}catch(IOException e){
|
||||||
|
log.error("Failed to get destinations " ,e);
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
|
||||||
|
if (destination.isQueue()) {
|
||||||
|
return createQueueMessageStore((ActiveMQQueue) destination);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return createTopicMessageStore((ActiveMQTopic) destination);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
|
||||||
|
RapidMessageStore store = (RapidMessageStore) queues.get(destination);
|
||||||
|
if (store == null) {
|
||||||
|
MapContainer messageContainer=getMapContainer(destination,"topic-data");
|
||||||
|
store = new RapidMessageStore(this, destination, messageContainer);
|
||||||
|
queues.put(destination, store);
|
||||||
|
}
|
||||||
|
return store;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MapContainer getMapContainer(Object id,String containerName) throws IOException{
|
||||||
|
MapContainer container=store.getMapContainer(id,containerName);
|
||||||
|
container.setKeyMarshaller(new StringMarshaller());
|
||||||
|
if(useExternalMessageReferences){
|
||||||
|
container.setValueMarshaller(new StringMarshaller());
|
||||||
|
}else{
|
||||||
|
container.setValueMarshaller(new CommandMarshaller(wireFormat));
|
||||||
|
}
|
||||||
|
container.load();
|
||||||
|
return container;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
|
||||||
|
RapidTopicMessageStore store = (RapidTopicMessageStore) topics.get(destination);
|
||||||
|
if (store == null) {
|
||||||
|
|
||||||
|
MapContainer messageContainer=getMapContainer(destination,"topic-data");
|
||||||
|
MapContainer subsContainer=getMapContainer(destination.toString()+"-subscriptions","topic-subs");
|
||||||
|
MapContainer ackContainer=this.store.getMapContainer(destination.toString(),"topic-acks");
|
||||||
|
|
||||||
|
ackContainer.setKeyMarshaller(new StringMarshaller());
|
||||||
|
ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
|
||||||
|
|
||||||
|
store = new RapidTopicMessageStore(this, destination, messageContainer, subsContainer, ackContainer);
|
||||||
|
topics.put(destination, store);
|
||||||
|
}
|
||||||
|
return store;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TransactionStore createTransactionStore() throws IOException {
|
||||||
|
return transactionStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLastMessageBrokerSequenceId() throws IOException {
|
||||||
|
// TODO: implement this.
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void beginTransaction(ConnectionContext context) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void commitTransaction(ConnectionContext context) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void rollbackTransaction(ConnectionContext context) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void start() throws Exception {
|
||||||
|
if( !started.compareAndSet(false, true) )
|
||||||
|
return;
|
||||||
|
|
||||||
|
checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
|
||||||
|
public Thread newThread(Runnable runable) {
|
||||||
|
Thread t = new Thread(runable, "Journal checkpoint worker");
|
||||||
|
t.setPriority(7);
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
checkpointExecutor.allowCoreThreadTimeOut(true);
|
||||||
|
|
||||||
|
createTransactionStore();
|
||||||
|
recover();
|
||||||
|
|
||||||
|
// Do a checkpoint periodically.
|
||||||
|
Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval/10);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() throws Exception {
|
||||||
|
|
||||||
|
if( !started.compareAndSet(true, false) )
|
||||||
|
return;
|
||||||
|
|
||||||
|
Scheduler.cancel(periodicCheckpointTask);
|
||||||
|
|
||||||
|
// Take one final checkpoint and stop checkpoint processing.
|
||||||
|
checkpoint(false, true);
|
||||||
|
checkpointTask.shutdown();
|
||||||
|
checkpointExecutor.shutdown();
|
||||||
|
|
||||||
|
queues.clear();
|
||||||
|
topics.clear();
|
||||||
|
|
||||||
|
IOException firstException = null;
|
||||||
|
try {
|
||||||
|
journal.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
|
||||||
|
}
|
||||||
|
store.close();
|
||||||
|
|
||||||
|
if (firstException != null) {
|
||||||
|
throw firstException;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Properties
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Returns the wireFormat.
|
||||||
|
*/
|
||||||
|
public WireFormat getWireFormat() {
|
||||||
|
return wireFormat;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implementation methods
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Journal give us a call back so that we can move old data out of the
|
||||||
|
* journal. Taking a checkpoint does this for us.
|
||||||
|
*
|
||||||
|
* @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
|
||||||
|
*/
|
||||||
|
public void overflowNotification(RecordLocation safeLocation) {
|
||||||
|
checkpoint(false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When we checkpoint we move all the journalled data to long term storage.
|
||||||
|
* @param stopping
|
||||||
|
*
|
||||||
|
* @param b
|
||||||
|
*/
|
||||||
|
public void checkpoint(boolean sync, boolean fullCheckpoint) {
|
||||||
|
try {
|
||||||
|
if (journal == null )
|
||||||
|
throw new IllegalStateException("Journal is closed.");
|
||||||
|
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
CountDownLatch latch = null;
|
||||||
|
synchronized(this) {
|
||||||
|
latch = nextCheckpointCountDownLatch;
|
||||||
|
lastCheckpointRequest = now;
|
||||||
|
if( fullCheckpoint ) {
|
||||||
|
this.fullCheckPoint = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
checkpointTask.wakeup();
|
||||||
|
|
||||||
|
if (sync) {
|
||||||
|
log.debug("Waking for checkpoint to complete.");
|
||||||
|
latch.await();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
log.warn("Request to start checkpoint failed: " + e, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This does the actual checkpoint.
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public boolean doCheckpoint() {
|
||||||
|
CountDownLatch latch = null;
|
||||||
|
boolean fullCheckpoint;
|
||||||
|
synchronized(this) {
|
||||||
|
latch = nextCheckpointCountDownLatch;
|
||||||
|
nextCheckpointCountDownLatch = new CountDownLatch(1);
|
||||||
|
fullCheckpoint = this.fullCheckPoint;
|
||||||
|
this.fullCheckPoint=false;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
|
||||||
|
log.debug("Checkpoint started.");
|
||||||
|
RecordLocation newMark = null;
|
||||||
|
|
||||||
|
ArrayList futureTasks = new ArrayList(queues.size()+topics.size());
|
||||||
|
|
||||||
|
//
|
||||||
|
// We do many partial checkpoints (fullCheckpoint==false) to move topic messages
|
||||||
|
// to long term store as soon as possible.
|
||||||
|
//
|
||||||
|
// We want to avoid doing that for queue messages since removes the come in the same
|
||||||
|
// checkpoint cycle will nullify the previous message add. Therefore, we only
|
||||||
|
// checkpoint queues on the fullCheckpoint cycles.
|
||||||
|
//
|
||||||
|
if( fullCheckpoint ) {
|
||||||
|
Iterator iterator = queues.values().iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
try {
|
||||||
|
final RapidMessageStore ms = (RapidMessageStore) iterator.next();
|
||||||
|
FutureTask task = new FutureTask(new Callable() {
|
||||||
|
public Object call() throws Exception {
|
||||||
|
return ms.checkpoint();
|
||||||
|
}});
|
||||||
|
futureTasks.add(task);
|
||||||
|
checkpointExecutor.execute(task);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error("Failed to checkpoint a message store: " + e, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Iterator iterator = topics.values().iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
try {
|
||||||
|
final RapidTopicMessageStore ms = (RapidTopicMessageStore) iterator.next();
|
||||||
|
FutureTask task = new FutureTask(new Callable() {
|
||||||
|
public Object call() throws Exception {
|
||||||
|
return ms.checkpoint();
|
||||||
|
}});
|
||||||
|
futureTasks.add(task);
|
||||||
|
checkpointExecutor.execute(task);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error("Failed to checkpoint a message store: " + e, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (Iterator iter = futureTasks.iterator(); iter.hasNext();) {
|
||||||
|
FutureTask ft = (FutureTask) iter.next();
|
||||||
|
RecordLocation mark = (RecordLocation) ft.get();
|
||||||
|
// We only set a newMark on full checkpoints.
|
||||||
|
if( fullCheckpoint ) {
|
||||||
|
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
|
||||||
|
newMark = mark;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
log.error("Failed to checkpoint a message store: " + e, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if( fullCheckpoint ) {
|
||||||
|
try {
|
||||||
|
if (newMark != null) {
|
||||||
|
log.debug("Marking journal at: " + newMark);
|
||||||
|
journal.setMark(newMark, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error("Failed to mark the Journal: " + e, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: do we need to implement a periodic clean up?
|
||||||
|
|
||||||
|
// if (longTermPersistence instanceof JDBCPersistenceAdapter) {
|
||||||
|
// // We may be check pointing more often than the checkpointInterval if under high use
|
||||||
|
// // But we don't want to clean up the db that often.
|
||||||
|
// long now = System.currentTimeMillis();
|
||||||
|
// if( now > lastCleanup+checkpointInterval ) {
|
||||||
|
// lastCleanup = now;
|
||||||
|
// ((JDBCPersistenceAdapter) longTermPersistence).cleanup();
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
log.debug("Checkpoint done.");
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
synchronized(this) {
|
||||||
|
return this.fullCheckPoint;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param location
|
||||||
|
* @return
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public DataStructure readCommand(RecordLocation location) throws IOException {
|
||||||
|
try {
|
||||||
|
Packet data = journal.read(location);
|
||||||
|
return (DataStructure) wireFormat.unmarshal(data);
|
||||||
|
}
|
||||||
|
catch (InvalidRecordLocationException e) {
|
||||||
|
throw createReadException(location, e);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw createReadException(location, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Move all the messages that were in the journal into long term storage. We
|
||||||
|
* just replay and do a checkpoint.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InvalidRecordLocationException
|
||||||
|
* @throws IllegalStateException
|
||||||
|
*/
|
||||||
|
private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
|
||||||
|
|
||||||
|
RecordLocation pos = null;
|
||||||
|
int transactionCounter = 0;
|
||||||
|
|
||||||
|
log.info("Journal Recovery Started.");
|
||||||
|
ConnectionContext context = new ConnectionContext();
|
||||||
|
|
||||||
|
// While we have records in the journal.
|
||||||
|
while ((pos = journal.getNextRecordLocation(pos)) != null) {
|
||||||
|
Packet data = journal.read(pos);
|
||||||
|
DataStructure c = (DataStructure) wireFormat.unmarshal(data);
|
||||||
|
|
||||||
|
if (c instanceof Message ) {
|
||||||
|
Message message = (Message) c;
|
||||||
|
RapidMessageStore store = (RapidMessageStore) createMessageStore(message.getDestination());
|
||||||
|
if ( message.isInTransaction()) {
|
||||||
|
transactionStore.addMessage(store, message, pos);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
store.replayAddMessage(context, message, pos);
|
||||||
|
transactionCounter++;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
switch (c.getDataStructureType()) {
|
||||||
|
case JournalQueueAck.DATA_STRUCTURE_TYPE:
|
||||||
|
{
|
||||||
|
JournalQueueAck command = (JournalQueueAck) c;
|
||||||
|
RapidMessageStore store = (RapidMessageStore) createMessageStore(command.getDestination());
|
||||||
|
if (command.getMessageAck().isInTransaction()) {
|
||||||
|
transactionStore.removeMessage(store, command.getMessageAck(), pos);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
store.replayRemoveMessage(context, command.getMessageAck());
|
||||||
|
transactionCounter++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case JournalTopicAck.DATA_STRUCTURE_TYPE:
|
||||||
|
{
|
||||||
|
JournalTopicAck command = (JournalTopicAck) c;
|
||||||
|
RapidTopicMessageStore store = (RapidTopicMessageStore) createMessageStore(command.getDestination());
|
||||||
|
if (command.getTransactionId() != null) {
|
||||||
|
transactionStore.acknowledge(store, command, pos);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
|
||||||
|
transactionCounter++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case JournalTransaction.DATA_STRUCTURE_TYPE:
|
||||||
|
{
|
||||||
|
JournalTransaction command = (JournalTransaction) c;
|
||||||
|
try {
|
||||||
|
// Try to replay the packet.
|
||||||
|
switch (command.getType()) {
|
||||||
|
case JournalTransaction.XA_PREPARE:
|
||||||
|
transactionStore.replayPrepare(command.getTransactionId());
|
||||||
|
break;
|
||||||
|
case JournalTransaction.XA_COMMIT:
|
||||||
|
case JournalTransaction.LOCAL_COMMIT:
|
||||||
|
Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
|
||||||
|
if (tx == null)
|
||||||
|
break; // We may be trying to replay a commit that
|
||||||
|
// was already committed.
|
||||||
|
|
||||||
|
// Replay the committed operations.
|
||||||
|
for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
|
||||||
|
TxOperation op = (TxOperation) iter.next();
|
||||||
|
if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
|
||||||
|
op.store.replayAddMessage(context, (Message) op.data, op.location);
|
||||||
|
}
|
||||||
|
if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
|
||||||
|
op.store.replayRemoveMessage(context, (MessageAck) op.data);
|
||||||
|
}
|
||||||
|
if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
|
||||||
|
JournalTopicAck ack = (JournalTopicAck) op.data;
|
||||||
|
((RapidTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack
|
||||||
|
.getMessageId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
transactionCounter++;
|
||||||
|
break;
|
||||||
|
case JournalTransaction.LOCAL_ROLLBACK:
|
||||||
|
case JournalTransaction.XA_ROLLBACK:
|
||||||
|
transactionStore.replayRollback(command.getTransactionId());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case JournalTrace.DATA_STRUCTURE_TYPE:
|
||||||
|
JournalTrace trace = (JournalTrace) c;
|
||||||
|
log.debug("TRACE Entry: " + trace.getMessage());
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
log.error("Unknown type of record in transaction log which will be discarded: " + c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
RecordLocation location = writeTraceMessage("RECOVERED", true);
|
||||||
|
journal.setMark(location, true);
|
||||||
|
|
||||||
|
log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
|
||||||
|
}
|
||||||
|
|
||||||
|
private IOException createReadException(RecordLocation location, Exception e) {
|
||||||
|
return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected IOException createWriteException(DataStructure packet, Exception e) {
|
||||||
|
return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected IOException createWriteException(String command, Exception e) {
|
||||||
|
return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected IOException createRecoveryFailedException(Exception e) {
|
||||||
|
return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param command
|
||||||
|
* @param sync
|
||||||
|
* @return
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
|
||||||
|
if( started.get() )
|
||||||
|
return journal.write(wireFormat.marshal(command), sync);
|
||||||
|
throw new IOException("closed");
|
||||||
|
}
|
||||||
|
|
||||||
|
private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
|
||||||
|
JournalTrace trace = new JournalTrace();
|
||||||
|
trace.setMessage(message);
|
||||||
|
return writeCommand(trace, sync);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
|
||||||
|
if (newPercentUsage > 80 && oldPercentUsage < newPercentUsage) {
|
||||||
|
checkpoint(false, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public RapidTransactionStore getTransactionStore() {
|
||||||
|
return transactionStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void deleteAllMessages() throws IOException {
|
||||||
|
try {
|
||||||
|
JournalTrace trace = new JournalTrace();
|
||||||
|
trace.setMessage("DELETED");
|
||||||
|
RecordLocation location = journal.write(wireFormat.marshal(trace), false);
|
||||||
|
journal.setMark(location, true);
|
||||||
|
log.info("Journal deleted: ");
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw e;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
throw IOExceptionSupport.create(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(store!=null){
|
||||||
|
store.delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxCheckpointMessageAddSize() {
|
||||||
|
return maxCheckpointMessageAddSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
|
||||||
|
this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxCheckpointWorkers() {
|
||||||
|
return maxCheckpointWorkers;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
|
||||||
|
this.maxCheckpointWorkers = maxCheckpointWorkers;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isUseExternalMessageReferences() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUseExternalMessageReferences(boolean enable) {
|
||||||
|
if( enable )
|
||||||
|
throw new IllegalArgumentException("The journal does not support message references.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUsageManager(UsageManager usageManager) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public Store getStore() {
|
||||||
|
return store;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,294 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.rapid;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.activeio.journal.RecordLocation;
|
||||||
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.activemq.command.JournalTopicAck;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.activemq.command.MessageId;
|
||||||
|
import org.apache.activemq.command.SubscriptionInfo;
|
||||||
|
import org.apache.activemq.kaha.ListContainer;
|
||||||
|
import org.apache.activemq.kaha.MapContainer;
|
||||||
|
import org.apache.activemq.kaha.Marshaller;
|
||||||
|
import org.apache.activemq.kaha.Store;
|
||||||
|
import org.apache.activemq.kaha.StringMarshaller;
|
||||||
|
import org.apache.activemq.store.MessageRecoveryListener;
|
||||||
|
import org.apache.activemq.store.TopicMessageStore;
|
||||||
|
import org.apache.activemq.transaction.Synchronization;
|
||||||
|
import org.apache.activemq.util.SubscriptionKey;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A MessageStore that uses a Journal to store it's messages.
|
||||||
|
*
|
||||||
|
* @version $Revision: 1.13 $
|
||||||
|
*/
|
||||||
|
public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore {
|
||||||
|
|
||||||
|
private static final Log log = LogFactory.getLog(RapidTopicMessageStore.class);
|
||||||
|
|
||||||
|
private HashMap ackedLastAckLocations = new HashMap();
|
||||||
|
private final MapContainer subscriberContainer;
|
||||||
|
private final MapContainer ackContainer;
|
||||||
|
private final Store store;
|
||||||
|
private Map subscriberAcks=new ConcurrentHashMap();
|
||||||
|
|
||||||
|
public RapidTopicMessageStore(RapidPersistenceAdapter adapter, ActiveMQTopic destination, MapContainer messageContainer, MapContainer subsContainer, MapContainer ackContainer) throws IOException {
|
||||||
|
super(adapter, destination, messageContainer);
|
||||||
|
this.subscriberContainer = subsContainer;
|
||||||
|
this.ackContainer = ackContainer;
|
||||||
|
this.store=adapter.getStore();
|
||||||
|
|
||||||
|
for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
|
||||||
|
Object key=i.next();
|
||||||
|
addSubscriberAckContainer(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
|
||||||
|
|
||||||
|
String key=getSubscriptionKey(clientId,subscriptionName);
|
||||||
|
ListContainer list=(ListContainer) subscriberAcks.get(key);
|
||||||
|
if(list!=null){
|
||||||
|
for(Iterator i=list.iterator();i.hasNext();){
|
||||||
|
Object msg=messageContainer.get(i.next());
|
||||||
|
if(msg!=null){
|
||||||
|
if(msg.getClass()==String.class){
|
||||||
|
listener.recoverMessageReference((String) msg);
|
||||||
|
}else{
|
||||||
|
listener.recoverMessage((Message) msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
listener.finished();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
listener.finished();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
|
||||||
|
return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
|
||||||
|
SubscriptionInfo info=new SubscriptionInfo();
|
||||||
|
info.setDestination(destination);
|
||||||
|
info.setClientId(clientId);
|
||||||
|
info.setSelector(selector);
|
||||||
|
info.setSubcriptionName(subscriptionName);
|
||||||
|
String key=getSubscriptionKey(clientId,subscriptionName);
|
||||||
|
// if already exists - won't add it again as it causes data files
|
||||||
|
// to hang around
|
||||||
|
if(!subscriberContainer.containsKey(key)){
|
||||||
|
subscriberContainer.put(key,info);
|
||||||
|
}
|
||||||
|
addSubscriberAckContainer(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
|
||||||
|
int subscriberCount=subscriberAcks.size();
|
||||||
|
if(subscriberCount>0){
|
||||||
|
String id=message.getMessageId().toString();
|
||||||
|
ackContainer.put(id,new AtomicInteger(subscriberCount));
|
||||||
|
for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
|
||||||
|
Object key=i.next();
|
||||||
|
ListContainer container=store.getListContainer(key,"durable-subs");
|
||||||
|
container.add(id);
|
||||||
|
}
|
||||||
|
super.addMessage(context,message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
|
||||||
|
final boolean debug = log.isDebugEnabled();
|
||||||
|
|
||||||
|
JournalTopicAck ack = new JournalTopicAck();
|
||||||
|
ack.setDestination(destination);
|
||||||
|
ack.setMessageId(messageId);
|
||||||
|
ack.setMessageSequenceId(messageId.getBrokerSequenceId());
|
||||||
|
ack.setSubscritionName(subscriptionName);
|
||||||
|
ack.setClientId(clientId);
|
||||||
|
ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null);
|
||||||
|
final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
|
||||||
|
|
||||||
|
final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
|
||||||
|
if( !context.isInTransaction() ) {
|
||||||
|
if( debug )
|
||||||
|
log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
|
||||||
|
acknowledge(messageId, location, key);
|
||||||
|
} else {
|
||||||
|
if( debug )
|
||||||
|
log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
|
||||||
|
synchronized (this) {
|
||||||
|
inFlightTxLocations.add(location);
|
||||||
|
}
|
||||||
|
transactionStore.acknowledge(this, ack, location);
|
||||||
|
context.getTransaction().addSynchronization(new Synchronization(){
|
||||||
|
public void afterCommit() throws Exception {
|
||||||
|
if( debug )
|
||||||
|
log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
|
||||||
|
synchronized (RapidTopicMessageStore.this) {
|
||||||
|
inFlightTxLocations.remove(location);
|
||||||
|
acknowledge(messageId, location, key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public void afterRollback() throws Exception {
|
||||||
|
if( debug )
|
||||||
|
log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
|
||||||
|
synchronized (RapidTopicMessageStore.this) {
|
||||||
|
inFlightTxLocations.remove(location);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
|
||||||
|
try {
|
||||||
|
synchronized(this) {
|
||||||
|
String subcriberId=getSubscriptionKey(clientId,subscritionName);
|
||||||
|
String id=messageId.toString();
|
||||||
|
ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
|
||||||
|
if(container!=null){
|
||||||
|
//container.remove(id);
|
||||||
|
container.removeFirst();
|
||||||
|
AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
|
||||||
|
if(count!=null){
|
||||||
|
if(count.decrementAndGet()>0){
|
||||||
|
ackContainer.put(id,count);
|
||||||
|
} else {
|
||||||
|
// no more references to message messageContainer so remove it
|
||||||
|
messageContainer.remove(messageId.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Throwable e) {
|
||||||
|
log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param messageId
|
||||||
|
* @param location
|
||||||
|
* @param key
|
||||||
|
*/
|
||||||
|
private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
|
||||||
|
synchronized(this) {
|
||||||
|
lastLocation = location;
|
||||||
|
ackedLastAckLocations.put(key, messageId);
|
||||||
|
|
||||||
|
String subcriberId=getSubscriptionKey(key.getClientId(),key.getSubscriptionName());
|
||||||
|
String id=messageId.toString();
|
||||||
|
ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
|
||||||
|
if(container!=null){
|
||||||
|
//container.remove(id);
|
||||||
|
container.removeFirst();
|
||||||
|
AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
|
||||||
|
if(count!=null){
|
||||||
|
if(count.decrementAndGet()>0){
|
||||||
|
ackContainer.put(id,count);
|
||||||
|
} else {
|
||||||
|
// no more references to message messageContainer so remove it
|
||||||
|
messageContainer.remove(messageId.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getSubscriptionKey(String clientId,String subscriberName){
|
||||||
|
String result=clientId+":";
|
||||||
|
result+=subscriberName!=null?subscriberName:"NOT_SET";
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public RecordLocation checkpoint() throws IOException {
|
||||||
|
|
||||||
|
ArrayList cpAckedLastAckLocations;
|
||||||
|
|
||||||
|
// swap out the hash maps..
|
||||||
|
synchronized (this) {
|
||||||
|
cpAckedLastAckLocations = new ArrayList(this.ackedLastAckLocations.values());
|
||||||
|
this.ackedLastAckLocations = new HashMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
RecordLocation rc = super.checkpoint();
|
||||||
|
if(!cpAckedLastAckLocations.isEmpty()) {
|
||||||
|
Collections.sort(cpAckedLastAckLocations);
|
||||||
|
RecordLocation t = (RecordLocation) cpAckedLastAckLocations.get(0);
|
||||||
|
if( rc == null || t.compareTo(rc)<0 ) {
|
||||||
|
rc = t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
|
||||||
|
String key=getSubscriptionKey(clientId,subscriptionName);
|
||||||
|
subscriberContainer.remove(key);
|
||||||
|
ListContainer list=(ListContainer) subscriberAcks.get(key);
|
||||||
|
for(Iterator i=list.iterator();i.hasNext();){
|
||||||
|
String id=i.next().toString();
|
||||||
|
AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
|
||||||
|
if(count!=null){
|
||||||
|
if(count.decrementAndGet()>0){
|
||||||
|
ackContainer.put(id,count);
|
||||||
|
}else{
|
||||||
|
// no more references to message messageContainer so remove it
|
||||||
|
messageContainer.remove(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
|
||||||
|
return (SubscriptionInfo[]) subscriberContainer.values().toArray(
|
||||||
|
new SubscriptionInfo[subscriberContainer.size()]);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addSubscriberAckContainer(Object key) throws IOException{
|
||||||
|
ListContainer container=store.getListContainer(key,"topic-subs");
|
||||||
|
Marshaller marshaller=new StringMarshaller();
|
||||||
|
container.setMarshaller(marshaller);
|
||||||
|
subscriberAcks.put(key,container);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,303 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.rapid;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
import javax.transaction.xa.XAException;
|
||||||
|
|
||||||
|
import org.apache.activeio.journal.RecordLocation;
|
||||||
|
import org.apache.activemq.command.JournalTopicAck;
|
||||||
|
import org.apache.activemq.command.JournalTransaction;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.activemq.command.MessageAck;
|
||||||
|
import org.apache.activemq.command.TransactionId;
|
||||||
|
import org.apache.activemq.command.XATransactionId;
|
||||||
|
import org.apache.activemq.store.TransactionRecoveryListener;
|
||||||
|
import org.apache.activemq.store.TransactionStore;
|
||||||
|
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class RapidTransactionStore implements TransactionStore {
|
||||||
|
|
||||||
|
private final RapidPersistenceAdapter peristenceAdapter;
|
||||||
|
ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
|
||||||
|
ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
|
||||||
|
private boolean doingRecover;
|
||||||
|
|
||||||
|
|
||||||
|
public static class TxOperation {
|
||||||
|
|
||||||
|
static final byte ADD_OPERATION_TYPE = 0;
|
||||||
|
static final byte REMOVE_OPERATION_TYPE = 1;
|
||||||
|
static final byte ACK_OPERATION_TYPE = 3;
|
||||||
|
|
||||||
|
public byte operationType;
|
||||||
|
public RapidMessageStore store;
|
||||||
|
public Object data;
|
||||||
|
public RecordLocation location;
|
||||||
|
|
||||||
|
public TxOperation(byte operationType, RapidMessageStore store, Object data, RecordLocation location) {
|
||||||
|
this.operationType=operationType;
|
||||||
|
this.store=store;
|
||||||
|
this.data=data;
|
||||||
|
this.location = location;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Operations
|
||||||
|
* @version $Revision: 1.6 $
|
||||||
|
*/
|
||||||
|
public static class Tx {
|
||||||
|
|
||||||
|
private final RecordLocation location;
|
||||||
|
private ArrayList operations = new ArrayList();
|
||||||
|
|
||||||
|
public Tx(RecordLocation location) {
|
||||||
|
this.location=location;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(RapidMessageStore store, Message msg, RecordLocation loc) {
|
||||||
|
operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, loc));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(RapidMessageStore store, MessageAck ack, RecordLocation loc) {
|
||||||
|
operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, loc));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation loc) {
|
||||||
|
operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, loc));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Message[] getMessages() {
|
||||||
|
ArrayList list = new ArrayList();
|
||||||
|
for (Iterator iter = operations.iterator(); iter.hasNext();) {
|
||||||
|
TxOperation op = (TxOperation) iter.next();
|
||||||
|
if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
|
||||||
|
list.add(op.data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Message rc[] = new Message[list.size()];
|
||||||
|
list.toArray(rc);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageAck[] getAcks() {
|
||||||
|
ArrayList list = new ArrayList();
|
||||||
|
for (Iterator iter = operations.iterator(); iter.hasNext();) {
|
||||||
|
TxOperation op = (TxOperation) iter.next();
|
||||||
|
if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
|
||||||
|
list.add(op.data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MessageAck rc[] = new MessageAck[list.size()];
|
||||||
|
list.toArray(rc);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArrayList getOperations() {
|
||||||
|
return operations;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public RapidTransactionStore(RapidPersistenceAdapter adapter) {
|
||||||
|
this.peristenceAdapter = adapter;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws IOException
|
||||||
|
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
|
||||||
|
*/
|
||||||
|
public void prepare(TransactionId txid) throws IOException {
|
||||||
|
Tx tx = (Tx) inflightTransactions.remove(txid);
|
||||||
|
if (tx == null)
|
||||||
|
return;
|
||||||
|
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true);
|
||||||
|
preparedTransactions.put(txid, tx);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws IOException
|
||||||
|
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
|
||||||
|
*/
|
||||||
|
public void replayPrepare(TransactionId txid) throws IOException {
|
||||||
|
Tx tx = (Tx) inflightTransactions.remove(txid);
|
||||||
|
if (tx == null)
|
||||||
|
return;
|
||||||
|
preparedTransactions.put(txid, tx);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Tx getTx(Object txid, RecordLocation location) {
|
||||||
|
Tx tx = (Tx) inflightTransactions.get(txid);
|
||||||
|
if (tx == null) {
|
||||||
|
tx = new Tx(location);
|
||||||
|
inflightTransactions.put(txid, tx);
|
||||||
|
}
|
||||||
|
return tx;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws XAException
|
||||||
|
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
|
||||||
|
*/
|
||||||
|
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
|
||||||
|
Tx tx;
|
||||||
|
if (wasPrepared) {
|
||||||
|
tx = (Tx) preparedTransactions.remove(txid);
|
||||||
|
} else {
|
||||||
|
tx = (Tx) inflightTransactions.remove(txid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tx == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (txid.isXATransaction()) {
|
||||||
|
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared),
|
||||||
|
true);
|
||||||
|
} else {
|
||||||
|
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared),
|
||||||
|
true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws XAException
|
||||||
|
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
|
||||||
|
*/
|
||||||
|
public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
|
||||||
|
if (wasPrepared) {
|
||||||
|
return (Tx) preparedTransactions.remove(txid);
|
||||||
|
} else {
|
||||||
|
return (Tx) inflightTransactions.remove(txid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws IOException
|
||||||
|
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
|
||||||
|
*/
|
||||||
|
public void rollback(TransactionId txid) throws IOException {
|
||||||
|
|
||||||
|
Tx tx = (Tx) inflightTransactions.remove(txid);
|
||||||
|
if (tx != null)
|
||||||
|
tx = (Tx) preparedTransactions.remove(txid);
|
||||||
|
|
||||||
|
if (tx != null) {
|
||||||
|
if (txid.isXATransaction()) {
|
||||||
|
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false),
|
||||||
|
true);
|
||||||
|
} else {
|
||||||
|
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false),
|
||||||
|
true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws IOException
|
||||||
|
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
|
||||||
|
*/
|
||||||
|
public void replayRollback(TransactionId txid) throws IOException {
|
||||||
|
if (inflightTransactions.remove(txid) != null)
|
||||||
|
preparedTransactions.remove(txid);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
|
||||||
|
// All the in-flight transactions get rolled back..
|
||||||
|
inflightTransactions.clear();
|
||||||
|
this.doingRecover = true;
|
||||||
|
try {
|
||||||
|
for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
|
||||||
|
Object txid = (Object) iter.next();
|
||||||
|
Tx tx = (Tx) preparedTransactions.get(txid);
|
||||||
|
listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
this.doingRecover = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param message
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void addMessage(RapidMessageStore store, Message message, RecordLocation location) throws IOException {
|
||||||
|
Tx tx = getTx(message.getTransactionId(), location);
|
||||||
|
tx.add(store, message, location);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param ack
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void removeMessage(RapidMessageStore store, MessageAck ack, RecordLocation location) throws IOException {
|
||||||
|
Tx tx = getTx(ack.getTransactionId(), location);
|
||||||
|
tx.add(store, ack, location);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void acknowledge(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation location) {
|
||||||
|
Tx tx = getTx(ack.getTransactionId(), location);
|
||||||
|
tx.add(store, ack, location);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public RecordLocation checkpoint() throws IOException {
|
||||||
|
|
||||||
|
// Nothing really to checkpoint.. since, we don't
|
||||||
|
// checkpoint tx operations in to long term store until they are committed.
|
||||||
|
|
||||||
|
// But we keep track of the first location of an operation
|
||||||
|
// that was associated with an active tx. The journal can not
|
||||||
|
// roll over active tx records.
|
||||||
|
RecordLocation rc = null;
|
||||||
|
for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
|
||||||
|
Tx tx = (Tx) iter.next();
|
||||||
|
RecordLocation location = tx.location;
|
||||||
|
if (rc == null || rc.compareTo(location) < 0) {
|
||||||
|
rc = location;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
|
||||||
|
Tx tx = (Tx) iter.next();
|
||||||
|
RecordLocation location = tx.location;
|
||||||
|
if (rc == null || rc.compareTo(location) < 0) {
|
||||||
|
rc = location;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isDoingRecover() {
|
||||||
|
return doingRecover;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue