git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1213743 13f79535-47bb-0310-9956-ffa450edef68
Bosanac Dejan 2011-12-13 15:43:53 +00:00
parent cfcd4f7f64
commit 5f7fc14e2e
4 changed files with 89 additions and 98 deletions

View File: org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java

@@ -16,20 +16,11 @@
*/
package org.apache.activemq.store.kahadb;
import java.io.File;
import java.io.IOException;
import java.util.Set;
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.command.*;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@@ -40,6 +31,10 @@ import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.usage.SystemUsage;
import java.io.File;
import java.io.IOException;
import java.util.Set;
/**
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
@@ -500,6 +495,14 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
letter.setForceRecoverIndex(forceRecoverIndex);
}
public boolean isArchiveCorruptedIndex() {
return letter.isArchiveCorruptedIndex();
}
public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
}
/**
* When true, persist the redelivery status so that the message redelivery flag can survive a broker failure;
* used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean) set to true

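The new archiveCorruptedIndex flag added above is a plain bean property on KahaDBPersistenceAdapter, so it can be set when embedding a broker. A minimal sketch, assuming only the setter introduced in this change (setArchiveCorruptedIndex) together with the setDirectory/setPersistenceAdapter calls already used in the test further down; it is an illustration, not part of the commit:

import java.io.File;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

public class ArchiveCorruptedIndexConfig {
    public static void main(String[] args) throws Exception {
        KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
        kahaDB.setDirectory(new File("target/activemq-data/kahadb"));
        // if the index cannot be loaded on start-up, move the corrupted index
        // files aside (instead of deleting them) before rebuilding the index
        kahaDB.setArchiveCorruptedIndex(true);

        BrokerService broker = new BrokerService();
        broker.setPersistenceAdapter(kahaDB);
        broker.start();
    }
}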
View File: org/apache/activemq/store/kahadb/MessageDatabase.java

@@ -16,38 +16,6 @@
*/
package org.apache.activemq.store.kahadb;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.Stack;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@@ -55,18 +23,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.*;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
@@ -80,20 +37,19 @@ import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.LocationMarshaller;
import org.apache.kahadb.util.LockFile;
import org.apache.kahadb.util.LongMarshaller;
import org.apache.kahadb.util.Marshaller;
import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
import org.apache.kahadb.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.*;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
protected BrokerService brokerService;
@@ -225,6 +181,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
protected boolean forceRecoverIndex = false;
private final Object checkpointThreadLock = new Object();
private boolean rewriteOnRedelivery = false;
private boolean archiveCorruptedIndex = false;
public MessageDatabase() {
}
@@ -333,7 +290,21 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
public void open() throws IOException {
if( opened.compareAndSet(false, true) ) {
getJournal().start();
loadPageFile();
try {
loadPageFile();
} catch (IOException ioe) {
LOG.warn("Index corrupted, trying to recover ...", ioe);
// try to recover index
try {
pageFile.unload();
} catch (Exception ignore) {}
if (archiveCorruptedIndex) {
pageFile.archive();
} else {
pageFile.delete();
}
loadPageFile();
}
startCheckpoint();
recover();
}
@@ -2295,6 +2266,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
this.rewriteOnRedelivery = rewriteOnRedelivery;
}
public boolean isArchiveCorruptedIndex() {
return archiveCorruptedIndex;
}
public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
this.archiveCorruptedIndex = archiveCorruptedIndex;
}
// /////////////////////////////////////////////////////////////////
// Internal conversion methods.
// /////////////////////////////////////////////////////////////////

View File: org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java

@@ -16,24 +16,15 @@
*/
package org.apache.activemq.store.kahadb;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import junit.framework.Test;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.RecoveryBrokerTest;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.*;
import java.io.File;
import java.io.RandomAccessFile;
import java.util.ArrayList;
/**
@@ -53,8 +44,20 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
}
protected BrokerService createRestartedBroker() throws Exception {
// corrupt the index so that the restarted broker is forced to recover it:
// replace db.data with a few bytes that cannot be parsed as a valid index
File index = new File("target/activemq-data/kahadb/db.data");
index.delete();
RandomAccessFile raf = new RandomAccessFile(index, "rw");
raf.seek(index.length());
raf.writeBytes("corrupt");
raf.close();
// starting broker
BrokerService broker = new BrokerService();
KahaDBStore kaha = new KahaDBStore();
// uncomment if you want to test archiving
//kaha.setArchiveCorruptedIndex(true);
kaha.setDirectory(new File("target/activemq-data/kahadb"));
broker.setPersistenceAdapter(kaha);
return broker;

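If the commented-out kaha.setArchiveCorruptedIndex(true) line above is enabled, the corrupted index should survive as a timestamp-suffixed copy instead of being deleted. A hedged follow-up check for that variant, not part of the committed test, which assumes the "-<timestamp>" suffix produced by PageFile.archive() in the last file of this commit; it could run at the end of a recovery test method once the restarted broker has been stopped:

// only meaningful when setArchiveCorruptedIndex(true) is enabled above
File dataDir = new File("target/activemq-data/kahadb");
File[] archived = dataDir.listFiles(new java.io.FilenameFilter() {
    public boolean accept(File dir, String name) {
        // PageFile.archive() renames db.data to db.data-<timestamp>
        return name.startsWith("db.data-");
    }
});
assertTrue("corrupted index was archived, not deleted",
        archived != null && archived.length > 0);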
View File: org/apache/kahadb/page/PageFile.java

@@ -16,16 +16,13 @@
*/
package org.apache.kahadb.page;
import org.apache.kahadb.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.io.*;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
@@ -34,16 +31,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.IOExceptionSupport;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LRUCache;
import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;
/**
* A PageFile provides random access to fixed-size disk pages. This object is not thread safe, so access to it should
* be externally synchronized.
@@ -311,6 +298,16 @@ public class PageFile {
delete(getRecoveryFile());
}
public void archive() throws IOException {
if( loaded.get() ) {
throw new IllegalStateException("Cannot archive page file data while the page file is loaded");
}
long timestamp = System.currentTimeMillis();
archive(getMainPageFile(), String.valueOf(timestamp));
archive(getFreeFile(), String.valueOf(timestamp));
archive(getRecoveryFile(), String.valueOf(timestamp));
}
/**
* @param file
* @throws IOException
@@ -323,6 +320,15 @@
}
}
private void archive(File file, String suffix) throws IOException {
if( file.exists() ) {
File archive = new File(file.getPath() + "-" + suffix);
if( !file.renameTo(archive) ) {
throw new IOException("Could not archive: " + file.getPath() + " to " + archive.getPath());
}
}
}
/**
* Loads the page file so that it can be accessed for read/write purposes. This allocates OS resources. If this is the
* first time the page file is loaded, then this creates the page file in the file system.
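For completeness, a sketch of how the new PageFile.archive() contract could be exercised in isolation. The PageFile(File, String) constructor and the load()/unload() calls are assumptions based on how KahaDB uses the class elsewhere; only db.data is confirmed by the test above, the other archived siblings are implied by the getFreeFile()/getRecoveryFile() calls in archive():

import java.io.File;

import org.apache.kahadb.page.PageFile;

public class PageFileArchiveSketch {
    public static void main(String[] args) throws Exception {
        File dir = new File("target/activemq-data/kahadb");
        dir.mkdirs();

        PageFile pageFile = new PageFile(dir, "db");
        pageFile.load();    // creates/opens db.data and its sibling files
        pageFile.unload();  // archive() throws IllegalStateException while loaded
        // renames each existing file to "<name>-<System.currentTimeMillis()>"
        // in the same directory instead of deleting it
        pageFile.archive();
    }
}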