diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 2af83fbb23..d3548d55f5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -588,6 +588,10 @@ public class BrokerService implements Service { LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down"); removeShutdownHook(); + if (this.scheduler != null) { + this.scheduler.stop(); + this.scheduler = null; + } ServiceStopper stopper = new ServiceStopper(); if (services != null) { for (Service service : services) { @@ -645,10 +649,6 @@ public class BrokerService implements Service { this.taskRunnerFactory.shutdown(); this.taskRunnerFactory = null; } - if (this.scheduler != null) { - this.scheduler.stop(); - this.scheduler = null; - } if (this.executor != null) { this.executor.shutdownNow(); this.executor = null; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index a794febe7f..c491d83b45 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -300,8 +300,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe getJournal().start(); try { loadPageFile(); - } catch (IOException ioe) { - LOG.warn("Index corrupted, trying to recover ...", ioe); + } catch (Throwable t) { + LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t); + if (LOG.isDebugEnabled()) { + LOG.debug("Index load failure", t); + } // try to recover index try { pageFile.unload(); @@ -311,6 +314,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } else { pageFile.delete(); } + metadata = new Metadata(); + pageFile = null; loadPageFile(); } startCheckpoint(); @@ -383,11 +388,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try { this.indexLock.writeLock().lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - public void execute(Transaction tx) throws IOException { - checkpointUpdate(tx, true); - } - }); + if (metadata.page != null) { + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + checkpointUpdate(tx, true); + } + }); + } pageFile.unload(); metadata = new Metadata(); } finally { @@ -413,11 +420,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe metadata.state = CLOSED_STATE; metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); - pageFile.tx().execute(new Transaction.Closure() { - public void execute(Transaction tx) throws IOException { - tx.store(metadata.page, metadataMarshaller, true); - } - }); + if (metadata.page != null) { + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + tx.store(metadata.page, metadataMarshaller, true); + } + }); + } } } finally { this.indexLock.writeLock().unlock(); diff --git a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java index 1f28aaad08..1fa0e6faac 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java @@ -21,6 +21,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.RecoveryBrokerTest; import org.apache.activemq.broker.StubConnection; import org.apache.activemq.command.*; +import org.apache.kahadb.page.PageFile; import java.io.File; import java.io.RandomAccessFile; @@ -34,6 +35,9 @@ import java.util.ArrayList; */ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest { + enum CorruptionType { None, FailToLoad, LoadInvalid, LoadCorrupt }; + public CorruptionType failTest = CorruptionType.None; + protected BrokerService createBroker() throws Exception { BrokerService broker = new BrokerService(); KahaDBStore kaha = new KahaDBStore(); @@ -47,10 +51,27 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest { // corrupting index File index = new File("target/activemq-data/kahadb/db.data"); - index.delete(); RandomAccessFile raf = new RandomAccessFile(index, "rw"); - raf.seek(index.length()); - raf.writeBytes("corrupt"); + switch (failTest) { + case FailToLoad: + index.delete(); + raf = new RandomAccessFile(index, "rw"); + raf.seek(index.length()); + raf.writeBytes("corrupt"); + break; + case LoadInvalid: + // page size 0 + raf.seek(0); + raf.writeBytes("corrupt and cannot load metadata"); + break; + case LoadCorrupt: + // loadable but invalid metadata + // location of order index low priority index for first destination... + raf.seek(8*1024 + 57); + raf.writeLong(Integer.MAX_VALUE-10); + break; + default: + } raf.close(); // starting broker @@ -71,7 +92,10 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest { junit.textui.TestRunner.run(suite()); } - + public void initCombosForTestLargeQueuePersistentMessagesNotLostOnRestart() { + this.addCombinationValues("failTest", new CorruptionType[]{CorruptionType.FailToLoad, CorruptionType.LoadInvalid, CorruptionType.LoadCorrupt} ); + } + public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception { ActiveMQDestination destination = new ActiveMQQueue("TEST");