diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java index aee619a27a..e35327f848 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java @@ -178,6 +178,8 @@ public interface MessageStore extends Service { void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception; + void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception; + void dispose(ConnectionContext context); /** diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java index cd319a65d3..a4fb4be5b8 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java @@ -110,6 +110,11 @@ public class ProxyMessageStore implements MessageStore { delegate.recoverNextMessages(maxReturned, listener); } + @Override + public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception { + delegate.recoverNextMessages(offset, maxReturned, listener); + } + @Override public void resetBatching() { delegate.resetBatching(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index 953c83e19a..f0857fb960 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -130,6 +130,32 @@ public class MemoryMessageStore extends AbstractMessageStore { } } + @Override + public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception { + synchronized (messageTable) { + boolean pastLackBatch = lastBatchId == null; + int position = 0; + for (Map.Entry entry : messageTable.entrySet()) { + if(offset > 0 && offset > position) { + position++; + continue; + } + if (pastLackBatch) { + Object msg = entry.getValue(); + lastBatchId = entry.getKey(); + if (msg.getClass() == MessageId.class) { + listener.recoverMessageReference((MessageId) msg); + } else { + listener.recoverMessage((Message) msg); + } + } else { + pastLackBatch = entry.getKey().equals(lastBatchId); + } + position++; + } + } + } + @Override public void resetBatching() { lastBatchId = null; diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 8adc2f78ee..70ddb7ab1e 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -398,6 +398,19 @@ public class JDBCMessageStore extends AbstractMessageStore { } + /** + * @param offset + * @param maxReturned + * @param listener + * @throws Exception + * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, + * org.apache.activemq.store.MessageRecoveryListener) + */ + @Override + public void recoverNextMessages(int offset, int maxReturned, final MessageRecoveryListener listener) throws Exception { + throw new UnsupportedOperationException("recoverNextMesage(offset,maxReturned,listener) is not supported."); + } + public void trackRollbackAck(Message message) { synchronized (rolledBackAcks) { rolledBackAcks.put((Long)message.getMessageId().getEntryLocator(), message); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 50224a5dfa..31a86bcae0 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -732,6 +732,47 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, } } + @Override + public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception { + indexLock.writeLock().lock(); + try { + pageFile.tx().execute(new Transaction.Closure() { + @Override + public void execute(Transaction tx) throws Exception { + StoredDestination sd = getStoredDestination(dest, tx); + Entry entry = null; + int position = 0; + int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener); + Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { + entry = iterator.next(); + + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { + continue; + } + + if(offset > 0 && offset > position) { + position++; + continue; + } + + Message msg = loadMessage(entry.getValue().location); + msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); + listener.recoverMessage(msg); + counter++; + position++; + if (counter >= maxReturned || !listener.canRecoveryNextMessage()) { + break; + } + } + sd.orderIndex.stoppedIterating(); + } + }); + } finally { + indexLock.writeLock().unlock(); + } + } + protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { int counter = 0; String id; diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java index 7048b09a44..7835a1b7b6 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java @@ -241,6 +241,37 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } } + @Override + public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception { + synchronized(indexMutex) { + pageFile.tx().execute(new Transaction.Closure(){ + @Override + public void execute(Transaction tx) throws Exception { + StoredDestination sd = getStoredDestination(dest, tx); + Entry entry=null; + int counter = 0; + int position = 0; + for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { + entry = iterator.next(); + if(offset > 0 && offset > position) { + position++; + continue; + } + listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); + counter++; + position++; + if( counter >= maxReturned ) { + break; + } + } + if( entry!=null ) { + cursorPos = entry.getKey()+1; + } + } + }); + } + } + @Override public void resetBatching() { cursorPos=0; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java index 5a1ab90d58..cbb1579b57 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java @@ -510,6 +510,11 @@ public class StoreQueueCursorOrderTest { } } + @Override + public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception { + + } + @Override public void setBatch(MessageId message) { batch.set((Long)message.getFutureOrSequenceLong()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java new file mode 100644 index 0000000000..9ea4b68a84 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store.kahadb; + +import java.io.File; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import jakarta.jms.Connection; +import jakarta.jms.JMSException; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; + +import static org.junit.Assert.assertEquals; + +public class KahaDBOffsetRecoveryListenerTest { + + protected BrokerService brokerService = null; + protected KahaDBStore kaha = null; + + @Before + public void beforeEach() throws Exception { + + } + + @After + public void afterEach() { + brokerService = null; + kaha = null; + } + + protected BrokerService createBroker(KahaDBStore kaha) throws Exception { + BrokerService broker = new BrokerService(); + broker.setUseJmx(false); + broker.setPersistenceAdapter(kaha); + broker.start(); + broker.waitUntilStarted(10_000l); + return broker; + } + + private KahaDBStore createStore(boolean delete) throws IOException { + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(new File("target/activemq-data/kahadb-recovery-tests")); + if( delete ) { + kaha.deleteAllMessages(); + } + return kaha; + } + + protected void runOffsetTest(int sendCount, int expectedMessageCount, int recoverOffset, int recoverCount, int expectedRecoverCount, int expectedRecoverIndex, String queueName) throws Exception { + kaha = createStore(true); + kaha.setJournalMaxFileLength(1024*100); + brokerService = createBroker(kaha); + sendMessages(sendCount, queueName); + brokerService.stop(); + brokerService.waitUntilStopped(); + + TestMessageRecoveryListener testMessageRecoveryListener = new TestMessageRecoveryListener(); + kaha = createStore(false); + kaha.start(); + MessageStore messageStore = kaha.createQueueMessageStore(new ActiveMQQueue(queueName)); + messageStore.start(); + assertEquals(Integer.valueOf(expectedMessageCount), Integer.valueOf(messageStore.getMessageCount())); + messageStore.recoverNextMessages(recoverOffset, recoverCount, testMessageRecoveryListener); + messageStore.stop(); + kaha.stop(); + + assertEquals(Integer.valueOf(expectedRecoverCount), Integer.valueOf(testMessageRecoveryListener.getRecoveredMessages().size())); + + if(expectedRecoverIndex >= 0) { + assertEquals(Integer.valueOf(expectedRecoverIndex), (Integer)testMessageRecoveryListener.getRecoveredMessages().get(0).getProperty("index")); + } + + brokerService = createBroker(kaha); + assertEquals(sendCount, receiveMessages(queueName)); + } + + @Test + public void testOffsetZero() throws Exception { + runOffsetTest(1_000, 1_000, 0, 1, 1, 0, "TEST.OFFSET.ZERO"); + } + + @Test + public void testOffsetOne() throws Exception { + runOffsetTest(1_000, 1_000, 1, 1, 1, 1, "TEST.OFFSET.ONE"); + } + + @Test + public void testOffsetLastMinusOne() throws Exception { + runOffsetTest(1_000, 1_000, 999, 1, 1, 999, "TEST.OFFSET.LASTMINUSONE"); + } + + @Test + public void testOffsetLast() throws Exception { + runOffsetTest(1_000, 1_000, 1_000, 1, 0, -1, "TEST.OFFSET.LAST"); + } + + @Test + public void testOffsetBeyondQueueSizeNoError() throws Exception { + runOffsetTest(1_000, 1_000, 10_000, 1, 0, -1, "TEST.OFFSET.BEYOND"); + } + + @Test + public void testOffsetEmptyQueue() throws Exception { + runOffsetTest(0, 0, 10_000, 1, 0, -1, "TEST.OFFSET.EMPTY"); + } + + private void sendMessages(int count, String queueName) throws JMSException { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + cf.setUseAsyncSend(true); + cf.setProducerWindowSize(1024); + cf.setWatchTopicAdvisories(false); + + Connection connection = cf.createConnection(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(new ActiveMQQueue(queueName)); + for (int i = 0; i < count; i++) { + TextMessage textMessage = session.createTextMessage(createContent(i)); + textMessage.setIntProperty("index", i); + producer.send(textMessage); + } + } finally { + connection.close(); + } + } + + private int receiveMessages(String queueName) throws JMSException { + int rc=0; + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = cf.createConnection(); + try { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue(queueName)); + while ( messageConsumer.receive(1000) !=null ) { + rc++; + } + return rc; + } finally { + connection.close(); + } + } + + private String createContent(int i) { + StringBuilder sb = new StringBuilder(i+":"); + while( sb.length() < 1024 ) { + sb.append("*"); + } + return sb.toString(); + } + + static class TestMessageRecoveryListener implements MessageRecoveryListener { + + List recoveredMessageIds = new LinkedList<>(); + List recoveredMessages = new LinkedList<>(); + + @Override + public boolean hasSpace() { + return true; + } + + @Override + public boolean isDuplicate(MessageId messageId) { + return recoveredMessageIds.contains(messageId); + } + + @Override + public boolean recoverMessage(Message message) throws Exception { + if(recoveredMessages.contains(message)) { + return false; + } + return recoveredMessages.add(message); + } + + @Override + public boolean recoverMessageReference(MessageId messageId) throws Exception { + if(recoveredMessageIds.contains(messageId)) { + return false; + } + return recoveredMessageIds.add(messageId); + } + + public List getRecoveredMessageIds() { + return recoveredMessageIds; + } + + public List getRecoveredMessages() { + return recoveredMessages; + } + } +}