mirror of https://github.com/apache/activemq.git
[AMQ-9484] Support exporting kahadb messages from a queue with an offset
(cherry picked from commit 1a1b42f0c9
)
This commit is contained in:
parent
4a0bb48881
commit
dad947fe4a
|
@ -178,6 +178,8 @@ public interface MessageStore extends Service {
|
||||||
|
|
||||||
void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;
|
void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;
|
||||||
|
|
||||||
|
void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception;
|
||||||
|
|
||||||
void dispose(ConnectionContext context);
|
void dispose(ConnectionContext context);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -110,6 +110,11 @@ public class ProxyMessageStore implements MessageStore {
|
||||||
delegate.recoverNextMessages(maxReturned, listener);
|
delegate.recoverNextMessages(maxReturned, listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception {
|
||||||
|
delegate.recoverNextMessages(offset, maxReturned, listener);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void resetBatching() {
|
public void resetBatching() {
|
||||||
delegate.resetBatching();
|
delegate.resetBatching();
|
||||||
|
|
|
@ -130,6 +130,32 @@ public class MemoryMessageStore extends AbstractMessageStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception {
|
||||||
|
synchronized (messageTable) {
|
||||||
|
boolean pastLackBatch = lastBatchId == null;
|
||||||
|
int position = 0;
|
||||||
|
for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
|
||||||
|
if(offset > 0 && offset > position) {
|
||||||
|
position++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (pastLackBatch) {
|
||||||
|
Object msg = entry.getValue();
|
||||||
|
lastBatchId = entry.getKey();
|
||||||
|
if (msg.getClass() == MessageId.class) {
|
||||||
|
listener.recoverMessageReference((MessageId) msg);
|
||||||
|
} else {
|
||||||
|
listener.recoverMessage((Message) msg);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pastLackBatch = entry.getKey().equals(lastBatchId);
|
||||||
|
}
|
||||||
|
position++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void resetBatching() {
|
public void resetBatching() {
|
||||||
lastBatchId = null;
|
lastBatchId = null;
|
||||||
|
|
|
@ -398,6 +398,19 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param offset
|
||||||
|
* @param maxReturned
|
||||||
|
* @param listener
|
||||||
|
* @throws Exception
|
||||||
|
* @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
|
||||||
|
* org.apache.activemq.store.MessageRecoveryListener)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void recoverNextMessages(int offset, int maxReturned, final MessageRecoveryListener listener) throws Exception {
|
||||||
|
throw new UnsupportedOperationException("recoverNextMesage(offset,maxReturned,listener) is not supported.");
|
||||||
|
}
|
||||||
|
|
||||||
public void trackRollbackAck(Message message) {
|
public void trackRollbackAck(Message message) {
|
||||||
synchronized (rolledBackAcks) {
|
synchronized (rolledBackAcks) {
|
||||||
rolledBackAcks.put((Long)message.getMessageId().getEntryLocator(), message);
|
rolledBackAcks.put((Long)message.getMessageId().getEntryLocator(), message);
|
||||||
|
|
|
@ -732,6 +732,47 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
|
||||||
|
indexLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<Exception>() {
|
||||||
|
@Override
|
||||||
|
public void execute(Transaction tx) throws Exception {
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
Entry<Long, MessageKeys> entry = null;
|
||||||
|
int position = 0;
|
||||||
|
int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener);
|
||||||
|
Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
|
||||||
|
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
|
||||||
|
entry = iterator.next();
|
||||||
|
|
||||||
|
if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(offset > 0 && offset > position) {
|
||||||
|
position++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
Message msg = loadMessage(entry.getValue().location);
|
||||||
|
msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
|
||||||
|
listener.recoverMessage(msg);
|
||||||
|
counter++;
|
||||||
|
position++;
|
||||||
|
if (counter >= maxReturned || !listener.canRecoveryNextMessage()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sd.orderIndex.stoppedIterating();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
indexLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
|
protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
String id;
|
String id;
|
||||||
|
|
|
@ -241,6 +241,37 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
pageFile.tx().execute(new Transaction.Closure<Exception>(){
|
||||||
|
@Override
|
||||||
|
public void execute(Transaction tx) throws Exception {
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
Entry<Long, MessageRecord> entry=null;
|
||||||
|
int counter = 0;
|
||||||
|
int position = 0;
|
||||||
|
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
|
||||||
|
entry = iterator.next();
|
||||||
|
if(offset > 0 && offset > position) {
|
||||||
|
position++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
|
||||||
|
counter++;
|
||||||
|
position++;
|
||||||
|
if( counter >= maxReturned ) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if( entry!=null ) {
|
||||||
|
cursorPos = entry.getKey()+1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void resetBatching() {
|
public void resetBatching() {
|
||||||
cursorPos=0;
|
cursorPos=0;
|
||||||
|
|
|
@ -510,6 +510,11 @@ public class StoreQueueCursorOrderTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setBatch(MessageId message) {
|
public void setBatch(MessageId message) {
|
||||||
batch.set((Long)message.getFutureOrSequenceLong());
|
batch.set((Long)message.getFutureOrSequenceLong());
|
||||||
|
|
|
@ -0,0 +1,221 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.activemq.command.MessageId;
|
||||||
|
import org.apache.activemq.store.MessageRecoveryListener;
|
||||||
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import jakarta.jms.Connection;
|
||||||
|
import jakarta.jms.JMSException;
|
||||||
|
import jakarta.jms.MessageConsumer;
|
||||||
|
import jakarta.jms.MessageProducer;
|
||||||
|
import jakarta.jms.Session;
|
||||||
|
import jakarta.jms.TextMessage;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
public class KahaDBOffsetRecoveryListenerTest {
|
||||||
|
|
||||||
|
protected BrokerService brokerService = null;
|
||||||
|
protected KahaDBStore kaha = null;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void beforeEach() throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void afterEach() {
|
||||||
|
brokerService = null;
|
||||||
|
kaha = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerService createBroker(KahaDBStore kaha) throws Exception {
|
||||||
|
BrokerService broker = new BrokerService();
|
||||||
|
broker.setUseJmx(false);
|
||||||
|
broker.setPersistenceAdapter(kaha);
|
||||||
|
broker.start();
|
||||||
|
broker.waitUntilStarted(10_000l);
|
||||||
|
return broker;
|
||||||
|
}
|
||||||
|
|
||||||
|
private KahaDBStore createStore(boolean delete) throws IOException {
|
||||||
|
KahaDBStore kaha = new KahaDBStore();
|
||||||
|
kaha.setDirectory(new File("target/activemq-data/kahadb-recovery-tests"));
|
||||||
|
if( delete ) {
|
||||||
|
kaha.deleteAllMessages();
|
||||||
|
}
|
||||||
|
return kaha;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void runOffsetTest(int sendCount, int expectedMessageCount, int recoverOffset, int recoverCount, int expectedRecoverCount, int expectedRecoverIndex, String queueName) throws Exception {
|
||||||
|
kaha = createStore(true);
|
||||||
|
kaha.setJournalMaxFileLength(1024*100);
|
||||||
|
brokerService = createBroker(kaha);
|
||||||
|
sendMessages(sendCount, queueName);
|
||||||
|
brokerService.stop();
|
||||||
|
brokerService.waitUntilStopped();
|
||||||
|
|
||||||
|
TestMessageRecoveryListener testMessageRecoveryListener = new TestMessageRecoveryListener();
|
||||||
|
kaha = createStore(false);
|
||||||
|
kaha.start();
|
||||||
|
MessageStore messageStore = kaha.createQueueMessageStore(new ActiveMQQueue(queueName));
|
||||||
|
messageStore.start();
|
||||||
|
assertEquals(Integer.valueOf(expectedMessageCount), Integer.valueOf(messageStore.getMessageCount()));
|
||||||
|
messageStore.recoverNextMessages(recoverOffset, recoverCount, testMessageRecoveryListener);
|
||||||
|
messageStore.stop();
|
||||||
|
kaha.stop();
|
||||||
|
|
||||||
|
assertEquals(Integer.valueOf(expectedRecoverCount), Integer.valueOf(testMessageRecoveryListener.getRecoveredMessages().size()));
|
||||||
|
|
||||||
|
if(expectedRecoverIndex >= 0) {
|
||||||
|
assertEquals(Integer.valueOf(expectedRecoverIndex), (Integer)testMessageRecoveryListener.getRecoveredMessages().get(0).getProperty("index"));
|
||||||
|
}
|
||||||
|
|
||||||
|
brokerService = createBroker(kaha);
|
||||||
|
assertEquals(sendCount, receiveMessages(queueName));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOffsetZero() throws Exception {
|
||||||
|
runOffsetTest(1_000, 1_000, 0, 1, 1, 0, "TEST.OFFSET.ZERO");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOffsetOne() throws Exception {
|
||||||
|
runOffsetTest(1_000, 1_000, 1, 1, 1, 1, "TEST.OFFSET.ONE");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOffsetLastMinusOne() throws Exception {
|
||||||
|
runOffsetTest(1_000, 1_000, 999, 1, 1, 999, "TEST.OFFSET.LASTMINUSONE");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOffsetLast() throws Exception {
|
||||||
|
runOffsetTest(1_000, 1_000, 1_000, 1, 0, -1, "TEST.OFFSET.LAST");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOffsetBeyondQueueSizeNoError() throws Exception {
|
||||||
|
runOffsetTest(1_000, 1_000, 10_000, 1, 0, -1, "TEST.OFFSET.BEYOND");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOffsetEmptyQueue() throws Exception {
|
||||||
|
runOffsetTest(0, 0, 10_000, 1, 0, -1, "TEST.OFFSET.EMPTY");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendMessages(int count, String queueName) throws JMSException {
|
||||||
|
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
|
||||||
|
cf.setUseAsyncSend(true);
|
||||||
|
cf.setProducerWindowSize(1024);
|
||||||
|
cf.setWatchTopicAdvisories(false);
|
||||||
|
|
||||||
|
Connection connection = cf.createConnection();
|
||||||
|
try {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer producer = session.createProducer(new ActiveMQQueue(queueName));
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
TextMessage textMessage = session.createTextMessage(createContent(i));
|
||||||
|
textMessage.setIntProperty("index", i);
|
||||||
|
producer.send(textMessage);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int receiveMessages(String queueName) throws JMSException {
|
||||||
|
int rc=0;
|
||||||
|
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
|
||||||
|
Connection connection = cf.createConnection();
|
||||||
|
try {
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue(queueName));
|
||||||
|
while ( messageConsumer.receive(1000) !=null ) {
|
||||||
|
rc++;
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String createContent(int i) {
|
||||||
|
StringBuilder sb = new StringBuilder(i+":");
|
||||||
|
while( sb.length() < 1024 ) {
|
||||||
|
sb.append("*");
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
static class TestMessageRecoveryListener implements MessageRecoveryListener {
|
||||||
|
|
||||||
|
List<MessageId> recoveredMessageIds = new LinkedList<>();
|
||||||
|
List<Message> recoveredMessages = new LinkedList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasSpace() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDuplicate(MessageId messageId) {
|
||||||
|
return recoveredMessageIds.contains(messageId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean recoverMessage(Message message) throws Exception {
|
||||||
|
if(recoveredMessages.contains(message)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return recoveredMessages.add(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean recoverMessageReference(MessageId messageId) throws Exception {
|
||||||
|
if(recoveredMessageIds.contains(messageId)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return recoveredMessageIds.add(messageId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<MessageId> getRecoveredMessageIds() {
|
||||||
|
return recoveredMessageIds;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Message> getRecoveredMessages() {
|
||||||
|
return recoveredMessages;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue