revisit https://issues.apache.org/jira/browse/AMQ-3519 with more durable solution https://issues.apache.org/jira/browse/AMQ-5068 - JMSRedelivered header now persisted before dispatch so that it is a reliable indication of a possible duplicate delivery. The option is enabled via destination policy persistJMSRedelivered

This commit is contained in:
gtully 2014-03-26 11:14:35 +00:00
parent 751fc2363c
commit 266d23ef79
25 changed files with 314 additions and 189 deletions

View File

@ -106,6 +106,7 @@ public abstract class BaseDestination implements Destination {
* percentage of in-flight messages above which optimize message store is disabled * percentage of in-flight messages above which optimize message store is disabled
*/ */
private int optimizeMessageStoreInFlightLimit = 10; private int optimizeMessageStoreInFlightLimit = 10;
private boolean persistJMSRedelivered;
/** /**
* @param brokerService * @param brokerService
@ -807,4 +808,11 @@ public abstract class BaseDestination implements Destination {
} }
} }
public void setPersistJMSRedelivered(boolean persistJMSRedelivered) {
this.persistJMSRedelivered = persistJMSRedelivered;
}
public boolean isPersistJMSRedelivered() {
return persistJMSRedelivered;
}
} }

View File

@ -25,6 +25,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;

View File

@ -596,6 +596,17 @@ public class RegionBroker extends EmptyBroker {
long totalTime = endTime - message.getBrokerInTime(); long totalTime = endTime - message.getBrokerInTime();
((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime); ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
} }
if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered() && message.isPersistent()) {
final int originalValue = message.getRedeliveryCounter();
message.incrementRedeliveryCounter();
try {
((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
} catch (IOException error) {
LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(), message.getDestination(), error);
} finally {
message.setRedeliveryCounter(originalValue);
}
}
} }
} }

View File

@ -103,6 +103,7 @@ public class PolicyEntry extends DestinationMapEntry {
* percentage of in-flight messages above which optimize message store is disabled * percentage of in-flight messages above which optimize message store is disabled
*/ */
private int optimizeMessageStoreInFlightLimit = 10; private int optimizeMessageStoreInFlightLimit = 10;
private boolean persistJMSRedelivered = false;
public void configure(Broker broker,Queue queue) { public void configure(Broker broker,Queue queue) {
@ -196,6 +197,7 @@ public class PolicyEntry extends DestinationMapEntry {
destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
destination.setPersistJMSRedelivered(isPersistJMSRedelivered());
} }
public void baseConfiguration(Broker broker, BaseDestination destination) { public void baseConfiguration(Broker broker, BaseDestination destination) {
@ -920,4 +922,11 @@ public class PolicyEntry extends DestinationMapEntry {
this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
} }
public void setPersistJMSRedelivered(boolean val) {
this.persistJMSRedelivered = val;
}
public boolean isPersistJMSRedelivered() {
return persistJMSRedelivered;
}
} }

View File

@ -117,6 +117,10 @@ abstract public class AbstractMessageStore implements MessageStore {
removeMessage(context, ack); removeMessage(context, ack);
} }
public void updateMessage(Message message) throws IOException {
throw new IOException("update is not supported by: " + this);
}
static class CallableImplementation implements Callable<Object> { static class CallableImplementation implements Callable<Object> {
public Object call() throws Exception { public Object call() throws Exception {
return null; return null;

View File

@ -195,4 +195,5 @@ public interface MessageStore extends Service {
*/ */
public boolean isPrioritizedMessages(); public boolean isPrioritizedMessages();
void updateMessage(Message message) throws IOException;
} }

View File

@ -155,4 +155,9 @@ public class ProxyMessageStore implements MessageStore {
public boolean isPrioritizedMessages() { public boolean isPrioritizedMessages() {
return delegate.isPrioritizedMessages(); return delegate.isPrioritizedMessages();
} }
@Override
public void updateMessage(Message message) throws IOException {
delegate.updateMessage(message);
}
} }

View File

@ -204,4 +204,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
public boolean isPrioritizedMessages() { public boolean isPrioritizedMessages() {
return delegate.isPrioritizedMessages(); return delegate.isPrioritizedMessages();
} }
public void updateMessage(Message message) throws IOException {
delegate.updateMessage(message);
}
} }

