Quick store bug fixes.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@492511 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-01-04 10:39:04 +00:00
parent 62c850b7de
commit ced50c9940
5 changed files with 36 additions and 31 deletions

View File

@ -54,6 +54,11 @@ public interface ReferenceStore extends MessageStore {
public void setOffset(int offset) {
this.offset = offset;
}
@Override
public String toString() {
return "ReferenceData fileId="+fileId+", offset="+offset+", expiration="+expiration;
}
}
/**

View File

@ -95,7 +95,7 @@ public class KahaTopicReferenceStore extends KahaTopicMessageStore implements To
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
if(msg.messageId.equals(identity)){
if(msg.messageId.equals(identity.toString())){
result=msg;
cache.put(identity,entry);
break;

View File

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -167,6 +168,8 @@ public class QuickMessageStore implements MessageStore {
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
referenceStore.addMessageReference(context, id, data);
System.out.println("referenceStore.put "+id+"-->"+data);
}
}
catch (Throwable e) {
@ -332,6 +335,7 @@ public class QuickMessageStore implements MessageStore {
Entry<MessageId, ReferenceData> entry = iterator.next();
try {
referenceStore.addMessageReference(context, entry.getKey(), entry.getValue() );
System.out.println("referenceStore.put "+entry.getKey()+"-->"+entry.getValue());
} catch (Throwable e) {
log.warn("Message could not be added to long term store: " + e.getMessage(), e);
}
@ -394,21 +398,23 @@ public class QuickMessageStore implements MessageStore {
if( data==null ) {
data = referenceStore.getMessageReference(identity);
System.out.println("referenceStore.get "+identity+"-->"+data);
if( data==null ) {
return null;
}
}
if( data==null ) {
return null;
}
Message answer = null;
if (answer != null ) {
return answer;
}
Location location = new Location();
location.setDataFileId(data.getFileId());
location.setOffset(data.getOffset());
return (Message) peristenceAdapter.readCommand(location);
DataStructure rc = peristenceAdapter.readCommand(location);
try {
return (Message) rc;
} catch (ClassCastException e) {
throw new IOException("Could not read message "+identity+" at location "+location+", expected a message, but got: "+rc);
}
}
/**

View File

@ -88,7 +88,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
private UsageManager usageManager;
private long cleanupInterval = 1000 * 10;
private long cleanupInterval = 1000 * 1/10;
private long checkpointInterval = 1000 * 10;
private int maxCheckpointWorkers = 1;
@ -147,15 +147,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
referenceStoreAdapter.start();
Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
for (Integer fileId : files) {
try {
asyncDataManager.addInterestInFile(fileId);
} catch (IOException e) {
// We can expect these since referenceStoreAdapter is a litle behind in updates
// and it might think it has references to data files that have allready come and gone..
// This should get resolved once recovery kicks in.
}
}
log.info("Active data files: "+files);
checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
public boolean iterate() {
@ -172,8 +164,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
public void run() {
checkpoint(false);
}
};
};
Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
periodicCleanupTask = new Runnable() {
@ -193,6 +184,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
this.usageManager.removeUsageListener(this);
Scheduler.cancel(periodicCheckpointTask);
Scheduler.cancel(periodicCleanupTask);
Iterator<QuickMessageStore> iterator = queues.values().iterator();
@ -499,7 +491,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
TxOperation op = (TxOperation) iter.next();
if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
op.store.replayAddMessage(context, (Message)op.data, pos);
op.store.replayAddMessage(context, (Message)op.data, op.location);
}
if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
op.store.replayRemoveMessage(context, (MessageAck) op.data);

View File

@ -56,11 +56,13 @@ public class QuickTransactionStore implements TransactionStore {
public byte operationType;
public QuickMessageStore store;
public Object data;
public Location location;
public TxOperation(byte operationType, QuickMessageStore store, Object data) {
public TxOperation(byte operationType, QuickMessageStore store, Object data, Location location) {
this.operationType=operationType;
this.store=store;
this.data=data;
this.location=location;
}
}
@ -77,16 +79,16 @@ public class QuickTransactionStore implements TransactionStore {
this.location=location;
}
public void add(QuickMessageStore store, Message msg) {
operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
public void add(QuickMessageStore store, Message msg, Location location) {
operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, location));
}
public void add(QuickMessageStore store, MessageAck ack) {
operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, null));
}
public void add(QuickTopicMessageStore store, JournalTopicAck ack) {
operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, null));
}
public Message[] getMessages() {
@ -283,7 +285,7 @@ public class QuickTransactionStore implements TransactionStore {
*/
void addMessage(QuickMessageStore store, Message message, Location location) throws IOException {
Tx tx = getTx(message.getTransactionId(), location);
tx.add(store, message);
tx.add(store, message, location);
}
/**