Quick Store bug fixes and added a bench mark test

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@492517 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-01-04 11:00:31 +00:00
parent ced50c9940
commit 88eab92328
3 changed files with 29 additions and 21 deletions

View File

@ -157,7 +157,7 @@ public class QuickMessageStore implements MessageStore {
}
}
public void replayAddMessage(ConnectionContext context, Message message, Location location) {
public boolean replayAddMessage(ConnectionContext context, Message message, Location location) {
MessageId id = message.getMessageId();
try {
// Only add the message if it has not already been added.
@ -168,13 +168,13 @@ public class QuickMessageStore implements MessageStore {
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
referenceStore.addMessageReference(context, id, data);
System.out.println("referenceStore.put "+id+"-->"+data);
return true;
}
}
catch (Throwable e) {
log.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: " + e,e);
}
return false;
}
/**
@ -238,17 +238,19 @@ public class QuickMessageStore implements MessageStore {
}
}
public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
try {
// Only remove the message if it has not already been removed.
ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId());
if( t!=null ) {
referenceStore.removeMessage(context, messageAck);
return true;
}
}
catch (Throwable e) {
log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
}
return false;
}
/**
@ -335,7 +337,6 @@ public class QuickMessageStore implements MessageStore {
Entry<MessageId, ReferenceData> entry = iterator.next();
try {
referenceStore.addMessageReference(context, entry.getKey(), entry.getValue() );
System.out.println("referenceStore.put "+entry.getKey()+"-->"+entry.getValue());
} catch (Throwable e) {
log.warn("Message could not be added to long term store: " + e.getMessage(), e);
}
@ -398,7 +399,6 @@ public class QuickMessageStore implements MessageStore {
if( data==null ) {
data = referenceStore.getMessageReference(identity);
System.out.println("referenceStore.get "+identity+"-->"+data);
if( data==null ) {
return null;
}

View File

@ -422,9 +422,10 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
private void recover() throws IllegalStateException, IOException {
Location pos = null;
int transactionCounter = 0;
int redoCounter = 0;
log.info("Journal Recovery Started from: " + asyncDataManager);
long start = System.currentTimeMillis();
ConnectionContext context = new ConnectionContext();
// While we have records in the journal.
@ -439,8 +440,9 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
transactionStore.addMessage(store, message, pos);
}
else {
store.replayAddMessage(context, message, pos);
transactionCounter++;
if( store.replayAddMessage(context, message, pos) ) {
redoCounter++;
}
}
} else {
switch (c.getDataStructureType()) {
@ -452,8 +454,9 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
transactionStore.removeMessage(store, command.getMessageAck(), pos);
}
else {
store.replayRemoveMessage(context, command.getMessageAck());
transactionCounter++;
if( store.replayRemoveMessage(context, command.getMessageAck()) ) {
redoCounter++;
}
}
}
break;
@ -465,8 +468,9 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
transactionStore.acknowledge(store, command, pos);
}
else {
store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
transactionCounter++;
if( store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()) ) {
redoCounter++;
}
}
}
break;
@ -491,18 +495,20 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
TxOperation op = (TxOperation) iter.next();
if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
op.store.replayAddMessage(context, (Message)op.data, op.location);
if( op.store.replayAddMessage(context, (Message)op.data, op.location) )
redoCounter++;
}
if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
op.store.replayRemoveMessage(context, (MessageAck) op.data);
if( op.store.replayRemoveMessage(context, (MessageAck) op.data) )
redoCounter++;
}
if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
JournalTopicAck ack = (JournalTopicAck) op.data;
((QuickTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack
.getMessageId());
if( ((QuickTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()) ) {
redoCounter++;
}
}
}
transactionCounter++;
break;
case JournalTransaction.LOCAL_ROLLBACK:
case JournalTransaction.XA_ROLLBACK:
@ -524,11 +530,11 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
}
}
}
Location location = writeTraceMessage("RECOVERED "+new Date(), true);
asyncDataManager.setMark(location, true);
long end = System.currentTimeMillis();
log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
log.info("Recovered " + redoCounter + " operations from redo log in "+((end-start)/1000.0f)+" seconds.");
}
private IOException createReadException(Location location, Exception e) {

View File

@ -120,16 +120,18 @@ public class QuickTopicMessageStore extends QuickMessageStore implements TopicMe
}
public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
try {
SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
if( sub != null ) {
topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId);
return true;
}
}
catch (Throwable e) {
log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
}
return false;
}