View File

@ -153,4 +153,10 @@ public class MemoryMessageStore extends AbstractMessageStore {
lastBatchId = messageId; lastBatchId = messageId;
} }
public void updateMessage(Message message) {
synchronized (messageTable) {
messageTable.put(message.getMessageId(), message);
}
}
} }

View File

@ -21,6 +21,7 @@ import java.sql.SQLException;
import java.util.Set; import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
@ -107,4 +108,6 @@ public interface JDBCAdapter {
void doCommitAddOp(TransactionContext c, long storeSequenceIdForMessageId) throws SQLException, IOException; void doCommitAddOp(TransactionContext c, long storeSequenceIdForMessageId) throws SQLException, IOException;
void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String subId, String subName) throws SQLException, IOException; void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String subId, String subName) throws SQLException, IOException;
void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException;
} }

View File

@ -138,6 +138,19 @@ public class JDBCMessageStore extends AbstractMessageStore {
} }
} }
@Override
public void updateMessage(Message message) throws IOException {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
adapter.doUpdateMessage(c, destination, message.getMessageId(), ByteSequenceData.toByteArray(wireFormat.marshal(message)));
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to update message: " + message.getMessageId() + " in container: " + e, e);
} finally {
c.close();
}
}
protected void onAdd(MessageId messageId, long sequenceId, byte priority) { protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
if (lastRecoveredSequenceId.get() > 0 && sequenceId < lastRecoveredSequenceId.get()) { if (lastRecoveredSequenceId.get() > 0 && sequenceId < lastRecoveredSequenceId.get()) {
recoveredAdditions.add(sequenceId); recoveredAdditions.add(sequenceId);

View File

@ -169,7 +169,7 @@ public class Statements {
public String getUpdateMessageStatement() { public String getUpdateMessageStatement() {
if (updateMessageStatement == null) { if (updateMessageStatement == null) {
updateMessageStatement = "UPDATE " + getFullMessageTableName() + " SET MSG=? WHERE ID=?"; updateMessageStatement = "UPDATE " + getFullMessageTableName() + " SET MSG=? WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
} }
return updateMessageStatement; return updateMessageStatement;
} }

View File

@ -252,6 +252,24 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
@Override
public void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException {
PreparedStatement s = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getUpdateMessageStatement());
setBinaryData(s, 1, data);
s.setString(2, id.getProducerId().toString());
s.setLong(3, id.getProducerSequenceId());
s.setString(4, destination.getQualifiedName());
if (s.executeUpdate() != 1) {
throw new IOException("Could not update message: " + id + " in " + destination);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,

View File

@ -555,18 +555,6 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
letter.setArchiveCorruptedIndex(archiveCorruptedIndex); letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
} }
/**
* When true, persist the redelivery status such that the message redelivery flag can survive a broker failure
* used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean) true
*/
public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
letter.setRewriteOnRedelivery(rewriteOnRedelivery);
}
public boolean isRewriteOnRedelivery() {
return letter.isRewriteOnRedelivery();
}
public float getIndexLFUEvictionFactor() { public float getIndexLFUEvictionFactor() {
return letter.getIndexLFUEvictionFactor(); return letter.getIndexLFUEvictionFactor();
} }

View File

@ -60,10 +60,12 @@ import org.apache.activemq.store.*;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaLocation; import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction; import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
@ -276,46 +278,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
super.doStop(stopper); super.doStop(stopper);
} }
@Override
void incrementRedeliveryAndReWrite(final String key, final KahaDestination destination) throws IOException {
Location location;
this.indexLock.writeLock().lock();
try {
location = findMessageLocation(key, destination);
} finally {
this.indexLock.writeLock().unlock();
}
if (location != null) {
KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
Message message = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
message.incrementRedeliveryCounter();
if (LOG.isTraceEnabled()) {
LOG.trace("rewriting: " + key + " with deliveryCount: " + message.getRedeliveryCounter());
}
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
addMessage.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
final Location rewriteLocation = journal.write(toByteSequence(addMessage), true);
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(destination, tx);
Long sequence = sd.messageIdIndex.get(tx, key);
MessageKeys keys = sd.orderIndex.get(tx, sequence);
sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), sequence, new MessageKeys(keys.messageId, rewriteLocation));
}
});
} finally {
this.indexLock.writeLock().unlock();
}
}
}
@Override @Override
void rollbackStatsOnDuplicate(KahaDestination commandDestination) { void rollbackStatsOnDuplicate(KahaDestination commandDestination) {
if (brokerService != null) { if (brokerService != null) {
@ -465,6 +427,22 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
public void updateMessage(Message message) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
}
KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
KahaAddMessageCommand command = new KahaAddMessageCommand();
command.setDestination(dest);
command.setMessageId(message.getMessageId().toProducerKey());
command.setPriority(message.getPriority());
command.setPrioritySupported(prioritizedMessages);
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
updateMessageCommand.setMessage(command);
store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null);
}
@Override @Override
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
@ -1126,7 +1104,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
* @throws IOException * @throws IOException
*/ */
Message loadMessage(Location location) throws IOException { Message loadMessage(Location location) throws IOException {
KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location); JournalCommand<?> command = load(location);
KahaAddMessageCommand addMessage = null;
switch (command.type()) {
case KAHA_UPDATE_MESSAGE_COMMAND:
addMessage = ((KahaUpdateMessageCommand)command).getMessage();
break;
default:
addMessage = (KahaAddMessageCommand) command;
}
Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
return msg; return msg;
} }

