https://issues.apache.org/jira/browse/AMQ-2736 - KahaDB doesn't clean up old files (abortive shutdown)

After kill -9 with outstanding local transaction the transaction is recovered in error. fix and test

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1071732 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-02-17 18:04:54 +00:00
parent 14722f8627
commit a6c51a4532
4 changed files with 151 additions and 28 deletions

View File

@ -493,6 +493,11 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
letter.setForceRecoverIndex(forceRecoverIndex);
}
// for testing
public KahaDBStore getStore() {
return letter;
}
@Override
public String toString() {
String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";

View File

@ -960,29 +960,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
// Internal conversion methods.
// /////////////////////////////////////////////////////////////////
KahaTransactionInfo createTransactionInfo(TransactionId txid) {
if (txid == null) {
return null;
}
KahaTransactionInfo rc = new KahaTransactionInfo();
if (txid.isLocalTransaction()) {
LocalTransactionId t = (LocalTransactionId) txid;
KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
kahaTxId.setConnectionId(t.getConnectionId().getValue());
kahaTxId.setTransacitonId(t.getValue());
rc.setLocalTransacitonId(kahaTxId);
} else {
XATransactionId t = (XATransactionId) txid;
KahaXATransactionId kahaTxId = new KahaXATransactionId();
kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
kahaTxId.setFormatId(t.getFormatId());
rc.setXaTransacitonId(kahaTxId);
}
return rc;
}
KahaLocation convert(Location location) {
KahaLocation rc = new KahaLocation();
rc.setLogId(location.getDataFileId());

View File

@ -351,6 +351,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
}
// for testing
public LockFile getLockFile() {
return lockFile;
}
public void load() throws IOException {
this.indexLock.writeLock().lock();
@ -417,10 +422,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
close();
}
/**
* @return
*/
private Location getFirstInProgressTxLocation() {
// public for testing
public Location getFirstInProgressTxLocation() {
Location l = null;
synchronized (inflightTransactions) {
if (!inflightTransactions.isEmpty()) {
@ -474,6 +477,21 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
recoverIndex(tx);
}
});
// rollback any recovered inflight local transactions
Set<TransactionId> toRollback = new HashSet<TransactionId>();
synchronized (inflightTransactions) {
for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
TransactionId id = it.next();
if (id.isLocalTransaction()) {
toRollback.add(id);
}
}
for (TransactionId tx: toRollback) {
LOG.debug("rolling back recovered indoubt local transaction " + tx);
store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(tx)), false, null, null);
}
}
}finally {
this.indexLock.writeLock().unlock();
}
@ -1987,6 +2005,32 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
this.databaseLockedWaitDelay = databaseLockedWaitDelay;
}
// /////////////////////////////////////////////////////////////////
// Internal conversion methods.
// /////////////////////////////////////////////////////////////////
KahaTransactionInfo createTransactionInfo(TransactionId txid) {
if (txid == null) {
return null;
}
KahaTransactionInfo rc = new KahaTransactionInfo();
if (txid.isLocalTransaction()) {
LocalTransactionId t = (LocalTransactionId) txid;
KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
kahaTxId.setConnectionId(t.getConnectionId().getValue());
kahaTxId.setTransacitonId(t.getValue());
rc.setLocalTransacitonId(kahaTxId);
} else {
XATransactionId t = (XATransactionId) txid;
KahaXATransactionId kahaTxId = new KahaXATransactionId();
kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
kahaTxId.setFormatId(t.getFormatId());
rc.setXaTransacitonId(kahaTxId);
}
return rc;
}
class MessageOrderCursor{
long defaultCursorPosition;

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class AMQ2736Test {
BrokerService broker;
@Test
public void testRollbackOnRecover() throws Exception {
broker = createAndStartBroker(true);
DefaultIOExceptionHandler ignoreAllExceptionsIOExHandler = new DefaultIOExceptionHandler();
ignoreAllExceptionsIOExHandler.setIgnoreAllErrors(true);
broker.setIoExceptionHandler(ignoreAllExceptionsIOExHandler);
ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost?async=false");
f.setAlwaysSyncSend(true);
Connection c = f.createConnection();
c.start();
Session s = c.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer p = s.createProducer(new ActiveMQQueue("Tx"));
p.send(s.createTextMessage("aa"));
// kill journal without commit
KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
KahaDBStore store = pa.getStore();
assertNotNull("last tx location is present " + store.getFirstInProgressTxLocation());
// test hack, close the journal to ensure no further journal updates when broker stops
// mimic kill -9 in terms of no normal shutdown sequence
store.getJournal().close();
try {
store.close();
} catch (Exception expectedLotsAsJournalBorked) {
}
store.getLockFile().unlock();
broker.stop();
broker.waitUntilStopped();
// restart with recovery
broker = createAndStartBroker(false);
pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
store = pa.getStore();
// inflight non xa tx should be rolledback on recovery
assertNull("in progress tx location is present ", store.getFirstInProgressTxLocation());
}
@After
public void stopBroker() throws Exception {
if (broker != null) {
broker.stop();
}
}
private BrokerService createAndStartBroker(boolean deleteAll) throws Exception {
BrokerService broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(deleteAll);
broker.setUseJmx(false);
broker.getManagementContext().setCreateConnector(false);
broker.start();
return broker;
}
}