switch from using the RecordLocation interface to the Location interface since the adapter will need to

be aware of what log file the active records are in so that it can delete un-used log files.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@476310 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-11-17 20:53:42 +00:00
parent ab66037b12
commit 97d0a61057
5 changed files with 51 additions and 58 deletions

View File

@ -18,30 +18,24 @@
package org.apache.activemq.store.rapid;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.journal.active.Location;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
public class RapidMessageReference {
public final MessageId messageId;
public final long expiration;
public final RecordLocation location;
public final Location location;
public RapidMessageReference(Message message, RecordLocation location) {
public RapidMessageReference(Message message, Location location) {
this.messageId = message.getMessageId();
this.expiration = message.getExpiration();
this.location=location;
}
public long getExpiration() {
return expiration;
}
public MessageId getMessageId() {
return messageId;
}
public RecordLocation getLocation() {
public Location getLocation() {
return location;
}
}

View File

@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.journal.active.Location;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -61,7 +60,7 @@ public class RapidMessageStore implements MessageStore {
// /** A MessageStore that we can use to retrieve messages quickly. */
// private LinkedHashMap cpAddedMessageIds;
protected RecordLocation lastLocation;
protected Location lastLocation;
protected HashSet inFlightTxLocations = new HashSet();
public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, MapContainer container) {
@ -82,7 +81,7 @@ public class RapidMessageStore implements MessageStore {
final MessageId id = message.getMessageId();
final boolean debug = log.isDebugEnabled();
final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
final RapidMessageReference md = new RapidMessageReference(message, location);
if( !context.isInTransaction() ) {
@ -127,19 +126,19 @@ public class RapidMessageStore implements MessageStore {
}
}
static protected String toString(RecordLocation location) {
static protected String toString(Location location) {
Location l = (Location) location;
return l.getLogFileId()+":"+l.getLogFileOffset();
}
static protected RecordLocation toRecordLocation(String t) {
static protected Location toLocation(String t) {
String[] strings = t.split(":");
if( strings.length!=2 )
throw new IllegalArgumentException("Invalid location: "+t);
return new Location(Integer.parseInt(strings[0]),Integer.parseInt(strings[1]));
}
public void replayAddMessage(ConnectionContext context, Message message, RecordLocation location) {
public void replayAddMessage(ConnectionContext context, Message message, Location location) {
try {
RapidMessageReference messageReference = new RapidMessageReference(message, location);
messageContainer.put(message.getMessageId().toString(), messageReference);
@ -157,7 +156,7 @@ public class RapidMessageStore implements MessageStore {
remove.setDestination(destination);
remove.setMessageAck(ack);
final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
if( !context.isInTransaction() ) {
if( debug )
log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
@ -190,7 +189,7 @@ public class RapidMessageStore implements MessageStore {
}
}
private void removeMessage(final MessageAck ack, final RecordLocation location) {
private void removeMessage(final MessageAck ack, final Location location) {
synchronized (this) {
lastLocation = location;
MessageId id = ack.getLastMessageId();
@ -270,7 +269,7 @@ public class RapidMessageStore implements MessageStore {
* @return
* @throws IOException
*/
public RecordLocation checkpoint() throws IOException {
public Location checkpoint() throws IOException {
ArrayList cpActiveJournalLocations;
@ -281,7 +280,7 @@ public class RapidMessageStore implements MessageStore {
if( cpActiveJournalLocations.size() > 0 ) {
Collections.sort(cpActiveJournalLocations);
return (RecordLocation) cpActiveJournalLocations.get(0);
return (Location) cpActiveJournalLocations.get(0);
} else {
return lastLocation;
}

View File

@ -23,12 +23,22 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.journal.active.JournalImpl;
import org.apache.activeio.journal.active.Location;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.ConnectionContext;
@ -67,16 +77,6 @@ import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
@ -471,14 +471,14 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent
*/
private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
RecordLocation pos = null;
Location pos = null;
int transactionCounter = 0;
log.info("Journal Recovery Started.");
ConnectionContext context = new ConnectionContext();
// While we have records in the journal.
while ((pos = journal.getNextRecordLocation(pos)) != null) {
while ((pos = (Location) journal.getNextRecordLocation(pos)) != null) {
Packet data = journal.read(pos);
DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data));
@ -603,9 +603,9 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent
* @return
* @throws IOException
*/
public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
public Location writeCommand(DataStructure command, boolean sync) throws IOException {
if( started.get() )
return journal.write(toPacket(wireFormat.marshal(command)), sync);
return (Location) journal.write(toPacket(wireFormat.marshal(command)), sync);
throw new IOException("closed");
}

View File

@ -22,7 +22,8 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.journal.active.Location;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
@ -170,7 +171,7 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
ack.setSubscritionName(subscriptionName);
ack.setClientId(clientId);
ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
final RecordLocation location=peristenceAdapter.writeCommand(ack,false);
final Location location=peristenceAdapter.writeCommand(ack,false);
final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
if(!context.isInTransaction()){
if(debug)
@ -236,7 +237,7 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
* @param location
* @param key
*/
private void acknowledge(MessageId messageId,RecordLocation location,SubscriptionKey key){
private void acknowledge(MessageId messageId,Location location,SubscriptionKey key){
synchronized(this){
lastLocation=location;
ackedLastAckLocations.put(key,messageId);
@ -265,17 +266,17 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
return result;
}
public RecordLocation checkpoint() throws IOException{
public Location checkpoint() throws IOException{
ArrayList cpAckedLastAckLocations;
// swap out the hash maps..
synchronized(this){
cpAckedLastAckLocations=new ArrayList(this.ackedLastAckLocations.values());
this.ackedLastAckLocations=new HashMap();
}
RecordLocation rc=super.checkpoint();
Location rc=super.checkpoint();
if(!cpAckedLastAckLocations.isEmpty()){
Collections.sort(cpAckedLastAckLocations);
RecordLocation t=(RecordLocation)cpAckedLastAckLocations.get(0);
Location t=(Location)cpAckedLastAckLocations.get(0);
if(rc==null||t.compareTo(rc)<0){
rc=t;
}

View File

@ -20,10 +20,11 @@ package org.apache.activemq.store.rapid;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import javax.transaction.xa.XAException;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.journal.active.Location;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
@ -33,8 +34,6 @@ import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class RapidTransactionStore implements TransactionStore {
@ -54,9 +53,9 @@ public class RapidTransactionStore implements TransactionStore {
public byte operationType;
public RapidMessageStore store;
public Object data;
public RecordLocation location;
public Location location;
public TxOperation(byte operationType, RapidMessageStore store, Object data, RecordLocation location) {
public TxOperation(byte operationType, RapidMessageStore store, Object data, Location location) {
this.operationType=operationType;
this.store=store;
this.data=data;
@ -70,22 +69,22 @@ public class RapidTransactionStore implements TransactionStore {
*/
public static class Tx {
private final RecordLocation location;
private final Location location;
private ArrayList operations = new ArrayList();
public Tx(RecordLocation location) {
public Tx(Location location) {
this.location=location;
}
public void add(RapidMessageStore store, Message msg, RecordLocation loc) {
public void add(RapidMessageStore store, Message msg, Location loc) {
operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, loc));
}
public void add(RapidMessageStore store, MessageAck ack, RecordLocation loc) {
public void add(RapidMessageStore store, MessageAck ack, Location loc) {
operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, loc));
}
public void add(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation loc) {
public void add(RapidTopicMessageStore store, JournalTopicAck ack, Location loc) {
operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, loc));
}
@ -148,7 +147,7 @@ public class RapidTransactionStore implements TransactionStore {
preparedTransactions.put(txid, tx);
}
public Tx getTx(Object txid, RecordLocation location) {
public Tx getTx(Object txid, Location location) {
Tx tx = (Tx) inflightTransactions.get(txid);
if (tx == null) {
tx = new Tx(location);
@ -249,7 +248,7 @@ public class RapidTransactionStore implements TransactionStore {
* @param message
* @throws IOException
*/
void addMessage(RapidMessageStore store, Message message, RecordLocation location) throws IOException {
void addMessage(RapidMessageStore store, Message message, Location location) throws IOException {
Tx tx = getTx(message.getTransactionId(), location);
tx.add(store, message, location);
}
@ -258,19 +257,19 @@ public class RapidTransactionStore implements TransactionStore {
* @param ack
* @throws IOException
*/
public void removeMessage(RapidMessageStore store, MessageAck ack, RecordLocation location) throws IOException {
public void removeMessage(RapidMessageStore store, MessageAck ack, Location location) throws IOException {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack, location);
}
public void acknowledge(RapidTopicMessageStore store, JournalTopicAck ack, RecordLocation location) {
public void acknowledge(RapidTopicMessageStore store, JournalTopicAck ack, Location location) {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack, location);
}
public RecordLocation checkpoint() throws IOException {
public Location checkpoint() throws IOException {
// Nothing really to checkpoint.. since, we don't
// checkpoint tx operations in to long term store until they are committed.
@ -278,17 +277,17 @@ public class RapidTransactionStore implements TransactionStore {
// But we keep track of the first location of an operation
// that was associated with an active tx. The journal can not
// roll over active tx records.
RecordLocation rc = null;
Location rc = null;
for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
Tx tx = (Tx) iter.next();
RecordLocation location = tx.location;
Location location = tx.location;
if (rc == null || rc.compareTo(location) < 0) {
rc = location;
}
}
for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
Tx tx = (Tx) iter.next();
RecordLocation location = tx.location;
Location location = tx.location;
if (rc == null || rc.compareTo(location) < 0) {
rc = location;
}