View File

@ -69,6 +69,7 @@ import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand; import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor; import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
import org.apache.activemq.store.kahadb.disk.index.ListIndex; import org.apache.activemq.store.kahadb.disk.index.ListIndex;
@ -1113,6 +1114,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void visit(KahaTraceCommand command) { public void visit(KahaTraceCommand command) {
processLocation(location); processLocation(location);
} }
@Override
public void visit(KahaUpdateMessageCommand command) throws IOException {
process(command, location);
}
}); });
} }
@ -1127,7 +1133,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
pageFile.tx().execute(new Transaction.Closure<IOException>() { pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override @Override
public void execute(Transaction tx) throws IOException { public void execute(Transaction tx) throws IOException {
upadateIndex(tx, command, location); updateIndex(tx, command, location);
} }
}); });
} finally { } finally {
@ -1136,6 +1142,21 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} }
} }
@SuppressWarnings("rawtypes")
protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
}
});
} finally {
this.indexLock.writeLock().unlock();
}
}
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException { protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) { if (command.hasTransactionInfo()) {
@ -1253,26 +1274,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
updates = preparedTransactions.remove(key); updates = preparedTransactions.remove(key);
} }
} }
if (isRewriteOnRedelivery()) {
persistRedeliveryCount(updates);
} }
}
@SuppressWarnings("rawtypes")
private void persistRedeliveryCount(List<Operation> updates) throws IOException {
if (updates != null) {
for (Operation operation : updates) {
operation.getCommand().visit(new Visitor() {
@Override
public void visit(KahaRemoveMessageCommand command) throws IOException {
incrementRedeliveryAndReWrite(command.getMessageId(), command.getDestination());
}
});
}
}
}
abstract void incrementRedeliveryAndReWrite(String key, KahaDestination destination) throws IOException;
// ///////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////
// These methods do the actual index updates. // These methods do the actual index updates.
@ -1281,7 +1283,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>(); private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { void updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx); StoredDestination sd = getStoredDestination(command.getDestination(), tx);
// Skip adding the message to the index if this is a topic and there are // Skip adding the message to the index if this is a topic and there are
@ -1320,6 +1322,25 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
metadata.lastUpdate = location; metadata.lastUpdate = location;
} }
void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
KahaAddMessageCommand command = updateMessageCommand.getMessage();
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
Long id = sd.messageIdIndex.get(tx, command.getMessageId());
if (id != null) {
sd.orderIndex.put(
tx,
command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
id,
new MessageKeys(command.getMessageId(), location)
);
sd.locationIndex.put(tx, location, id);
} else {
LOG.warn("Non existent message update attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
}
metadata.lastUpdate = location;
}
abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination); abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination);
void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
@ -2382,7 +2403,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
@Override @Override
public void execute(Transaction tx) throws IOException { public void execute(Transaction tx) throws IOException {
upadateIndex(tx, command, location); updateIndex(tx, command, location);
} }
} }
@ -2612,14 +2633,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.directoryArchive = directoryArchive; this.directoryArchive = directoryArchive;
} }
public boolean isRewriteOnRedelivery() {
return rewriteOnRedelivery;
}
public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
this.rewriteOnRedelivery = rewriteOnRedelivery;
}
public boolean isArchiveCorruptedIndex() { public boolean isArchiveCorruptedIndex() {
return archiveCorruptedIndex; return archiveCorruptedIndex;
} }

View File

@ -28,6 +28,7 @@ import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand; import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
public class Visitor { public class Visitor {
@ -60,4 +61,7 @@ public class Visitor {
public void visit(KahaAckMessageFileMapCommand kahaProducerAuditCommand) throws IOException { public void visit(KahaAckMessageFileMapCommand kahaProducerAuditCommand) throws IOException {
} }
public void visit(KahaUpdateMessageCommand kahaUpdateMessageCommand) throws IOException {
}
} }

View File

@ -31,6 +31,7 @@ enum KahaEntryType {
KAHA_SUBSCRIPTION_COMMAND = 7; KAHA_SUBSCRIPTION_COMMAND = 7;
KAHA_PRODUCER_AUDIT_COMMAND = 8; KAHA_PRODUCER_AUDIT_COMMAND = 8;
KAHA_ACK_MESSAGE_FILE_MAP_COMMAND = 9; KAHA_ACK_MESSAGE_FILE_MAP_COMMAND = 9;
KAHA_UPDATE_MESSAGE_COMMAND = 10;
} }
message KahaTraceCommand { message KahaTraceCommand {
@ -58,6 +59,13 @@ message KahaAddMessageCommand {
optional bool prioritySupported = 6; optional bool prioritySupported = 6;
} }
message KahaUpdateMessageCommand {
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaUpdateMessageCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
//| option java_type_method = "KahaEntryType";
required KahaAddMessageCommand message = 1;
}
message KahaRemoveMessageCommand { message KahaRemoveMessageCommand {
//| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveMessageCommand>"; //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveMessageCommand>";
//| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";

View File

@ -394,7 +394,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
if( prepared ) { if( prepared ) {
store.preparedAcks.remove(ack.getLastMessageId) store.preparedAcks.remove(ack.getLastMessageId)
} }
uow.incrementRedelivery(store.key, ack.getLastMessageId)
} }
} }
} }
@ -701,6 +700,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
waitOn(asyncAddQueueMessage(context, message, delay)) waitOn(asyncAddQueueMessage(context, message, delay))
} }
override def updateMessage(message: Message): Unit = {
check_running
// the only current usage of update is to increment the redelivery counter
withUow {uow => uow.incrementRedelivery(key, message.getMessageId)}
}
def doRemove(uow: DelayableUOW, id: MessageId): CountDownFuture[AnyRef] = { def doRemove(uow: DelayableUOW, id: MessageId): CountDownFuture[AnyRef] = {
uow.dequeue(key, id) uow.dequeue(key, id)
} }

View File

@ -199,6 +199,22 @@ public abstract class TestSupport extends CombinationTestSupport {
return adapter; return adapter;
} }
public void stopBrokerWithStoreFailure(BrokerService broker, PersistenceAdapterChoice choice) throws Exception {
switch (choice) {
case KahaDB:
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
// have the broker stop with an IOException on next checkpoint so it has a pending local transaction to recover
kahaDBPersistenceAdapter.getStore().getJournal().close();
break;
default:
// just stop normally by default
broker.stop();
}
broker.waitUntilStopped();
}
/** /**
* Test if base directory contains spaces * Test if base directory contains spaces
*/ */

View File

