diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 3f925b4ffa..c3841c8527 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -106,6 +106,7 @@ public abstract class BaseDestination implements Destination { * percentage of in-flight messages above which optimize message store is disabled */ private int optimizeMessageStoreInFlightLimit = 10; + private boolean persistJMSRedelivered; /** * @param brokerService @@ -807,4 +808,11 @@ public abstract class BaseDestination implements Destination { } } + public void setPersistJMSRedelivered(boolean persistJMSRedelivered) { + this.persistJMSRedelivered = persistJMSRedelivered; + } + + public boolean isPersistJMSRedelivered() { + return persistJMSRedelivered; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueRegion.java index 05aa6333d6..f300a1340f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueRegion.java @@ -25,6 +25,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.usage.SystemUsage; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 4d547537d9..dd0e63b605 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -596,6 +596,17 @@ public class RegionBroker extends EmptyBroker { long totalTime = endTime - message.getBrokerInTime(); ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime); } + if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered() && message.isPersistent()) { + final int originalValue = message.getRedeliveryCounter(); + message.incrementRedeliveryCounter(); + try { + ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message); + } catch (IOException error) { + LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(), message.getDestination(), error); + } finally { + message.setRedeliveryCounter(originalValue); + } + } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 9e1b00602c..624d490235 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -103,6 +103,7 @@ public class PolicyEntry extends DestinationMapEntry { * percentage of in-flight messages above which optimize message store is disabled */ private int optimizeMessageStoreInFlightLimit = 10; + private boolean persistJMSRedelivered = false; public void configure(Broker broker,Queue queue) { @@ -196,6 +197,7 @@ public class PolicyEntry extends DestinationMapEntry { destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); + destination.setPersistJMSRedelivered(isPersistJMSRedelivered()); } public void baseConfiguration(Broker broker, BaseDestination destination) { @@ -920,4 +922,11 @@ public class PolicyEntry extends DestinationMapEntry { this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; } + public void setPersistJMSRedelivered(boolean val) { + this.persistJMSRedelivered = val; + } + + public boolean isPersistJMSRedelivered() { + return persistJMSRedelivered; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java index ba9efdeb7e..cd8d0f901a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java @@ -117,6 +117,10 @@ abstract public class AbstractMessageStore implements MessageStore { removeMessage(context, ack); } + public void updateMessage(Message message) throws IOException { + throw new IOException("update is not supported by: " + this); + } + static class CallableImplementation implements Callable { public Object call() throws Exception { return null; diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java index 6dac804ab9..d465bc5053 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java @@ -195,4 +195,5 @@ public interface MessageStore extends Service { */ public boolean isPrioritizedMessages(); + void updateMessage(Message message) throws IOException; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java index ce1dd06150..e79229b43f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java @@ -155,4 +155,9 @@ public class ProxyMessageStore implements MessageStore { public boolean isPrioritizedMessages() { return delegate.isPrioritizedMessages(); } + + @Override + public void updateMessage(Message message) throws IOException { + delegate.updateMessage(message); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java index 8487650291..c0635faf74 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java @@ -204,4 +204,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore { public boolean isPrioritizedMessages() { return delegate.isPrioritizedMessages(); } + + public void updateMessage(Message message) throws IOException { + delegate.updateMessage(message); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index 5ad6a32007..836b38883a 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -152,5 +152,11 @@ public class MemoryMessageStore extends AbstractMessageStore { public void setBatch(MessageId messageId) { lastBatchId = messageId; } + + public void updateMessage(Message message) { + synchronized (messageTable) { + messageTable.put(message.getMessageId(), message); + } + } } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index 19ba585962..912808d6b3 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -21,6 +21,7 @@ import java.sql.SQLException; import java.util.Set; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.SubscriptionInfo; @@ -107,4 +108,6 @@ public interface JDBCAdapter { void doCommitAddOp(TransactionContext c, long storeSequenceIdForMessageId) throws SQLException, IOException; void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String subId, String subName) throws SQLException, IOException; + + void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException; } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 0ee982358d..3c441b0bcb 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -138,6 +138,19 @@ public class JDBCMessageStore extends AbstractMessageStore { } } + @Override + public void updateMessage(Message message) throws IOException { + TransactionContext c = persistenceAdapter.getTransactionContext(); + try { + adapter.doUpdateMessage(c, destination, message.getMessageId(), ByteSequenceData.toByteArray(wireFormat.marshal(message))); + } catch (SQLException e) { + JDBCPersistenceAdapter.log("JDBC Failure: ", e); + throw IOExceptionSupport.create("Failed to update message: " + message.getMessageId() + " in container: " + e, e); + } finally { + c.close(); + } + } + protected void onAdd(MessageId messageId, long sequenceId, byte priority) { if (lastRecoveredSequenceId.get() > 0 && sequenceId < lastRecoveredSequenceId.get()) { recoveredAdditions.add(sequenceId); @@ -353,5 +366,5 @@ public class JDBCMessageStore extends AbstractMessageStore { public void setPrioritizedMessages(boolean prioritizedMessages) { super.setPrioritizedMessages(prioritizedMessages); - } + } } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java index 5df6f2ef1f..fc8046534f 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -169,7 +169,7 @@ public class Statements { public String getUpdateMessageStatement() { if (updateMessageStatement == null) { - updateMessageStatement = "UPDATE " + getFullMessageTableName() + " SET MSG=? WHERE ID=?"; + updateMessageStatement = "UPDATE " + getFullMessageTableName() + " SET MSG=? WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?"; } return updateMessageStatement; } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 3243048a37..dc59621cf1 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -252,6 +252,24 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } + @Override + public void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException { + PreparedStatement s = null; + cleanupExclusiveLock.readLock().lock(); + try { + s = c.getConnection().prepareStatement(this.statements.getUpdateMessageStatement()); + setBinaryData(s, 1, data); + s.setString(2, id.getProducerId().toString()); + s.setLong(3, id.getProducerSequenceId()); + s.setString(4, destination.getQualifiedName()); + if (s.executeUpdate() != 1) { + throw new IOException("Could not update message: " + id + " in " + destination); + } + } finally { + cleanupExclusiveLock.readLock().unlock(); + close(s); + } + } public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 9bfbd832da..d8b986e49e 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -555,18 +555,6 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements letter.setArchiveCorruptedIndex(archiveCorruptedIndex); } - /** - * When true, persist the redelivery status such that the message redelivery flag can survive a broker failure - * used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean) true - */ - public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) { - letter.setRewriteOnRedelivery(rewriteOnRedelivery); - } - - public boolean isRewriteOnRedelivery() { - return letter.isRewriteOnRedelivery(); - } - public float getIndexLFUEvictionFactor() { return letter.getIndexLFUEvictionFactor(); } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 9255c0027d..1e84642ea7 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -60,10 +60,12 @@ import org.apache.activemq.store.*; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; +import org.apache.activemq.store.kahadb.data.KahaEntryType; import org.apache.activemq.store.kahadb.data.KahaLocation; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; +import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.page.Transaction; import org.apache.activemq.usage.MemoryUsage; @@ -276,46 +278,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { super.doStop(stopper); } - @Override - void incrementRedeliveryAndReWrite(final String key, final KahaDestination destination) throws IOException { - Location location; - this.indexLock.writeLock().lock(); - try { - location = findMessageLocation(key, destination); - } finally { - this.indexLock.writeLock().unlock(); - } - - if (location != null) { - KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location); - Message message = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); - - message.incrementRedeliveryCounter(); - if (LOG.isTraceEnabled()) { - LOG.trace("rewriting: " + key + " with deliveryCount: " + message.getRedeliveryCounter()); - } - org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); - addMessage.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); - - final Location rewriteLocation = journal.write(toByteSequence(addMessage), true); - - this.indexLock.writeLock().lock(); - try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(destination, tx); - Long sequence = sd.messageIdIndex.get(tx, key); - MessageKeys keys = sd.orderIndex.get(tx, sequence); - sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), sequence, new MessageKeys(keys.messageId, rewriteLocation)); - } - }); - } finally { - this.indexLock.writeLock().unlock(); - } - } - } - @Override void rollbackStatsOnDuplicate(KahaDestination commandDestination) { if (brokerService != null) { @@ -465,6 +427,22 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } + public void updateMessage(Message message) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); + } + KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); + KahaAddMessageCommand command = new KahaAddMessageCommand(); + command.setDestination(dest); + command.setMessageId(message.getMessageId().toProducerKey()); + command.setPriority(message.getPriority()); + command.setPrioritySupported(prioritizedMessages); + org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); + command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); + updateMessageCommand.setMessage(command); + store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null); + } + @Override public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); @@ -1126,7 +1104,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { * @throws IOException */ Message loadMessage(Location location) throws IOException { - KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location); + JournalCommand command = load(location); + KahaAddMessageCommand addMessage = null; + switch (command.type()) { + case KAHA_UPDATE_MESSAGE_COMMAND: + addMessage = ((KahaUpdateMessageCommand)command).getMessage(); + break; + default: + addMessage = (KahaAddMessageCommand) command; + } Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); return msg; } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 78e26a956b..df970d452a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -69,6 +69,7 @@ import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaTraceCommand; import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; +import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor; import org.apache.activemq.store.kahadb.disk.index.ListIndex; @@ -1113,6 +1114,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void visit(KahaTraceCommand command) { processLocation(location); } + + @Override + public void visit(KahaUpdateMessageCommand command) throws IOException { + process(command, location); + } }); } @@ -1127,7 +1133,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe pageFile.tx().execute(new Transaction.Closure() { @Override public void execute(Transaction tx) throws IOException { - upadateIndex(tx, command, location); + updateIndex(tx, command, location); } }); } finally { @@ -1136,6 +1142,21 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } + @SuppressWarnings("rawtypes") + protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException { + this.indexLock.writeLock().lock(); + try { + pageFile.tx().execute(new Transaction.Closure() { + @Override + public void execute(Transaction tx) throws IOException { + updateIndex(tx, command, location); + } + }); + } finally { + this.indexLock.writeLock().unlock(); + } + } + @SuppressWarnings("rawtypes") protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException { if (command.hasTransactionInfo()) { @@ -1253,27 +1274,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe updates = preparedTransactions.remove(key); } } - if (isRewriteOnRedelivery()) { - persistRedeliveryCount(updates); - } } - @SuppressWarnings("rawtypes") - private void persistRedeliveryCount(List updates) throws IOException { - if (updates != null) { - for (Operation operation : updates) { - operation.getCommand().visit(new Visitor() { - @Override - public void visit(KahaRemoveMessageCommand command) throws IOException { - incrementRedeliveryAndReWrite(command.getMessageId(), command.getDestination()); - } - }); - } - } - } - - abstract void incrementRedeliveryAndReWrite(String key, KahaDestination destination) throws IOException; - // ///////////////////////////////////////////////////////////////// // These methods do the actual index updates. // ///////////////////////////////////////////////////////////////// @@ -1281,7 +1283,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); private final HashSet journalFilesBeingReplicated = new HashSet(); - void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { + void updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); // Skip adding the message to the index if this is a topic and there are @@ -1320,6 +1322,25 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe metadata.lastUpdate = location; } + void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException { + KahaAddMessageCommand command = updateMessageCommand.getMessage(); + StoredDestination sd = getStoredDestination(command.getDestination(), tx); + + Long id = sd.messageIdIndex.get(tx, command.getMessageId()); + if (id != null) { + sd.orderIndex.put( + tx, + command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY, + id, + new MessageKeys(command.getMessageId(), location) + ); + sd.locationIndex.put(tx, location, id); + } else { + LOG.warn("Non existent message update attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId()); + } + metadata.lastUpdate = location; + } + abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination); void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { @@ -2382,7 +2403,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe @Override public void execute(Transaction tx) throws IOException { - upadateIndex(tx, command, location); + updateIndex(tx, command, location); } } @@ -2612,14 +2633,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe this.directoryArchive = directoryArchive; } - public boolean isRewriteOnRedelivery() { - return rewriteOnRedelivery; - } - - public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) { - this.rewriteOnRedelivery = rewriteOnRedelivery; - } - public boolean isArchiveCorruptedIndex() { return archiveCorruptedIndex; } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java index 03072fe612..be4f2ff584 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java @@ -28,6 +28,7 @@ import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaTraceCommand; +import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; public class Visitor { @@ -60,4 +61,7 @@ public class Visitor { public void visit(KahaAckMessageFileMapCommand kahaProducerAuditCommand) throws IOException { } + + public void visit(KahaUpdateMessageCommand kahaUpdateMessageCommand) throws IOException { + } } diff --git a/activemq-kahadb-store/src/main/proto/journal-data.proto b/activemq-kahadb-store/src/main/proto/journal-data.proto index 9d263087d9..8290c4c497 100644 --- a/activemq-kahadb-store/src/main/proto/journal-data.proto +++ b/activemq-kahadb-store/src/main/proto/journal-data.proto @@ -31,6 +31,7 @@ enum KahaEntryType { KAHA_SUBSCRIPTION_COMMAND = 7; KAHA_PRODUCER_AUDIT_COMMAND = 8; KAHA_ACK_MESSAGE_FILE_MAP_COMMAND = 9; + KAHA_UPDATE_MESSAGE_COMMAND = 10; } message KahaTraceCommand { @@ -58,6 +59,13 @@ message KahaAddMessageCommand { optional bool prioritySupported = 6; } +message KahaUpdateMessageCommand { + //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand"; + //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; + //| option java_type_method = "KahaEntryType"; + required KahaAddMessageCommand message = 1; +} + message KahaRemoveMessageCommand { //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand"; //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 7b90b0c77d..9256bb5bd9 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -394,7 +394,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P if( prepared ) { store.preparedAcks.remove(ack.getLastMessageId) } - uow.incrementRedelivery(store.key, ack.getLastMessageId) } } } @@ -701,6 +700,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P waitOn(asyncAddQueueMessage(context, message, delay)) } + override def updateMessage(message: Message): Unit = { + check_running + // the only current usage of update is to increment the redelivery counter + withUow {uow => uow.incrementRedelivery(key, message.getMessageId)} + } + def doRemove(uow: DelayableUOW, id: MessageId): CountDownFuture[AnyRef] = { uow.dequeue(key, id) } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java index 3fc7cf7e5a..46eecfbfc4 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java @@ -199,6 +199,22 @@ public abstract class TestSupport extends CombinationTestSupport { return adapter; } + public void stopBrokerWithStoreFailure(BrokerService broker, PersistenceAdapterChoice choice) throws Exception { + switch (choice) { + case KahaDB: + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + + // have the broker stop with an IOException on next checkpoint so it has a pending local transaction to recover + kahaDBPersistenceAdapter.getStore().getJournal().close(); + break; + default: + // just stop normally by default + broker.stop(); + } + broker.waitUntilStopped(); + } + + /** * Test if base directory contains spaces */ diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java index a25b07174b..c4e3848803 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java @@ -59,6 +59,7 @@ public class BrokerRestartTestSupport extends BrokerTestSupport { */ protected void restartBroker() throws Exception { broker.stop(); + broker.waitUntilStopped(); broker = createRestartedBroker(); broker.start(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/LevelDBRedeliveryRestartTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/LevelDBRedeliveryRestartTest.java deleted file mode 100644 index decf4d4579..0000000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/LevelDBRedeliveryRestartTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker; - -import junit.framework.Test; -import org.apache.activemq.leveldb.LevelDBStore; - -import java.io.IOException; - -/** - */ -public class LevelDBRedeliveryRestartTest extends RedeliveryRestartTest { - @Override - protected void configureBroker(BrokerService broker) throws Exception { - broker.setDestinationPolicy(policyMap); - LevelDBStore store = new LevelDBStore(); - broker.setPersistenceAdapter(store); - broker.addConnector("tcp://0.0.0.0:0"); - } - - @Override - protected void stopBrokerWithStoreFailure() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - } - - public static Test suite() { - return suite(LevelDBRedeliveryRestartTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - -} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java index 922b67a958..8eba729b03 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker; +import java.util.Arrays; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; @@ -23,44 +24,123 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; - -import junit.framework.Test; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.transport.failover.FailoverTransport; +import org.junit.After; +import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - -public class RedeliveryRestartTest extends BrokerRestartTestSupport { +@RunWith(value = Parameterized.class) +public class RedeliveryRestartTest extends TestSupport { private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartTest.class); + ActiveMQConnection connection; + BrokerService broker = null; + String queueName = "redeliveryRestartQ"; - @Override - protected void setUp() throws Exception { - setAutoFail(true); - setMaxTestTime(2 * 60 * 1000); - super.setUp(); + @Parameterized.Parameter + public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = PersistenceAdapterChoice.KahaDB; + @Parameterized.Parameters(name="Store={0}") + public static Iterable data() { + return Arrays.asList(new Object[][]{{TestSupport.PersistenceAdapterChoice.KahaDB},{TestSupport.PersistenceAdapterChoice.JDBC},{TestSupport.PersistenceAdapterChoice.LevelDB}}); } @Override + @Before + public void setUp() throws Exception { + super.setUp(); + broker = new BrokerService(); + configureBroker(broker); + broker.setDeleteAllMessagesOnStartup(true); + broker.start(); + } + + @Override + @After + public void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + broker.stop(); + super.tearDown(); + } + protected void configureBroker(BrokerService broker) throws Exception { - super.configureBroker(broker); - KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - kahaDBPersistenceAdapter.setRewriteOnRedelivery(true); - kahaDBPersistenceAdapter.setCleanupInterval(500); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setPersistJMSRedelivered(true); + policyMap.setDefaultEntry(policy); + broker.setDestinationPolicy(policyMap); + setPersistenceAdapter(broker, persistenceAdapterChoice); broker.addConnector("tcp://0.0.0.0:0"); } + @org.junit.Test + public void testValidateRedeliveryFlagAfterRestartNoTx() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString() + + ")?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + populateDestination(10, queueName, connection); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination destination = session.createQueue(queueName); + + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = null; + for (int i = 0; i < 5; i++) { + msg = (TextMessage) consumer.receive(20000); + LOG.info("not redelivered? got: " + msg); + assertNotNull("got the message", msg); + assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); + assertEquals("not a redelivery", false, msg.getJMSRedelivered()); + } + consumer.close(); + + restartBroker(); + + // make failover aware of the restarted auto assigned port + connection.getTransport().narrow(FailoverTransport.class).add(true, broker.getTransportConnectors().get(0) + .getPublishableConnectString()); + + consumer = session.createConsumer(destination); + for (int i = 0; i < 5; i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("redelivered? got: " + msg); + assertNotNull("got the message again", msg); + assertEquals("re delivery flag", true, msg.getJMSRedelivered()); + assertEquals("redelivery count survives restart", 2, msg.getLongProperty("JMSXDeliveryCount")); + msg.acknowledge(); + } + + // consume the rest that were not redeliveries + for (int i = 0; i < 5; i++) { + msg = (TextMessage) consumer.receive(20000); + LOG.info("not redelivered? got: " + msg); + assertNotNull("got the message", msg); + assertEquals("not a redelivery", false, msg.getJMSRedelivered()); + assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); + msg.acknowledge(); + } + connection.close(); + } + + @org.junit.Test public void testValidateRedeliveryFlagAfterRestart() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString() - + ")?jms.transactedIndividualAck=true"); - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + + ")?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); populateDestination(10, queueName, connection); @@ -109,10 +189,11 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport { connection.close(); } + @org.junit.Test public void testValidateRedeliveryFlagAfterRecovery() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() - + "?jms.transactedIndividualAck=true"); - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + + "?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); populateDestination(1, queueName, connection); @@ -121,19 +202,20 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport { Destination destination = session.createQueue(queueName); MessageConsumer consumer = session.createConsumer(destination); - TextMessage msg = (TextMessage) consumer.receive(20000); + TextMessage msg = (TextMessage) consumer.receive(5000); LOG.info("got: " + msg); assertNotNull("got the message", msg); assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); assertEquals("not a redelivery", false, msg.getJMSRedelivered()); - stopBrokerWithStoreFailure(); + stopBrokerWithStoreFailure(broker, persistenceAdapterChoice); broker = createRestartedBroker(); broker.start(); - connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() - + "?jms.transactedIndividualAck=true"); + connection.close(); + + connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); @@ -148,12 +230,17 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport { connection.close(); } - protected void stopBrokerWithStoreFailure() throws Exception { - KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - - // have the broker stop with an IOException on next checkpoint so it has a pending local transaction to recover - kahaDBPersistenceAdapter.getStore().getJournal().close(); + private void restartBroker() throws Exception { + broker.stop(); broker.waitUntilStopped(); + broker = createRestartedBroker(); + broker.start(); + } + + private BrokerService createRestartedBroker() throws Exception { + broker = new BrokerService(); + configureBroker(broker); + return broker; } private void populateDestination(final int nbMessages, final String destinationName, javax.jms.Connection connection) throws JMSException { @@ -166,12 +253,4 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport { producer.close(); session.close(); } - - public static Test suite() { - return suite(RedeliveryRestartTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java index 842563dc2c..c8d4c511a6 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java @@ -16,12 +16,11 @@ */ package org.apache.activemq.transport.failover; -import java.io.IOException; import junit.framework.Test; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest { @@ -38,22 +37,14 @@ public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest { @Override public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { BrokerService brokerService = super.createBroker(deleteAllMessagesOnStartup, bindAddress); - configurePersistenceAdapter(brokerService); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setPersistJMSRedelivered(true); + policyMap.setDefaultEntry(defaultEntry); + brokerService.setDestinationPolicy(policyMap); return brokerService; } - private void configurePersistenceAdapter(BrokerService brokerService) throws IOException { - KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter(); - kahaDBPersistenceAdapter.setRewriteOnRedelivery(true); - } - - @Override - public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException { - PersistenceAdapter persistenceAdapter = super.setDefaultPersistenceAdapter(broker); - configurePersistenceAdapter(broker); - return persistenceAdapter; - } - // no point rerunning these @Override public void testFailoverProducerCloseBeforeTransaction() throws Exception { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java index de9e11ce17..21c39ef6f4 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java @@ -50,7 +50,7 @@ public class MemoryLimitTest extends TestSupport { @Parameterized.Parameter public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice; - @Parameterized.Parameters(name="{0}") + @Parameterized.Parameters(name="store={0}") public static Iterable getTestParameters() { return Arrays.asList(new Object[][]{{TestSupport.PersistenceAdapterChoice.KahaDB}, {PersistenceAdapterChoice.LevelDB}, {PersistenceAdapterChoice.JDBC}}); }