https://issues.apache.org/jira/browse/AMQ-3634 - ensure full recovery of the index, irrespective of the load failure reason, with additional tests. Shutdown the schedualler early to ensure no ugly errors from timer tasks during shutdown

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1236661 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-01-27 12:47:39 +00:00
parent c4c9cf2e94
commit dcf1f5e0cc
3 changed files with 53 additions and 20 deletions

View File

@ -588,6 +588,10 @@ public class BrokerService implements Service {
LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down"); LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down");
removeShutdownHook(); removeShutdownHook();
if (this.scheduler != null) {
this.scheduler.stop();
this.scheduler = null;
}
ServiceStopper stopper = new ServiceStopper(); ServiceStopper stopper = new ServiceStopper();
if (services != null) { if (services != null) {
for (Service service : services) { for (Service service : services) {
@ -645,10 +649,6 @@ public class BrokerService implements Service {
this.taskRunnerFactory.shutdown(); this.taskRunnerFactory.shutdown();
this.taskRunnerFactory = null; this.taskRunnerFactory = null;
} }
if (this.scheduler != null) {
this.scheduler.stop();
this.scheduler = null;
}
if (this.executor != null) { if (this.executor != null) {
this.executor.shutdownNow(); this.executor.shutdownNow();
this.executor = null; this.executor = null;

View File

@ -300,8 +300,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
getJournal().start(); getJournal().start();
try { try {
loadPageFile(); loadPageFile();
} catch (IOException ioe) { } catch (Throwable t) {
LOG.warn("Index corrupted, trying to recover ...", ioe); LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
if (LOG.isDebugEnabled()) {
LOG.debug("Index load failure", t);
}
// try to recover index // try to recover index
try { try {
pageFile.unload(); pageFile.unload();
@ -311,6 +314,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} else { } else {
pageFile.delete(); pageFile.delete();
} }
metadata = new Metadata();
pageFile = null;
loadPageFile(); loadPageFile();
} }
startCheckpoint(); startCheckpoint();
@ -383,11 +388,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try { try {
this.indexLock.writeLock().lock(); this.indexLock.writeLock().lock();
try { try {
pageFile.tx().execute(new Transaction.Closure<IOException>() { if (metadata.page != null) {
public void execute(Transaction tx) throws IOException { pageFile.tx().execute(new Transaction.Closure<IOException>() {
checkpointUpdate(tx, true); public void execute(Transaction tx) throws IOException {
} checkpointUpdate(tx, true);
}); }
});
}
pageFile.unload(); pageFile.unload();
metadata = new Metadata(); metadata = new Metadata();
} finally { } finally {
@ -413,11 +420,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
metadata.state = CLOSED_STATE; metadata.state = CLOSED_STATE;
metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
pageFile.tx().execute(new Transaction.Closure<IOException>() { if (metadata.page != null) {
public void execute(Transaction tx) throws IOException { pageFile.tx().execute(new Transaction.Closure<IOException>() {
tx.store(metadata.page, metadataMarshaller, true); public void execute(Transaction tx) throws IOException {
} tx.store(metadata.page, metadataMarshaller, true);
}); }
});
}
} }
} finally { } finally {
this.indexLock.writeLock().unlock(); this.indexLock.writeLock().unlock();

View File

@ -21,6 +21,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.RecoveryBrokerTest; import org.apache.activemq.broker.RecoveryBrokerTest;
import org.apache.activemq.broker.StubConnection; import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.command.*; import org.apache.activemq.command.*;
import org.apache.kahadb.page.PageFile;
import java.io.File; import java.io.File;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
@ -34,6 +35,9 @@ import java.util.ArrayList;
*/ */
public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest { public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
enum CorruptionType { None, FailToLoad, LoadInvalid, LoadCorrupt };
public CorruptionType failTest = CorruptionType.None;
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
KahaDBStore kaha = new KahaDBStore(); KahaDBStore kaha = new KahaDBStore();
@ -47,10 +51,27 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
// corrupting index // corrupting index
File index = new File("target/activemq-data/kahadb/db.data"); File index = new File("target/activemq-data/kahadb/db.data");
index.delete();
RandomAccessFile raf = new RandomAccessFile(index, "rw"); RandomAccessFile raf = new RandomAccessFile(index, "rw");
raf.seek(index.length()); switch (failTest) {
raf.writeBytes("corrupt"); case FailToLoad:
index.delete();
raf = new RandomAccessFile(index, "rw");
raf.seek(index.length());
raf.writeBytes("corrupt");
break;
case LoadInvalid:
// page size 0
raf.seek(0);
raf.writeBytes("corrupt and cannot load metadata");
break;
case LoadCorrupt:
// loadable but invalid metadata
// location of order index low priority index for first destination...
raf.seek(8*1024 + 57);
raf.writeLong(Integer.MAX_VALUE-10);
break;
default:
}
raf.close(); raf.close();
// starting broker // starting broker
@ -71,7 +92,10 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
junit.textui.TestRunner.run(suite()); junit.textui.TestRunner.run(suite());
} }
public void initCombosForTestLargeQueuePersistentMessagesNotLostOnRestart() {
this.addCombinationValues("failTest", new CorruptionType[]{CorruptionType.FailToLoad, CorruptionType.LoadInvalid, CorruptionType.LoadCorrupt} );
}
public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception { public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception {
ActiveMQDestination destination = new ActiveMQQueue("TEST"); ActiveMQDestination destination = new ActiveMQQueue("TEST");