@ -59,6 +59,7 @@ public class BrokerRestartTestSupport extends BrokerTestSupport {
*/ */
protected void restartBroker() throws Exception { protected void restartBroker() throws Exception {
broker.stop(); broker.stop();
broker.waitUntilStopped();
broker = createRestartedBroker(); broker = createRestartedBroker();
broker.start(); broker.start();
} }

View File

@ -1,49 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import junit.framework.Test;
import org.apache.activemq.leveldb.LevelDBStore;
import java.io.IOException;
/**
*/
public class LevelDBRedeliveryRestartTest extends RedeliveryRestartTest {
@Override
protected void configureBroker(BrokerService broker) throws Exception {
broker.setDestinationPolicy(policyMap);
LevelDBStore store = new LevelDBStore();
broker.setPersistenceAdapter(store);
broker.addConnector("tcp://0.0.0.0:0");
}
@Override
protected void stopBrokerWithStoreFailure() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
public static Test suite() {
return suite(LevelDBRedeliveryRestartTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import java.util.Arrays;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -23,44 +24,123 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.transport.failover.FailoverTransport; import org.apache.activemq.transport.failover.FailoverTransport;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; @RunWith(value = Parameterized.class)
public class RedeliveryRestartTest extends TestSupport {
public class RedeliveryRestartTest extends BrokerRestartTestSupport {
private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartTest.class); private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartTest.class);
ActiveMQConnection connection;
BrokerService broker = null;
String queueName = "redeliveryRestartQ";
@Override @Parameterized.Parameter
protected void setUp() throws Exception { public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = PersistenceAdapterChoice.KahaDB;
setAutoFail(true);
setMaxTestTime(2 * 60 * 1000);
super.setUp();
@Parameterized.Parameters(name="Store={0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][]{{TestSupport.PersistenceAdapterChoice.KahaDB},{TestSupport.PersistenceAdapterChoice.JDBC},{TestSupport.PersistenceAdapterChoice.LevelDB}});
} }
@Override @Override
@Before
public void setUp() throws Exception {
super.setUp();
broker = new BrokerService();
configureBroker(broker);
broker.setDeleteAllMessagesOnStartup(true);
broker.start();
}
@Override
@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
broker.stop();
super.tearDown();
}
protected void configureBroker(BrokerService broker) throws Exception { protected void configureBroker(BrokerService broker) throws Exception {
super.configureBroker(broker); PolicyMap policyMap = new PolicyMap();
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); PolicyEntry policy = new PolicyEntry();
kahaDBPersistenceAdapter.setRewriteOnRedelivery(true); policy.setPersistJMSRedelivered(true);
kahaDBPersistenceAdapter.setCleanupInterval(500); policyMap.setDefaultEntry(policy);
broker.setDestinationPolicy(policyMap);
setPersistenceAdapter(broker, persistenceAdapterChoice);
broker.addConnector("tcp://0.0.0.0:0"); broker.addConnector("tcp://0.0.0.0:0");
} }
@org.junit.Test
public void testValidateRedeliveryFlagAfterRestartNoTx() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString()
+ ")?jms.prefetchPolicy.all=0");
connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
populateDestination(10, queueName, connection);
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = null;
for (int i = 0; i < 5; i++) {
msg = (TextMessage) consumer.receive(20000);
LOG.info("not redelivered? got: " + msg);
assertNotNull("got the message", msg);
assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
assertEquals("not a redelivery", false, msg.getJMSRedelivered());
}
consumer.close();
restartBroker();
// make failover aware of the restarted auto assigned port
connection.getTransport().narrow(FailoverTransport.class).add(true, broker.getTransportConnectors().get(0)
.getPublishableConnectString());
consumer = session.createConsumer(destination);
for (int i = 0; i < 5; i++) {
msg = (TextMessage) consumer.receive(4000);
LOG.info("redelivered? got: " + msg);
assertNotNull("got the message again", msg);
assertEquals("re delivery flag", true, msg.getJMSRedelivered());
assertEquals("redelivery count survives restart", 2, msg.getLongProperty("JMSXDeliveryCount"));
msg.acknowledge();
}
// consume the rest that were not redeliveries
for (int i = 0; i < 5; i++) {
msg = (TextMessage) consumer.receive(20000);
LOG.info("not redelivered? got: " + msg);
assertNotNull("got the message", msg);
assertEquals("not a redelivery", false, msg.getJMSRedelivered());
assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
msg.acknowledge();
}
connection.close();
}
@org.junit.Test
public void testValidateRedeliveryFlagAfterRestart() throws Exception { public void testValidateRedeliveryFlagAfterRestart() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString() ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString()
+ ")?jms.transactedIndividualAck=true"); + ")?jms.prefetchPolicy.all=0");
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start(); connection.start();
populateDestination(10, queueName, connection); populateDestination(10, queueName, connection);
@ -109,10 +189,11 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
connection.close(); connection.close();
} }
@org.junit.Test
public void testValidateRedeliveryFlagAfterRecovery() throws Exception { public void testValidateRedeliveryFlagAfterRecovery() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+ "?jms.transactedIndividualAck=true"); + "?jms.prefetchPolicy.all=0");
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start(); connection.start();
populateDestination(1, queueName, connection); populateDestination(1, queueName, connection);
@ -121,19 +202,20 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
Destination destination = session.createQueue(queueName); Destination destination = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(destination); MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = (TextMessage) consumer.receive(20000); TextMessage msg = (TextMessage) consumer.receive(5000);
LOG.info("got: " + msg); LOG.info("got: " + msg);
assertNotNull("got the message", msg); assertNotNull("got the message", msg);
assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
assertEquals("not a redelivery", false, msg.getJMSRedelivered()); assertEquals("not a redelivery", false, msg.getJMSRedelivered());
stopBrokerWithStoreFailure(); stopBrokerWithStoreFailure(broker, persistenceAdapterChoice);
broker = createRestartedBroker(); broker = createRestartedBroker();
broker.start(); broker.start();
connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() connection.close();
+ "?jms.transactedIndividualAck=true");
connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
connection = (ActiveMQConnection) connectionFactory.createConnection(); connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start(); connection.start();
@ -148,12 +230,17 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
connection.close(); connection.close();
} }
protected void stopBrokerWithStoreFailure() throws Exception { private void restartBroker() throws Exception {
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); broker.stop();
// have the broker stop with an IOException on next checkpoint so it has a pending local transaction to recover
kahaDBPersistenceAdapter.getStore().getJournal().close();
broker.waitUntilStopped(); broker.waitUntilStopped();
broker = createRestartedBroker();
broker.start();
}
private BrokerService createRestartedBroker() throws Exception {
broker = new BrokerService();
configureBroker(broker);
return broker;
} }
private void populateDestination(final int nbMessages, final String destinationName, javax.jms.Connection connection) throws JMSException { private void populateDestination(final int nbMessages, final String destinationName, javax.jms.Connection connection) throws JMSException {
@ -166,12 +253,4 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
producer.close(); producer.close();
session.close(); session.close();
} }
public static Test suite() {
return suite(RedeliveryRestartTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
} }

