diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 64fe242e16..e113f8a815 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -16,20 +16,11 @@ */ package org.apache.activemq.store.kahadb; -import java.io.File; -import java.io.IOException; -import java.util.Set; import org.apache.activeio.journal.Journal; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.LocalTransactionId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.command.*; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -40,6 +31,10 @@ import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.store.kahadb.data.KahaXATransactionId; import org.apache.activemq.usage.SystemUsage; +import java.io.File; +import java.io.IOException; +import java.util.Set; + /** * An implementation of {@link PersistenceAdapter} designed for use with a * {@link Journal} and then check pointing asynchronously on a timeout with some @@ -500,6 +495,14 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi letter.setForceRecoverIndex(forceRecoverIndex); } + public boolean isArchiveCorruptedIndex() { + return letter.isArchiveCorruptedIndex(); + } + + public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { + letter.setArchiveCorruptedIndex(archiveCorruptedIndex); + } + /** * When true, persist the redelivery status such that the message redelivery flag can survive a broker failure * used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean) true diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 365c509a22..0f8da7ca46 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -16,38 +16,6 @@ */ package org.apache.activemq.store.kahadb; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.EOFException; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedSet; -import java.util.Stack; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.activemq.ActiveMQMessageAuditNoSync; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; @@ -55,18 +23,7 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; -import org.apache.activemq.store.kahadb.data.KahaCommitCommand; -import org.apache.activemq.store.kahadb.data.KahaDestination; -import org.apache.activemq.store.kahadb.data.KahaEntryType; -import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; -import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; -import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; -import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; -import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; -import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; -import org.apache.activemq.store.kahadb.data.KahaTraceCommand; -import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; +import org.apache.activemq.store.kahadb.data.*; import org.apache.activemq.util.Callback; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.ServiceStopper; @@ -80,20 +37,19 @@ import org.apache.kahadb.journal.Location; import org.apache.kahadb.page.Page; import org.apache.kahadb.page.PageFile; import org.apache.kahadb.page.Transaction; -import org.apache.kahadb.util.ByteSequence; -import org.apache.kahadb.util.DataByteArrayInputStream; -import org.apache.kahadb.util.DataByteArrayOutputStream; -import org.apache.kahadb.util.LocationMarshaller; -import org.apache.kahadb.util.LockFile; -import org.apache.kahadb.util.LongMarshaller; -import org.apache.kahadb.util.Marshaller; -import org.apache.kahadb.util.Sequence; -import org.apache.kahadb.util.SequenceSet; -import org.apache.kahadb.util.StringMarshaller; -import org.apache.kahadb.util.VariableMarshaller; +import org.apache.kahadb.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.*; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { protected BrokerService brokerService; @@ -225,6 +181,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe protected boolean forceRecoverIndex = false; private final Object checkpointThreadLock = new Object(); private boolean rewriteOnRedelivery = false; + private boolean archiveCorruptedIndex = false; public MessageDatabase() { } @@ -333,7 +290,21 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void open() throws IOException { if( opened.compareAndSet(false, true) ) { getJournal().start(); - loadPageFile(); + try { + loadPageFile(); + } catch (IOException ioe) { + LOG.warn("Index corrupted, trying to recover ...", ioe); + // try to recover index + try { + pageFile.unload(); + } catch (Exception ignore) {} + if (archiveCorruptedIndex) { + pageFile.archive(); + } else { + pageFile.delete(); + } + loadPageFile(); + } startCheckpoint(); recover(); } @@ -2295,6 +2266,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe this.rewriteOnRedelivery = rewriteOnRedelivery; } + public boolean isArchiveCorruptedIndex() { + return archiveCorruptedIndex; + } + + public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { + this.archiveCorruptedIndex = archiveCorruptedIndex; + } + // ///////////////////////////////////////////////////////////////// // Internal conversion methods. // ///////////////////////////////////////////////////////////////// diff --git a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java index 0c0b167320..1f28aaad08 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java @@ -16,24 +16,15 @@ */ package org.apache.activemq.store.kahadb; -import java.io.File; -import java.net.URI; -import java.util.ArrayList; - import junit.framework.Test; - -import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.RecoveryBrokerTest; import org.apache.activemq.broker.StubConnection; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.*; + +import java.io.File; +import java.io.RandomAccessFile; +import java.util.ArrayList; /** @@ -53,8 +44,20 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest { } protected BrokerService createRestartedBroker() throws Exception { + + // corrupting index + File index = new File("target/activemq-data/kahadb/db.data"); + index.delete(); + RandomAccessFile raf = new RandomAccessFile(index, "rw"); + raf.seek(index.length()); + raf.writeBytes("corrupt"); + raf.close(); + + // starting broker BrokerService broker = new BrokerService(); KahaDBStore kaha = new KahaDBStore(); + // uncomment if you want to test archiving + //kaha.setArchiveCorruptedIndex(true); kaha.setDirectory(new File("target/activemq-data/kahadb")); broker.setPersistenceAdapter(kaha); return broker; diff --git a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java index 61498f10db..18f06c4f00 100644 --- a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java +++ b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java @@ -16,16 +16,13 @@ */ package org.apache.kahadb.page; +import org.apache.kahadb.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.io.RandomAccessFile; +import java.io.*; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; @@ -34,16 +31,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Adler32; import java.util.zip.Checksum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.kahadb.util.DataByteArrayOutputStream; -import org.apache.kahadb.util.IOExceptionSupport; -import org.apache.kahadb.util.IOHelper; -import org.apache.kahadb.util.IntrospectionSupport; -import org.apache.kahadb.util.LRUCache; -import org.apache.kahadb.util.Sequence; -import org.apache.kahadb.util.SequenceSet; - /** * A PageFile provides you random access to fixed sized disk pages. This object is not thread safe and therefore access to it should * be externally synchronized. @@ -310,6 +297,16 @@ public class PageFile { delete(getFreeFile()); delete(getRecoveryFile()); } + + public void archive() throws IOException { + if( loaded.get() ) { + throw new IllegalStateException("Cannot delete page file data when the page file is loaded"); + } + long timestamp = System.currentTimeMillis(); + archive(getMainPageFile(), String.valueOf(timestamp)); + archive(getFreeFile(), String.valueOf(timestamp)); + archive(getRecoveryFile(), String.valueOf(timestamp)); + } /** * @param file @@ -323,6 +320,15 @@ public class PageFile { } } + private void archive(File file, String suffix) throws IOException { + if( file.exists() ) { + File archive = new File(file.getPath() + "-" + suffix); + if( !file.renameTo(archive) ) { + throw new IOException("Could not archive: " + file.getPath() + " to " + file.getPath()); + } + } + } + /** * Loads the page file so that it can be accessed for read/write purposes. This allocates OS resources. If this is the * first time the page file is loaded, then this creates the page file in the file system.