resolve https://issues.apache.org/activemq/browse/AMQ-2832 - track ack file references to prevent cleanup of data files with acks for inuse message files, impl for kahaDB

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@964804 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-07-16 14:19:27 +00:00
parent a7f9164464
commit f2517c01b8
5 changed files with 255 additions and 13 deletions

View File

@ -415,7 +415,7 @@ public class AMQMessageStore extends AbstractMessageStore {
}
}
});
LOG.debug("Batch update done.");
LOG.debug("Batch update done. lastLocation:" + lastLocation);
lock.lock();
try {
cpAddedMessageIds = null;

View File

@ -500,6 +500,14 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
}
public boolean getForceRecoverIndex() {
return letter.getForceRecoverIndex();
}
public void setForceRecoverIndex(boolean forceRecoverIndex) {
letter.setForceRecoverIndex(forceRecoverIndex);
}
@Override
public String toString() {
String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";

View File

@ -290,6 +290,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return this.transactionStore;
}
public boolean getForceRecoverIndex() {
return this.forceRecoverIndex;
}
public void setForceRecoverIndex(boolean forceRecoverIndex) {
this.forceRecoverIndex = forceRecoverIndex;
}
public class KahaDBMessageStore extends AbstractMessageStore {
protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
protected KahaDestination dest;

View File

@ -197,7 +197,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
private boolean checkForCorruptJournalFiles = false;
private boolean checksumJournalFiles = false;
private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
protected boolean forceRecoverIndex = false;
public MessageDatabase() {
}
@ -428,7 +428,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
if (recoveryPosition != null) {
int redoCounter = 0;
LOG.info("Recoverying from the journal ...");
LOG.info("Recovering from the journal ...");
while (recoveryPosition != null) {
JournalCommand<?> message = load(recoveryPosition);
metadata.lastUpdate = recoveryPosition;
@ -653,18 +653,20 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
private Location getRecoveryPosition() throws IOException {
if (!this.forceRecoverIndex) {
// If we need to recover the transactions..
if (metadata.firstInProgressTransactionLocation != null) {
return metadata.firstInProgressTransactionLocation;
}
// If we need to recover the transactions..
if (metadata.firstInProgressTransactionLocation != null) {
return metadata.firstInProgressTransactionLocation;
// Perhaps there were no transactions...
if( metadata.lastUpdate!=null) {
// Start replay at the record after the last one recorded in the index file.
return journal.getNextLocation(metadata.lastUpdate);
}
}
// Perhaps there were no transactions...
if( metadata.lastUpdate!=null) {
// Start replay at the record after the last one recorded in the index file.
return journal.getNextLocation(metadata.lastUpdate);
}
// This loads the first position.
return journal.getNextLocation(null);
}
@ -1008,6 +1010,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
if (keys != null) {
sd.locationIndex.remove(tx, keys.location);
}
recordAckMessageReferenceLocation(ackLocation, keys.location);
}
} else {
// In the topic case we need remove the message once it's been acked
@ -1029,6 +1032,21 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
}
Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
if (referenceFileIds == null) {
referenceFileIds = new HashSet<Integer>();
referenceFileIds.add(messageLocation.getDataFileId());
ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
} else {
Integer id = Integer.valueOf(messageLocation.getDataFileId());
if (!referenceFileIds.contains(id)) {
referenceFileIds.add(id);
}
}
}
void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
sd.orderIndex.clear(tx);
@ -1170,6 +1188,28 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
});
}
// check we are not deleting file with ack for in-use journal files
Iterator<Integer> candidates = gcCandidateSet.iterator();
while (candidates.hasNext()) {
Integer candidate = candidates.next();
Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
if (referencedFileIds != null) {
for (Integer referencedFileId : referencedFileIds) {
if (journal.getFileMap().containsKey(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
// active file that is not targeted for deletion is referenced so don't delete
candidates.remove();
break;
}
}
if (gcCandidateSet.contains(candidate)) {
ackMessageFileMap.remove(candidate);
} else {
LOG.debug("not removing data file: " + candidate
+ " as contained ack(s) refer to referenced file: " + referencedFileIds);
}
}
}
if( !gcCandidateSet.isEmpty() ) {
LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
journal.removeDataFiles(gcCandidateSet);

View File

@ -0,0 +1,186 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class AMQ2832Test {
private static final Log LOG = LogFactory.getLog(AMQ2832Test.class);
BrokerService broker = null;
private final Destination destination = new ActiveMQQueue("AMQ2832Test");
protected void startBroker(boolean delete) throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(delete);
broker.setPersistent(true);
broker.setUseJmx(false);
broker.addConnector("tcp://localhost:0");
configurePersistence(broker, delete);
broker.start();
LOG.info("Starting broker..");
}
protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
// ensure there are a bunch of data files but multiple entries in each
adapter.setJournalMaxFileLength(1024 * 20);
// speed up the test case, checkpoint an cleanup early and often
adapter.setCheckpointInterval(500);
adapter.setCleanupInterval(500);
if (!deleteAllOnStart) {
adapter.setForceRecoverIndex(true);
}
}
@Test
public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
startBroker(true);
StagedConsumer consumer = new StagedConsumer();
int numMessagesAvailable = produceMessagesToConsumeMultipleDataFiles(20);
// this will block the reclaiming of one data file
Message firstUnacked = consumer.receive(10);
LOG.info("first unacked: " + firstUnacked.getJMSMessageID());
Message secondUnacked = consumer.receive(1);
LOG.info("second unacked: " + secondUnacked.getJMSMessageID());
numMessagesAvailable -= 11;
numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10);
// ensure ack is another data file
LOG.info("Acking firstUnacked: " + firstUnacked.getJMSMessageID());
firstUnacked.acknowledge();
numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10);
consumer.receive(numMessagesAvailable).acknowledge();
// second unacked should keep first data file available but journal with the first ack
// may get whacked
consumer.close();
broker.stop();
broker.waitUntilStopped();
startBroker(false);
consumer = new StagedConsumer();
// need to force recovery?
Message msg = consumer.receive(1, 5);
assertNotNull("One messages left after recovery", msg);
msg.acknowledge();
// should be no more messages
msg = consumer.receive(1, 5);
assertEquals("Only one messages left after recovery: " + msg, null, msg);
consumer.close();
}
private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
int sent = 0;
Connection connection = new ActiveMQConnectionFactory(
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < numToSend; i++) {
producer.send(createMessage(session, i));
sent++;
}
} finally {
connection.close();
}
return sent;
}
final String payload = new String(new byte[1024]);
private Message createMessage(Session session, int i) throws Exception {
return session.createTextMessage(payload + "::" + i);
}
private class StagedConsumer {
Connection connection;
MessageConsumer consumer;
StagedConsumer() throws Exception {
connection = new ActiveMQConnectionFactory("failover://" +
broker.getTransportConnectors().get(0).getConnectUri().toString()).createConnection();
connection.start();
consumer = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE).createConsumer(destination);
}
public Message receive(int numToReceive) throws Exception {
return receive(numToReceive, 2);
}
public Message receive(int numToReceive, int timeoutInSeconds) throws Exception {
Message msg = null;
for (; numToReceive > 0; numToReceive--) {
do {
msg = consumer.receive(1*1000);
} while (msg == null && --timeoutInSeconds > 0);
if (numToReceive > 1) {
msg.acknowledge();
}
if (msg != null) {
LOG.debug("received: " + msg.getJMSMessageID());
}
}
// last message, unacked
return msg;
}
void close() throws JMSException {
consumer.close();
connection.close();
}
}
}