View File

@ -16,12 +16,11 @@
*/ */
package org.apache.activemq.transport.failover; package org.apache.activemq.transport.failover;
import java.io.IOException;
import junit.framework.Test; import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.broker.region.policy.PolicyMap;
public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest { public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest {
@ -38,22 +37,14 @@ public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest {
@Override @Override
public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
BrokerService brokerService = super.createBroker(deleteAllMessagesOnStartup, bindAddress); BrokerService brokerService = super.createBroker(deleteAllMessagesOnStartup, bindAddress);
configurePersistenceAdapter(brokerService); PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setPersistJMSRedelivered(true);
policyMap.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(policyMap);
return brokerService; return brokerService;
} }
private void configurePersistenceAdapter(BrokerService brokerService) throws IOException {
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter();
kahaDBPersistenceAdapter.setRewriteOnRedelivery(true);
}
@Override
public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
PersistenceAdapter persistenceAdapter = super.setDefaultPersistenceAdapter(broker);
configurePersistenceAdapter(broker);
return persistenceAdapter;
}
// no point rerunning these // no point rerunning these
@Override @Override
public void testFailoverProducerCloseBeforeTransaction() throws Exception { public void testFailoverProducerCloseBeforeTransaction() throws Exception {

View File

@ -50,7 +50,7 @@ public class MemoryLimitTest extends TestSupport {
@Parameterized.Parameter @Parameterized.Parameter
public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice; public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
@Parameterized.Parameters(name="{0}") @Parameterized.Parameters(name="store={0}")
public static Iterable<Object[]> getTestParameters() { public static Iterable<Object[]> getTestParameters() {
return Arrays.asList(new Object[][]{{TestSupport.PersistenceAdapterChoice.KahaDB}, {PersistenceAdapterChoice.LevelDB}, {PersistenceAdapterChoice.JDBC}}); return Arrays.asList(new Object[][]{{TestSupport.PersistenceAdapterChoice.KahaDB}, {PersistenceAdapterChoice.LevelDB}, {PersistenceAdapterChoice.JDBC}});
} }