https://issues.apache.org/jira/browse/AMQ-4172 - resolve with test. inflight transactions need to just protect a data file range rather than all subsequent data files. so gc can reclaim what is valid

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1436291 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-01-21 11:37:23 +00:00
parent f5360a727d
commit 0dcdab784b
3 changed files with 219 additions and 33 deletions

View File

@ -54,7 +54,7 @@ public class AMQ2736Test {
KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
KahaDBStore store = pa.getStore(); KahaDBStore store = pa.getStore();
assertNotNull("last tx location is present " + store.getFirstInProgressTxLocation()); assertNotNull("last tx location is present " + store.getInProgressTxLocationRange()[1]);
// test hack, close the journal to ensure no further journal updates when broker stops // test hack, close the journal to ensure no further journal updates when broker stops
// mimic kill -9 in terms of no normal shutdown sequence // mimic kill -9 in terms of no normal shutdown sequence
@ -74,7 +74,7 @@ public class AMQ2736Test {
store = pa.getStore(); store = pa.getStore();
// inflight non xa tx should be rolledback on recovery // inflight non xa tx should be rolledback on recovery
assertNull("in progress tx location is present ", store.getFirstInProgressTxLocation()); assertNull("in progress tx location is present ", store.getInProgressTxLocationRange()[0]);
} }

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertTrue;
public class TransactedStoreUsageSuspendResumeTest {
private static final Logger LOG = LoggerFactory.getLogger(TransactedStoreUsageSuspendResumeTest.class);
private static final int MAX_MESSAGES = 10000;
private static final String QUEUE_NAME = "test.queue";
private BrokerService broker;
private final CountDownLatch messagesReceivedCountDown = new CountDownLatch(MAX_MESSAGES);
private final CountDownLatch messagesSentCountDown = new CountDownLatch(MAX_MESSAGES);
private final CountDownLatch consumerStartLatch = new CountDownLatch(1);
private class ConsumerThread extends Thread {
@Override
public void run() {
try {
consumerStartLatch.await(30, TimeUnit.SECONDS);
ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
// wait for producer to stop
long currentSendCount;
do {
currentSendCount = messagesSentCountDown.getCount();
TimeUnit.SECONDS.sleep(5);
} while (currentSendCount != messagesSentCountDown.getCount());
LOG.info("Starting consumer at: " + currentSendCount);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
do {
Message message = consumer.receive(1000);
if (message != null) {
session.commit();
messagesReceivedCountDown.countDown();
}
if (messagesReceivedCountDown.getCount() % 500 == 0) {
LOG.info("remaining to receive: " + messagesReceivedCountDown.getCount());
}
} while (messagesReceivedCountDown.getCount() != 0);
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
}
@Before
public void setup() throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(true);
KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
kahaDB.setJournalMaxFileLength(500 * 1024);
kahaDB.setCleanupInterval(10*1000);
broker.setPersistenceAdapter(kahaDB);
broker.getSystemUsage().getStoreUsage().setLimit(7*1024*1024);
broker.start();
broker.waitUntilStarted();
}
@After
public void tearDown() throws Exception {
broker.stop();
}
@Test
public void testTransactedStoreUsageSuspendResume() throws Exception {
ConsumerThread thread = new ConsumerThread();
thread.start();
ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
sendExecutor.execute(new Runnable() {
@Override
public void run() {
try {
sendMessages();
} catch (Exception ignored) {
}
}
});
sendExecutor.shutdown();
sendExecutor.awaitTermination(5, TimeUnit.MINUTES);
boolean allMessagesReceived = messagesReceivedCountDown.await(120, TimeUnit.SECONDS);
assertTrue("Got all messages: " + messagesReceivedCountDown, allMessagesReceived);
}
private void sendMessages() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
factory.setAlwaysSyncSend(true);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination queue = session.createQueue(QUEUE_NAME);
Destination retainQueue = session.createQueue(QUEUE_NAME + "-retain");
MessageProducer producer = session.createProducer(null);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[10]);
for (int i=0; i<4240; i++) {
// mostly fill the store with retained messages
// so consumer only has a small bit of store usage to work with
producer.send(retainQueue, message);
session.commit();
}
consumerStartLatch.countDown();
for (int i = 0; i < MAX_MESSAGES; i++) {
producer.send(queue, message);
if (i>0 && i%20 == 0) {
session.commit();
}
messagesSentCountDown.countDown();
if (i>0 && i%500 == 0) {
LOG.info("Sent : " + i);
}
}
session.commit();
producer.close();
session.close();
connection.close();
}
}

View File

@ -29,6 +29,7 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
@ -421,7 +422,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try { try {
if( pageFile != null && pageFile.isLoaded() ) { if( pageFile != null && pageFile.isLoaded() ) {
metadata.state = CLOSED_STATE; metadata.state = CLOSED_STATE;
metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
if (metadata.page != null) { if (metadata.page != null) {
pageFile.tx().execute(new Transaction.Closure<IOException>() { pageFile.tx().execute(new Transaction.Closure<IOException>() {
@ -440,31 +441,37 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// public for testing // public for testing
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public Location getFirstInProgressTxLocation() { public Location[] getInProgressTxLocationRange() {
Location l = null; Location[] range = new Location[]{null, null};
synchronized (inflightTransactions) { synchronized (inflightTransactions) {
if (!inflightTransactions.isEmpty()) { if (!inflightTransactions.isEmpty()) {
for (List<Operation> ops : inflightTransactions.values()) { for (List<Operation> ops : inflightTransactions.values()) {
if (!ops.isEmpty()) { if (!ops.isEmpty()) {
l = ops.get(0).getLocation(); trackMaxAndMin(range, ops);
break;
} }
} }
} }
if (!preparedTransactions.isEmpty()) { if (!preparedTransactions.isEmpty()) {
for (List<Operation> ops : preparedTransactions.values()) { for (List<Operation> ops : preparedTransactions.values()) {
if (!ops.isEmpty()) { if (!ops.isEmpty()) {
trackMaxAndMin(range, ops);
}
}
}
}
return range;
}
private void trackMaxAndMin(Location[] range, List<Operation> ops) {
Location t = ops.get(0).getLocation(); Location t = ops.get(0).getLocation();
if (l==null || t.compareTo(l) <= 0) { if (range[0]==null || t.compareTo(range[0]) <= 0) {
l = t; range[0] = t;
} }
break; t = ops.get(ops.size() -1).getLocation();
if (range[1]==null || t.compareTo(range[1]) >= 0) {
range[1] = t;
} }
} }
}
}
return l;
}
class TranInfo { class TranInfo {
TransactionId id; TransactionId id;
@ -1385,11 +1392,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
LOG.debug("Checkpoint started."); LOG.debug("Checkpoint started.");
// reflect last update exclusive of current checkpoint // reflect last update exclusive of current checkpoint
Location firstTxLocation = metadata.lastUpdate; Location lastUpdate = metadata.lastUpdate;
metadata.state = OPEN_STATE; metadata.state = OPEN_STATE;
metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); Location[] inProgressTxRange = getInProgressTxLocationRange();
metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
tx.store(metadata.page, metadataMarshaller, true); tx.store(metadata.page, metadataMarshaller, true);
pageFile.flush(); pageFile.flush();
@ -1399,7 +1407,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet); final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet); LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
}
if (lastUpdate != null) {
gcCandidateSet.remove(lastUpdate.getDataFileId());
} }
// Don't GC files under replication // Don't GC files under replication
@ -1411,25 +1423,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId()); gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId());
} }
// Don't GC files after the first in progress tx // Don't GC files referenced by in-progress tx
if( metadata.firstInProgressTransactionLocation!=null ) { if (inProgressTxRange[0] != null) {
if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) { for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
firstTxLocation = metadata.firstInProgressTransactionLocation; gcCandidateSet.remove(pendingTx);
}
}
if( firstTxLocation!=null ) {
while( !gcCandidateSet.isEmpty() ) {
Integer last = gcCandidateSet.last();
if( last >= firstTxLocation.getDataFileId() ) {
gcCandidateSet.remove(last);
} else {
break;
} }
} }
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet); LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
}
} }
// Go through all the destinations to see if any of them can remove GC candidates. // Go through all the destinations to see if any of them can remove GC candidates.