HBASE-13476 Procedure v2 - Add Replay Order logic for child procedures

This commit is contained in:
Matteo Bertozzi 2015-05-28 19:33:22 +01:00
parent f0a1ca4a6f
commit 4aa7209826
14 changed files with 1164 additions and 210 deletions

View File

@ -519,6 +519,20 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
/**
* Get an hashcode for the specified Procedure ID
* @return the hashcode for the specified procId
*/
public static long getProcIdHashCode(final long procId) {
long h = procId;
h ^= h >> 16;
h *= 0x85ebca6b;
h ^= h >> 13;
h *= 0xc2b2ae35;
h ^= h >> 16;
return h;
}
/*
* Helper to lookup the root Procedure ID given a specified procedure.
*/

View File

@ -28,7 +28,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.HashSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -43,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
@ -264,45 +264,70 @@ public class ProcedureExecutor<TEnvironment> {
this.conf = conf;
}
private List<Map.Entry<Long, RootProcedureState>> load() throws IOException {
private void load(final boolean abortOnCorruption) throws IOException {
Preconditions.checkArgument(completed.isEmpty());
Preconditions.checkArgument(rollbackStack.isEmpty());
Preconditions.checkArgument(procedures.isEmpty());
Preconditions.checkArgument(waitingTimeout.isEmpty());
Preconditions.checkArgument(runnables.size() == 0);
// 1. Load the procedures
Iterator<Procedure> loader = store.load();
if (loader == null) {
lastProcId.set(0);
return null;
}
long logMaxProcId = 0;
int runnablesCount = 0;
while (loader.hasNext()) {
Procedure proc = loader.next();
proc.beforeReplay(getEnvironment());
procedures.put(proc.getProcId(), proc);
logMaxProcId = Math.max(logMaxProcId, proc.getProcId());
if (LOG.isDebugEnabled()) {
LOG.debug("Loading procedure state=" + proc.getState() +
" isFailed=" + proc.hasException() + ": " + proc);
store.load(new ProcedureStore.ProcedureLoader() {
@Override
public void setMaxProcId(long maxProcId) {
assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
LOG.debug("load procedures maxProcId=" + maxProcId);
lastProcId.set(maxProcId);
}
@Override
public void load(ProcedureIterator procIter) throws IOException {
loadProcedures(procIter, abortOnCorruption);
}
@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
int corruptedCount = 0;
while (procIter.hasNext()) {
Procedure proc = procIter.next();
LOG.error("corrupted procedure: " + proc);
corruptedCount++;
}
if (abortOnCorruption && corruptedCount > 0) {
throw new IOException("found " + corruptedCount + " procedures on replay");
}
}
});
}
private void loadProcedures(final ProcedureIterator procIter,
final boolean abortOnCorruption) throws IOException {
// 1. Build the rollback stack
int runnablesCount = 0;
while (procIter.hasNext()) {
Procedure proc = procIter.next();
if (!proc.hasParent() && !proc.isFinished()) {
rollbackStack.put(proc.getProcId(), new RootProcedureState());
}
// add the procedure to the map
proc.beforeReplay(getEnvironment());
procedures.put(proc.getProcId(), proc);
if (proc.getState() == ProcedureState.RUNNABLE) {
runnablesCount++;
}
}
assert lastProcId.get() < 0;
lastProcId.set(logMaxProcId);
// 2. Initialize the stacks
TreeSet<Procedure> runnableSet = null;
ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
HashSet<Procedure> waitingSet = null;
for (final Procedure proc: procedures.values()) {
procIter.reset();
while (procIter.hasNext()) {
Procedure proc = procIter.next();
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
proc.getState(), proc.hasException(), proc));
}
Long rootProcId = getRootProcedureId(proc);
if (rootProcId == null) {
// The 'proc' was ready to run but the root procedure was rolledback?
@ -312,10 +337,11 @@ public class ProcedureExecutor<TEnvironment> {
if (!proc.hasParent() && proc.isFinished()) {
if (LOG.isDebugEnabled()) {
LOG.debug("The procedure is completed state=" + proc.getState() +
" isFailed=" + proc.hasException() + ": " + proc);
LOG.debug(String.format("The procedure is completed state=%s isFailed=%s",
proc.getState(), proc.hasException()));
}
assert !rollbackStack.containsKey(proc.getProcId());
procedures.remove(proc.getProcId());
completed.put(proc.getProcId(), newResultFromProcedure(proc));
continue;
}
@ -333,10 +359,7 @@ public class ProcedureExecutor<TEnvironment> {
switch (proc.getState()) {
case RUNNABLE:
if (runnableSet == null) {
runnableSet = new TreeSet<Procedure>();
}
runnableSet.add(proc);
runnableList.add(proc);
break;
case WAITING_TIMEOUT:
if (waitingSet == null) {
@ -361,7 +384,7 @@ public class ProcedureExecutor<TEnvironment> {
}
// 3. Validate the stacks
List<Map.Entry<Long, RootProcedureState>> corrupted = null;
int corruptedCount = 0;
Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
while (itStack.hasNext()) {
Map.Entry<Long, RootProcedureState> entry = itStack.next();
@ -369,32 +392,49 @@ public class ProcedureExecutor<TEnvironment> {
if (procStack.isValid()) continue;
for (Procedure proc: procStack.getSubprocedures()) {
LOG.error("corrupted procedure: " + proc);
procedures.remove(proc.getProcId());
if (runnableSet != null) runnableSet.remove(proc);
runnableList.remove(proc);
if (waitingSet != null) waitingSet.remove(proc);
corruptedCount++;
}
itStack.remove();
if (corrupted == null) {
corrupted = new ArrayList<Map.Entry<Long, RootProcedureState>>();
}
corrupted.add(entry);
}
if (abortOnCorruption && corruptedCount > 0) {
throw new IOException("found " + corruptedCount + " procedures on replay");
}
// 4. Push the runnables
if (runnableSet != null) {
// TODO: See ProcedureWALFormatReader.readInitEntry() some procedure
// may be started way before this stuff.
for (Procedure proc: runnableSet) {
if (!runnableList.isEmpty()) {
// TODO: See ProcedureWALFormatReader#hasFastStartSupport
// some procedure may be started way before this stuff.
for (int i = runnableList.size() - 1; i >= 0; --i) {
Procedure proc = runnableList.get(i);
if (!proc.hasParent()) {
sendProcedureLoadedNotification(proc.getProcId());
}
runnables.addBack(proc);
if (proc.wasExecuted()) {
runnables.addFront(proc);
} else {
// if it was not in execution, it can wait.
runnables.addBack(proc);
}
}
}
return corrupted;
}
public void start(int numThreads) throws IOException {
/**
* Start the procedure executor.
* It calls ProcedureStore.recoverLease() and ProcedureStore.load() to
* recover the lease, and ensure a single executor, and start the procedure
* replay to resume and recover the previous pending and in-progress perocedures.
*
* @param numThreads number of threads available for procedure execution.
* @param abortOnCorruption true if you want to abort your service in case
* a corrupted procedure is found on replay. otherwise false.
*/
public void start(int numThreads, boolean abortOnCorruption) throws IOException {
if (running.getAndSet(true)) {
LOG.warn("Already running");
return;
@ -427,11 +467,11 @@ public class ProcedureExecutor<TEnvironment> {
store.recoverLease();
// TODO: Split in two steps.
// TODO: Handle corrupted procedure returned (probably just a WARN)
// TODO: Handle corrupted procedures (currently just a warn)
// The first one will make sure that we have the latest id,
// so we can start the threads and accept new procedures.
// The second step will do the actual load of old procedures.
load();
load(abortOnCorruption);
// Start the executors. Here we must have the lastProcId set.
for (int i = 0; i < threads.length; ++i) {

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.procedure2.store;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -45,6 +44,57 @@ public interface ProcedureStore {
void abortProcess();
}
/**
* An Iterator over a collection of Procedure
*/
public interface ProcedureIterator {
/**
* Reset the Iterator by seeking to the beginning of the list.
*/
void reset();
/**
* Returns true if the iterator has more elements.
* (In other words, returns true if next() would return a Procedure
* rather than throwing an exception.)
* @return true if the iterator has more procedures
*/
boolean hasNext();
/**
* Returns the next procedure in the iteration.
* @throws IOException if there was an error fetching/deserializing the procedure
* @throws NoSuchElementException if the iteration has no more elements
* @return the next procedure in the iteration.
*/
Procedure next() throws IOException;
}
/**
* Interface passed to the ProcedureStore.load() method to handle the store-load events.
*/
public interface ProcedureLoader {
/**
* Called by ProcedureStore.load() to notify about the maximum proc-id in the store.
* @param maxProcId the highest proc-id in the store
*/
void setMaxProcId(long maxProcId);
/**
* Called by the ProcedureStore.load() every time a set of procedures are ready to be executed.
* The ProcedureIterator passed to the method, has the procedure sorted in replay-order.
* @param procIter iterator over the procedures ready to be added to the executor.
*/
void load(ProcedureIterator procIter) throws IOException;
/**
* Called by the ProcedureStore.load() in case we have procedures not-ready to be added to
* the executor, which probably means they are corrupted since some information/link is missing.
* @param procIter iterator over the procedures not ready to be added to the executor, corrupted
*/
void handleCorrupted(ProcedureIterator procIter) throws IOException;
}
/**
* Add the listener to the notification list.
* @param listener The AssignmentListener to register
@ -87,9 +137,9 @@ public interface ProcedureStore {
/**
* Load the Procedures in the store.
* @return the set of procedures present in the store
* @param loader the ProcedureLoader that will handle the store-load events
*/
Iterator<Procedure> load() throws IOException;
void load(ProcedureLoader loader) throws IOException;
/**
* When a procedure is submitted to the executor insert(proc, null) will be called.

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
@ -63,14 +64,14 @@ public final class ProcedureWALFormat {
}
}
interface Loader {
interface Loader extends ProcedureLoader {
void removeLog(ProcedureWALFile log);
void markCorruptedWAL(ProcedureWALFile log, IOException e);
}
private ProcedureWALFormat() {}
public static Iterator<Procedure> load(final Iterator<ProcedureWALFile> logs,
public static void load(final Iterator<ProcedureWALFile> logs,
final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker);
tracker.setKeepDeletes(true);
@ -84,14 +85,13 @@ public final class ProcedureWALFormat {
log.close();
}
}
reader.finalize(loader);
// The tracker is now updated with all the procedures read from the logs
tracker.setPartialFlag(false);
tracker.resetUpdates();
} finally {
tracker.setKeepDeletes(false);
}
// TODO: Write compacted version?
return reader.getProcedures();
}
public static void writeHeader(OutputStream stream, ProcedureWALHeader header)

View File

@ -19,9 +19,6 @@
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -30,6 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
@ -41,17 +39,74 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEn
public class ProcedureWALFormatReader {
private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class);
private final ProcedureStoreTracker tracker;
//private final long compactionLogId;
private final Map<Long, Procedure> procedures = new HashMap<Long, Procedure>();
private final Map<Long, ProcedureProtos.Procedure> localProcedures =
new HashMap<Long, ProcedureProtos.Procedure>();
// ==============================================================================================
// We read the WALs in reverse order. from the newest to the oldest.
// We have different entry types:
// - INIT: Procedure submitted by the user (also known as 'root procedure')
// - INSERT: Children added to the procedure <parentId>:[<childId>, ...]
// - UPDATE: The specified procedure was updated
// - DELETE: The procedure was removed (completed/rolledback and result TTL expired)
//
// In the WAL we can find multiple times the same procedure as UPDATE or INSERT.
// We read the WAL from top to bottom, so every time we find an entry of the
// same procedure, that will be the "latest" update.
//
// We keep two in-memory maps:
// - localProcedureMap: is the map containing the entries in the WAL we are processing
// - procedureMap: is the map containing all the procedures we found up to the WAL in process.
// localProcedureMap is merged with the procedureMap once we reach the WAL EOF.
//
// Since we are reading the WALs in reverse order (newest to oldest),
// if we find an entry related to a procedure we already have in 'procedureMap' we can discard it.
//
// The WAL is append-only so the last procedure in the WAL is the one that
// was in execution at the time we crashed/closed the server.
// given that, the procedure replay order can be inferred by the WAL order.
//
// Example:
// WAL-2: [A, B, A, C, D]
// WAL-1: [F, G, A, F, B]
// Replay-Order: [D, C, A, B, F, G]
//
// The "localProcedureMap" keeps a "replayOrder" list. Every time we add the
// record to the map that record is moved to the head of the "replayOrder" list.
// Using the example above:
// WAL-2 localProcedureMap.replayOrder is [D, C, A, B]
// WAL-1 localProcedureMap.replayOrder is [F, G]
//
// each time we reach the WAL-EOF, the "replayOrder" list is merged/appended in 'procedureMap'
// so using the example above we end up with: [D, C, A, B] + [F, G] as replay order.
//
// Fast Start: INIT/INSERT record and StackIDs
// ---------------------------------------------
// We have to special record, INIT and INSERT that tracks the first time
// the procedure was added to the WAL. We can use that information to be able
// to start procedures before reaching the end of the WAL, or before reading all the WALs.
// but in some cases the WAL with that record can be already gone.
// In alternative we can use the stackIds on each procedure,
// to identify when a procedure is ready to start.
// If there are gaps in the sum of the stackIds we need to read more WALs.
//
// Example (all procs child of A):
// WAL-2: [A, B] A stackIds = [0, 4], B stackIds = [1, 5]
// WAL-1: [A, B, C, D]
//
// In the case above we need to read one more WAL to be able to consider
// the root procedure A and all children as ready.
// ==============================================================================================
private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024);
private final WalProcedureMap procedureMap = new WalProcedureMap(1024);
//private long compactionLogId;
private long maxProcId = 0;
private final ProcedureStoreTracker tracker;
private final boolean hasFastStartSupport;
public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
this.tracker = tracker;
// we support fast-start only if we have a clean shutdown.
this.hasFastStartSupport = !tracker.isEmpty();
}
public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException {
@ -91,58 +146,65 @@ public class ProcedureWALFormatReader {
loader.markCorruptedWAL(log, e);
}
if (localProcedures.isEmpty()) {
if (localProcedureMap.isEmpty()) {
LOG.info("No active entry found in state log " + log + ". removing it");
loader.removeLog(log);
} else {
Iterator<Map.Entry<Long, ProcedureProtos.Procedure>> itd =
localProcedures.entrySet().iterator();
while (itd.hasNext()) {
Map.Entry<Long, ProcedureProtos.Procedure> entry = itd.next();
itd.remove();
procedureMap.mergeTail(localProcedureMap);
// Deserialize the procedure
Procedure proc = Procedure.convert(entry.getValue());
procedures.put(entry.getKey(), proc);
}
// TODO: Some procedure may be already runnables (see readInitEntry())
// (we can also check the "update map" in the log trackers)
//if (hasFastStartSupport) {
// TODO: Some procedure may be already runnables (see readInitEntry())
// (we can also check the "update map" in the log trackers)
// --------------------------------------------------
//EntryIterator iter = procedureMap.fetchReady();
//if (iter != null) loader.load(iter);
// --------------------------------------------------
//}
}
}
public Iterator<Procedure> getProcedures() {
return procedures.values().iterator();
public void finalize(ProcedureWALFormat.Loader loader) throws IOException {
// notify the loader about the max proc ID
loader.setMaxProcId(maxProcId);
// fetch the procedure ready to run.
ProcedureIterator procIter = procedureMap.fetchReady();
if (procIter != null) loader.load(procIter);
// remaining procedures have missing link or dependencies
// consider them as corrupted, manual fix is probably required.
procIter = procedureMap.fetchAll();
if (procIter != null) loader.handleCorrupted(procIter);
}
private void loadEntries(final ProcedureWALEntry entry) {
for (ProcedureProtos.Procedure proc: entry.getProcedureList()) {
maxProcId = Math.max(maxProcId, proc.getProcId());
if (isRequired(proc.getProcId())) {
if (LOG.isTraceEnabled()) {
LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
}
localProcedures.put(proc.getProcId(), proc);
tracker.setDeleted(proc.getProcId(), false);
private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) {
maxProcId = Math.max(maxProcId, proc.getProcId());
if (isRequired(proc.getProcId())) {
if (LOG.isTraceEnabled()) {
LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
}
localProcedureMap.add(proc);
tracker.setDeleted(proc.getProcId(), false);
}
}
private void readInitEntry(final ProcedureWALEntry entry)
throws IOException {
assert entry.getProcedureCount() == 1 : "Expected only one procedure";
// TODO: Make it runnable, before reading other files
loadEntries(entry);
loadProcedure(entry, entry.getProcedure(0));
}
private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
loadEntries(entry);
loadProcedure(entry, entry.getProcedure(0));
for (int i = 1; i < entry.getProcedureCount(); ++i) {
loadProcedure(entry, entry.getProcedure(i));
}
}
private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
assert entry.getProcedureCount() == 1 : "Expected only one procedure";
loadEntries(entry);
loadProcedure(entry, entry.getProcedure(0));
}
private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
@ -152,7 +214,7 @@ public class ProcedureWALFormatReader {
LOG.trace("read delete entry " + entry.getProcId());
}
maxProcId = Math.max(maxProcId, entry.getProcId());
localProcedures.remove(entry.getProcId());
localProcedureMap.remove(entry.getProcId());
tracker.setDeleted(entry.getProcId(), true);
}
@ -161,6 +223,458 @@ public class ProcedureWALFormatReader {
}
private boolean isRequired(final long procId) {
return !isDeleted(procId) && !procedures.containsKey(procId);
return !isDeleted(procId) && !procedureMap.contains(procId);
}
// ==========================================================================
// We keep an in-memory map of the procedures sorted by replay order.
// (see the details in the beginning of the file)
// _______________________________________________
// procedureMap = | A | | E | | C | | | | | G | | |
// D B
// replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
//
// We also have a lazy grouping by "root procedure", and a list of
// unlinked procedure. If after reading all the WALs we have unlinked
// procedures it means that we had a missing WAL or a corruption.
// rootHead = A <-> D <-> G
// B E
// C
// unlinkFromLinkList = None
// ==========================================================================
private static class Entry {
// hash-table next
protected Entry hashNext;
// child head
protected Entry childHead;
// double-link for rootHead or childHead
protected Entry linkNext;
protected Entry linkPrev;
// replay double-linked-list
protected Entry replayNext;
protected Entry replayPrev;
// procedure-infos
protected Procedure procedure;
protected ProcedureProtos.Procedure proto;
protected boolean ready = false;
public Entry(Entry hashNext) { this.hashNext = hashNext; }
public long getProcId() { return proto.getProcId(); }
public long getParentId() { return proto.getParentId(); }
public boolean hasParent() { return proto.hasParentId(); }
public boolean isReady() { return ready; }
public Procedure convert() throws IOException {
if (procedure == null) {
procedure = Procedure.convert(proto);
}
return procedure;
}
@Override
public String toString() {
return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")";
}
}
private static class EntryIterator implements ProcedureIterator {
private final Entry replayHead;
private Entry current;
public EntryIterator(Entry replayHead) {
this.replayHead = replayHead;
this.current = replayHead;
}
@Override
public void reset() {
this.current = replayHead;
}
@Override
public boolean hasNext() {
return current != null;
}
@Override
public Procedure next() throws IOException {
try {
return current.convert();
} finally {
current = current.replayNext;
}
}
}
private static class WalProcedureMap {
// procedure hash table
private Entry[] procedureMap;
// replay-order double-linked-list
private Entry replayOrderHead;
private Entry replayOrderTail;
// root linked-list
private Entry rootHead;
// pending unlinked children (root not present yet)
private Entry childUnlinkedHead;
public WalProcedureMap(int size) {
procedureMap = new Entry[size];
replayOrderHead = null;
replayOrderTail = null;
rootHead = null;
childUnlinkedHead = null;
}
public void add(ProcedureProtos.Procedure procProto) {
Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId());
boolean isNew = entry.proto == null;
entry.proto = procProto;
addToReplayList(entry);
if (isNew) {
if (procProto.hasParentId()) {
childUnlinkedHead = addToLinkList(entry, childUnlinkedHead);
} else {
rootHead = addToLinkList(entry, rootHead);
}
}
}
public boolean remove(long procId) {
Entry entry = removeFromMap(procId);
if (entry != null) {
unlinkFromReplayList(entry);
unlinkFromLinkList(entry);
return true;
}
return false;
}
public boolean contains(long procId) {
return getProcedure(procId) != null;
}
public boolean isEmpty() {
return replayOrderHead == null;
}
public void clear() {
for (int i = 0; i < procedureMap.length; ++i) {
procedureMap[i] = null;
}
replayOrderHead = null;
replayOrderTail = null;
rootHead = null;
childUnlinkedHead = null;
}
/*
* Merges two WalProcedureMap,
* the target is the "global" map, the source is the "local" map.
* - The entries in the hashtables are guaranteed to be unique.
* On replay we don't load procedures that already exist in the "global"
* map (the one we are merging the "local" in to).
* - The replayOrderList of the "local" nao will be appended to the "global"
* map replay list.
* - The "local" map will be cleared at the end of the operation.
*/
public void mergeTail(WalProcedureMap other) {
for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) {
int slotIndex = getMapSlot(p.getProcId());
p.hashNext = procedureMap[slotIndex];
procedureMap[slotIndex] = p;
}
if (replayOrderHead == null) {
replayOrderHead = other.replayOrderHead;
replayOrderTail = other.replayOrderTail;
rootHead = other.rootHead;
childUnlinkedHead = other.childUnlinkedHead;
} else {
// append replay list
assert replayOrderTail.replayNext == null;
assert other.replayOrderHead.replayPrev == null;
replayOrderTail.replayNext = other.replayOrderHead;
other.replayOrderHead.replayPrev = replayOrderTail;
replayOrderTail = other.replayOrderTail;
// merge rootHead
if (rootHead == null) {
rootHead = other.rootHead;
} else if (other.rootHead != null) {
Entry otherTail = findLinkListTail(other.rootHead);
otherTail.linkNext = rootHead;
rootHead.linkPrev = otherTail;
rootHead = other.rootHead;
}
// merge childUnlinkedHead
if (childUnlinkedHead == null) {
childUnlinkedHead = other.childUnlinkedHead;
} else if (other.childUnlinkedHead != null) {
Entry otherTail = findLinkListTail(other.childUnlinkedHead);
otherTail.linkNext = childUnlinkedHead;
childUnlinkedHead.linkPrev = otherTail;
childUnlinkedHead = other.childUnlinkedHead;
}
}
other.clear();
}
/*
* Returns an EntryIterator with the list of procedures ready
* to be added to the executor.
* A Procedure is ready if its children and parent are ready.
*/
public EntryIterator fetchReady() {
buildGraph();
Entry readyHead = null;
Entry readyTail = null;
Entry p = replayOrderHead;
while (p != null) {
Entry next = p.replayNext;
if (p.isReady()) {
unlinkFromReplayList(p);
if (readyTail != null) {
readyTail.replayNext = p;
p.replayPrev = readyTail;
} else {
p.replayPrev = null;
readyHead = p;
}
readyTail = p;
p.replayNext = null;
}
p = next;
}
// we need the hash-table lookups for parents, so this must be done
// out of the loop where we check isReadyToRun()
for (p = readyHead; p != null; p = p.replayNext) {
removeFromMap(p.getProcId());
unlinkFromLinkList(p);
}
return readyHead != null ? new EntryIterator(readyHead) : null;
}
/*
* Drain this map and return all procedures in it.
*/
public EntryIterator fetchAll() {
Entry head = replayOrderHead;
for (Entry p = head; p != null; p = p.replayNext) {
removeFromMap(p.getProcId());
}
for (int i = 0; i < procedureMap.length; ++i) {
assert procedureMap[i] == null : "map not empty i=" + i;
}
replayOrderHead = null;
replayOrderTail = null;
childUnlinkedHead = null;
rootHead = null;
return head != null ? new EntryIterator(head) : null;
}
private void buildGraph() {
Entry p = childUnlinkedHead;
while (p != null) {
Entry next = p.linkNext;
Entry rootProc = getRootProcedure(p);
if (rootProc != null) {
rootProc.childHead = addToLinkList(p, rootProc.childHead);
}
p = next;
}
for (p = rootHead; p != null; p = p.linkNext) {
checkReadyToRun(p);
}
}
private Entry getRootProcedure(Entry entry) {
while (entry != null && entry.hasParent()) {
entry = getProcedure(entry.getParentId());
}
return entry;
}
/*
* (see the comprehensive explaination in the beginning of the file)
* A Procedure is ready when parent and children are ready.
* "ready" means that we all the information that we need in-memory.
*
* Example-1:
* We have two WALs, we start reading fronm the newest (wal-2)
* wal-2 | C B |
* wal-1 | A B C |
*
* If C and B don't depend on A (A is not the parent), we can start them
* before reading wal-1. If B is the only one with parent A we can start C
* and read one more WAL before being able to start B.
*
* How do we know with the only information in B that we are not ready.
* - easy case, the parent is missing from the global map
* - more complex case we look at the Stack IDs
*
* The Stack-IDs are added to the procedure order as incremental index
* tracking how many times that procedure was executed, which is equivalent
* at the number of times we wrote the procedure to the WAL.
* In the example above:
* wal-2: B has stackId = [1, 2]
* wal-1: B has stackId = [1]
* wal-1: A has stackId = [0]
*
* Since we know that the Stack-IDs are incremental for a Procedure,
* we notice that there is a gap in the stackIds of B, so something was
* executed before.
* To identify when a Procedure is ready we do the sum of the stackIds of
* the procedure and the parent. if the stackIdSum is equals to the
* sum of {1..maxStackId} then everything we need is avaiable.
*
* Example-2
* wal-2 | A | A stackIds = [0, 2]
* wal-1 | A B | B stackIds = [1]
*
* There is a gap between A stackIds so something was executed in between.
*/
private boolean checkReadyToRun(Entry rootEntry) {
int stackIdSum = 0;
int maxStackId = 0;
for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) {
int stackId = 1 + rootEntry.proto.getStackId(i);
maxStackId = Math.max(maxStackId, stackId);
stackIdSum += stackId;
}
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
for (int i = 0; i < p.proto.getStackIdCount(); ++i) {
int stackId = 1 + p.proto.getStackId(i);
maxStackId = Math.max(maxStackId, stackId);
stackIdSum += stackId;
}
}
final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2);
if (cmpStackIdSum == stackIdSum) {
rootEntry.ready = true;
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
p.ready = true;
}
return true;
}
return false;
}
private void unlinkFromReplayList(Entry entry) {
if (replayOrderHead == entry) {
replayOrderHead = entry.replayNext;
}
if (replayOrderTail == entry) {
replayOrderTail = entry.replayPrev;
}
if (entry.replayPrev != null) {
entry.replayPrev.replayNext = entry.replayNext;
}
if (entry.replayNext != null) {
entry.replayNext.replayPrev = entry.replayPrev;
}
}
private void addToReplayList(final Entry entry) {
unlinkFromReplayList(entry);
entry.replayNext = replayOrderHead;
entry.replayPrev = null;
if (replayOrderHead != null) {
replayOrderHead.replayPrev = entry;
} else {
replayOrderTail = entry;
}
replayOrderHead = entry;
}
private void unlinkFromLinkList(Entry entry) {
if (entry == rootHead) {
rootHead = entry.linkNext;
} else if (entry == childUnlinkedHead) {
childUnlinkedHead = entry.linkNext;
}
if (entry.linkPrev != null) {
entry.linkPrev.linkNext = entry.linkNext;
}
if (entry.linkNext != null) {
entry.linkNext.linkPrev = entry.linkPrev;
}
}
private Entry addToLinkList(Entry entry, Entry linkHead) {
unlinkFromLinkList(entry);
entry.linkNext = linkHead;
entry.linkPrev = null;
if (linkHead != null) {
linkHead.linkPrev = entry;
}
return entry;
}
private Entry findLinkListTail(Entry linkHead) {
Entry tail = linkHead;
while (tail.linkNext != null) {
tail = tail.linkNext;
}
return tail;
}
private Entry addToMap(final long procId, final boolean hasParent) {
int slotIndex = getMapSlot(procId);
Entry entry = getProcedure(slotIndex, procId);
if (entry != null) return entry;
entry = new Entry(procedureMap[slotIndex]);
procedureMap[slotIndex] = entry;
return entry;
}
private Entry removeFromMap(final long procId) {
int slotIndex = getMapSlot(procId);
Entry prev = null;
Entry entry = procedureMap[slotIndex];
while (entry != null) {
if (procId == entry.getProcId()) {
if (prev != null) {
prev.hashNext = entry.hashNext;
} else {
procedureMap[slotIndex] = entry.hashNext;
}
entry.hashNext = null;
return entry;
}
prev = entry;
entry = entry.hashNext;
}
return null;
}
private Entry getProcedure(final long procId) {
return getProcedure(getMapSlot(procId), procId);
}
private Entry getProcedure(final int slotIndex, final long procId) {
Entry entry = procedureMap[slotIndex];
while (entry != null) {
if (procId == entry.getProcId()) {
return entry;
}
entry = entry.hashNext;
}
return null;
}
private int getMapSlot(final long procId) {
return (int)(Procedure.getProcIdHashCode(procId) % procedureMap.length);
}
}
}

View File

@ -215,13 +215,12 @@ public class WALProcedureStore implements ProcedureStore {
FileStatus[] oldLogs = getLogFiles();
while (running.get()) {
// Get Log-MaxID and recover lease on old logs
flushLogId = initOldLogs(oldLogs) + 1;
flushLogId = initOldLogs(oldLogs);
// Create new state-log
if (!rollWriter(flushLogId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Someone else has already created log: " + flushLogId);
}
if (!rollWriter(flushLogId + 1)) {
// someone else has already created this log
LOG.debug("someone else has already created log " + flushLogId);
continue;
}
@ -241,7 +240,7 @@ public class WALProcedureStore implements ProcedureStore {
}
@Override
public Iterator<Procedure> load() throws IOException {
public void load(final ProcedureLoader loader) throws IOException {
if (logs.isEmpty()) {
throw new RuntimeException("recoverLease() must be called before loading data");
}
@ -251,7 +250,8 @@ public class WALProcedureStore implements ProcedureStore {
if (LOG.isDebugEnabled()) {
LOG.debug("No state logs to replay.");
}
return null;
loader.setMaxProcId(0);
return;
}
// Load the old logs
@ -259,7 +259,22 @@ public class WALProcedureStore implements ProcedureStore {
Iterator<ProcedureWALFile> it = logs.descendingIterator();
it.next(); // Skip the current log
try {
return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
@Override
public void setMaxProcId(long maxProcId) {
loader.setMaxProcId(maxProcId);
}
@Override
public void load(ProcedureIterator procIter) throws IOException {
loader.load(procIter);
}
@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
loader.handleCorrupted(procIter);
}
@Override
public void removeLog(ProcedureWALFile log) {
toRemove.add(log);
@ -301,7 +316,7 @@ public class WALProcedureStore implements ProcedureStore {
}
// Push the transaction data and wait until it is persisted
logId = pushData(slot);
pushData(slot);
} catch (IOException e) {
// We are not able to serialize the procedure.
// this is a code error, and we are not able to go on.
@ -383,7 +398,7 @@ public class WALProcedureStore implements ProcedureStore {
storeTracker.delete(procId);
if (logId == flushLogId) {
if (storeTracker.isEmpty() && totalSynced.get() > rollThreshold) {
removeOldLogs = rollWriterOrDie(logId + 1);
removeOldLogs = rollWriterOrDie();
}
}
}
@ -541,9 +556,9 @@ public class WALProcedureStore implements ProcedureStore {
}
}
private boolean rollWriterOrDie(final long logId) {
private boolean rollWriterOrDie() {
try {
return rollWriter(logId);
return rollWriter();
} catch (IOException e) {
LOG.warn("Unable to roll the log", e);
sendAbortProcessSignal();
@ -551,7 +566,13 @@ public class WALProcedureStore implements ProcedureStore {
}
}
protected boolean rollWriter() throws IOException {
return rollWriter(flushLogId + 1);
}
private boolean rollWriter(final long logId) throws IOException {
assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
.setVersion(ProcedureWALFormat.HEADER_VERSION)
.setType(ProcedureWALFormat.LOG_TYPE_STREAM)

View File

@ -57,11 +57,11 @@ public class ProcedureTestingUtility {
public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor)
throws Exception {
restart(procExecutor, null);
restart(procExecutor, null, true);
}
public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
Runnable beforeStartAction) throws Exception {
Runnable beforeStartAction, boolean failOnCorrupted) throws Exception {
ProcedureStore procStore = procExecutor.getStore();
int storeThreads = procExecutor.getNumThreads();
int execThreads = procExecutor.getNumThreads();
@ -75,7 +75,7 @@ public class ProcedureTestingUtility {
}
// re-start
procStore.start(storeThreads);
procExecutor.start(execThreads);
procExecutor.start(execThreads, failOnCorrupted);
}
public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,

View File

@ -71,7 +71,7 @@ public class TestProcedureExecution {
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
}
@After

View File

@ -75,7 +75,7 @@ public class TestProcedureRecovery {
procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
procSleepInterval = 0;
}

View File

@ -19,13 +19,17 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -46,7 +50,7 @@ import static org.junit.Assert.fail;
public class TestProcedureReplayOrder {
private static final Log LOG = LogFactory.getLog(TestProcedureReplayOrder.class);
private static final Procedure NULL_PROC = null;
private static final int NUM_THREADS = 16;
private ProcedureExecutor<Void> procExecutor;
private TestProcedureEnv procEnv;
@ -60,7 +64,7 @@ public class TestProcedureReplayOrder {
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 10);
htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 25);
testDir = htu.getDataTestDir();
fs = testDir.getFileSystem(htu.getConfiguration());
@ -70,8 +74,8 @@ public class TestProcedureReplayOrder {
procEnv = new TestProcedureEnv();
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
procStore.start(24);
procExecutor.start(1);
procStore.start(NUM_THREADS);
procExecutor.start(1, true);
}
@After
@ -82,47 +86,45 @@ public class TestProcedureReplayOrder {
}
@Test(timeout=90000)
public void testSingleStepReplyOrder() throws Exception {
// avoid the procedure to be runnable
procEnv.setAcquireLock(false);
public void testSingleStepReplayOrder() throws Exception {
final int NUM_PROC_XTHREAD = 32;
final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD;
// submit the procedures
submitProcedures(16, 25, TestSingleStepProcedure.class);
submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class);
while (procEnv.getExecId() < NUM_PROCS) {
Thread.sleep(100);
}
// restart the executor and allow the procedures to run
ProcedureTestingUtility.restart(procExecutor, new Runnable() {
@Override
public void run() {
procEnv.setAcquireLock(true);
}
});
ProcedureTestingUtility.restart(procExecutor);
// wait the execution of all the procedures and
// assert that the execution order was sorted by procId
ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
procEnv.assertSortedExecList();
// TODO: FIXME: This should be revisited
procEnv.assertSortedExecList(NUM_PROCS);
}
@Ignore
@Test(timeout=90000)
public void testMultiStepReplyOrder() throws Exception {
// avoid the procedure to be runnable
procEnv.setAcquireLock(false);
public void testMultiStepReplayOrder() throws Exception {
final int NUM_PROC_XTHREAD = 24;
final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2);
// submit the procedures
submitProcedures(16, 10, TestTwoStepProcedure.class);
submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class);
while (procEnv.getExecId() < NUM_PROCS) {
Thread.sleep(100);
}
// restart the executor and allow the procedures to run
ProcedureTestingUtility.restart(procExecutor, new Runnable() {
@Override
public void run() {
procEnv.setAcquireLock(true);
}
});
ProcedureTestingUtility.restart(procExecutor);
fail("TODO: FIXME: NOT IMPLEMENT REPLAY ORDER");
// wait the execution of all the procedures and
// assert that the execution order was sorted by procId
ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
procEnv.assertSortedExecList(NUM_PROCS);
}
private void submitProcedures(final int nthreads, final int nprocPerThread,
@ -154,73 +156,101 @@ public class TestProcedureReplayOrder {
}
private static class TestProcedureEnv {
private ArrayList<Long> execList = new ArrayList<Long>();
private boolean acquireLock = true;
private ArrayList<TestProcedure> execList = new ArrayList<TestProcedure>();
private AtomicLong execTimestamp = new AtomicLong(0);
public void setAcquireLock(boolean acquireLock) {
this.acquireLock = acquireLock;
public long getExecId() {
return execTimestamp.get();
}
public boolean canAcquireLock() {
return acquireLock;
public long nextExecId() {
return execTimestamp.incrementAndGet();
}
public void addToExecList(final Procedure proc) {
execList.add(proc.getProcId());
public void addToExecList(final TestProcedure proc) {
execList.add(proc);
}
public ArrayList<Long> getExecList() {
return execList;
}
public void assertSortedExecList() {
public void assertSortedExecList(int numProcs) {
assertEquals(numProcs, execList.size());
LOG.debug("EXEC LIST: " + execList);
for (int i = 1; i < execList.size(); ++i) {
assertTrue("exec list not sorted: " + execList.get(i-1) + " >= " + execList.get(i),
execList.get(i-1) < execList.get(i));
for (int i = 0; i < execList.size() - 1; ++i) {
TestProcedure a = execList.get(i);
TestProcedure b = execList.get(i + 1);
assertTrue("exec list not sorted: " + a + " < " + b, a.getExecId() > b.getExecId());
}
}
}
public static class TestSingleStepProcedure extends SequentialProcedure<TestProcedureEnv> {
public static abstract class TestProcedure extends Procedure<TestProcedureEnv> {
protected long execId = 0;
protected int step = 0;
public long getExecId() {
return execId;
}
@Override
protected void rollback(TestProcedureEnv env) { }
@Override
protected boolean abort(TestProcedureEnv env) { return true; }
@Override
protected void serializeStateData(final OutputStream stream) throws IOException {
StreamUtils.writeLong(stream, execId);
}
@Override
protected void deserializeStateData(final InputStream stream) throws IOException {
execId = StreamUtils.readLong(stream);
step = 2;
}
}
public static class TestSingleStepProcedure extends TestProcedure {
public TestSingleStepProcedure() { }
@Override
protected Procedure[] execute(TestProcedureEnv env) {
LOG.debug("execute procedure " + this);
env.addToExecList(this);
return null;
}
protected boolean acquireLock(final TestProcedureEnv env) {
return env.canAcquireLock();
protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
LOG.trace("execute procedure step=" + step + ": " + this);
if (step == 0) {
step = 1;
execId = env.nextExecId();
return new Procedure[] { this };
} else if (step == 2) {
env.addToExecList(this);
return null;
}
throw new ProcedureYieldException();
}
@Override
protected void rollback(TestProcedureEnv env) { }
@Override
protected boolean abort(TestProcedureEnv env) { return true; }
public String toString() {
return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")";
}
}
public static class TestTwoStepProcedure extends SequentialProcedure<TestProcedureEnv> {
public static class TestTwoStepProcedure extends TestProcedure {
public TestTwoStepProcedure() { }
@Override
protected Procedure[] execute(TestProcedureEnv env) {
LOG.debug("execute procedure " + this);
env.addToExecList(this);
return new Procedure[] { new TestSingleStepProcedure() };
}
protected boolean acquireLock(final TestProcedureEnv env) {
return true;
protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
LOG.trace("execute procedure step=" + step + ": " + this);
if (step == 0) {
step = 1;
execId = env.nextExecId();
return new Procedure[] { new TestSingleStepProcedure() };
} else if (step == 2) {
env.addToExecList(this);
return null;
}
throw new ProcedureYieldException();
}
@Override
protected void rollback(TestProcedureEnv env) { }
@Override
protected boolean abort(TestProcedureEnv env) { return true; }
public String toString() {
return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")";
}
}
}

View File

@ -22,6 +22,10 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.HashSet;
import java.util.Set;
@ -35,6 +39,8 @@ import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -84,17 +90,20 @@ public class TestWALProcedureStore {
fs.delete(logDir, true);
}
private Iterator<Procedure> storeRestart() throws Exception {
private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
procStore.stop(false);
procStore.start(PROCEDURE_STORE_SLOTS);
procStore.recoverLease();
return procStore.load();
procStore.load(loader);
}
@Test
public void testEmptyLogLoad() throws Exception {
Iterator<Procedure> loader = storeRestart();
assertEquals(0, countProcedures(loader));
LoadCounter loader = new LoadCounter();
storeRestart(loader);
assertEquals(0, loader.getMaxProcId());
assertEquals(0, loader.getLoadedCount());
assertEquals(0, loader.getCorruptedCount());
}
@Test
@ -153,8 +162,10 @@ public class TestWALProcedureStore {
assertEquals(1, logs.length);
corruptLog(logs[0], 4);
int count = countProcedures(storeRestart());
assertEquals(100, count);
LoadCounter loader = new LoadCounter();
storeRestart(loader);
assertEquals(100, loader.getLoadedCount());
assertEquals(0, loader.getCorruptedCount());
}
@Test
@ -173,10 +184,205 @@ public class TestWALProcedureStore {
assertEquals(1, logs.length);
corruptLog(logs[0], 1823);
int count = countProcedures(storeRestart());
LoadCounter loader = new LoadCounter();
storeRestart(loader);
assertTrue(procStore.getCorruptedLogs() != null);
assertEquals(1, procStore.getCorruptedLogs().size());
assertEquals(85, count);
assertEquals(85, loader.getLoadedCount());
assertEquals(0, loader.getCorruptedCount());
}
@Test
public void testCorruptedProcedures() throws Exception {
// Insert root-procedures
TestProcedure[] rootProcs = new TestProcedure[10];
for (int i = 1; i <= rootProcs.length; i++) {
rootProcs[i-1] = new TestProcedure(i, 0);
procStore.insert(rootProcs[i-1], null);
rootProcs[i-1].addStackId(0);
procStore.update(rootProcs[i-1]);
}
// insert root-child txn
procStore.rollWriter();
for (int i = 1; i <= rootProcs.length; i++) {
TestProcedure b = new TestProcedure(rootProcs.length + i, i);
rootProcs[i-1].addStackId(1);
procStore.insert(rootProcs[i-1], new Procedure[] { b });
}
// insert child updates
procStore.rollWriter();
for (int i = 1; i <= rootProcs.length; i++) {
procStore.update(new TestProcedure(rootProcs.length + i, i));
}
// Stop the store
procStore.stop(false);
// Remove 4 byte from the trailer
FileStatus[] logs = fs.listStatus(logDir);
assertEquals(3, logs.length);
Arrays.sort(logs, new Comparator<FileStatus>() {
@Override
public int compare(FileStatus o1, FileStatus o2) {
return o1.getPath().getName().compareTo(o2.getPath().getName());
}
});
// Remove the first log, we have insert-txn and updates in the others so everything is fine.
fs.delete(logs[0].getPath(), false);
LoadCounter loader = new LoadCounter();
storeRestart(loader);
assertEquals(rootProcs.length * 2, loader.getLoadedCount());
assertEquals(0, loader.getCorruptedCount());
// Remove the second log, we have lost any root/parent references
fs.delete(logs[1].getPath(), false);
loader.reset();
storeRestart(loader);
assertEquals(0, loader.getLoadedCount());
assertEquals(rootProcs.length, loader.getCorruptedCount());
for (Procedure proc: loader.getCorrupted()) {
assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length);
assertTrue(proc.toString(),
proc.getProcId() > rootProcs.length &&
proc.getProcId() <= (rootProcs.length * 2));
}
}
@Test(timeout=60000)
public void testWalReplayOrder_AB_A() throws Exception {
/*
* | A B | -> | A |
*/
TestProcedure a = new TestProcedure(1, 0);
TestProcedure b = new TestProcedure(2, 1);
procStore.insert(a, null);
a.addStackId(0);
procStore.update(a);
procStore.insert(a, new Procedure[] { b });
b.addStackId(1);
procStore.update(b);
procStore.rollWriter();
a.addStackId(2);
procStore.update(a);
storeRestart(new ProcedureStore.ProcedureLoader() {
@Override
public void setMaxProcId(long maxProcId) {
assertEquals(2, maxProcId);
}
@Override
public void load(ProcedureIterator procIter) throws IOException {
assertTrue(procIter.hasNext());
assertEquals(1, procIter.next().getProcId());
assertTrue(procIter.hasNext());
assertEquals(2, procIter.next().getProcId());
assertFalse(procIter.hasNext());
}
@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
assertFalse(procIter.hasNext());
}
});
}
@Test(timeout=60000)
public void testWalReplayOrder_ABC_BAD() throws Exception {
/*
* | A B C | -> | B A D |
*/
TestProcedure a = new TestProcedure(1, 0);
TestProcedure b = new TestProcedure(2, 1);
TestProcedure c = new TestProcedure(3, 2);
TestProcedure d = new TestProcedure(4, 0);
procStore.insert(a, null);
a.addStackId(0);
procStore.update(a);
procStore.insert(a, new Procedure[] { b });
b.addStackId(1);
procStore.update(b);
procStore.insert(b, new Procedure[] { c });
b.addStackId(2);
procStore.update(b);
procStore.rollWriter();
b.addStackId(3);
procStore.update(b);
a.addStackId(4);
procStore.update(a);
procStore.insert(d, null);
d.addStackId(0);
procStore.update(d);
storeRestart(new ProcedureStore.ProcedureLoader() {
@Override
public void setMaxProcId(long maxProcId) {
assertEquals(4, maxProcId);
}
@Override
public void load(ProcedureIterator procIter) throws IOException {
assertTrue(procIter.hasNext());
assertEquals(4, procIter.next().getProcId());
// TODO: This will be multiple call once we do fast-start
//assertFalse(procIter.hasNext());
assertTrue(procIter.hasNext());
assertEquals(1, procIter.next().getProcId());
assertTrue(procIter.hasNext());
assertEquals(2, procIter.next().getProcId());
assertTrue(procIter.hasNext());
assertEquals(3, procIter.next().getProcId());
assertFalse(procIter.hasNext());
}
@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
assertFalse(procIter.hasNext());
}
});
}
public static class TestProcedure extends Procedure<Void> {
public TestProcedure() {}
public TestProcedure(long procId, long parentId) {
setProcId(procId);
if (parentId > 0) {
setParentProcId(parentId);
}
}
public void addStackId(final int index) {
addStackIndex(index);
}
@Override
protected Procedure[] execute(Void env) { return null; }
@Override
protected void rollback(Void env) { }
@Override
protected boolean abort(Void env) { return false; }
@Override
protected void serializeStateData(final OutputStream stream) throws IOException { }
@Override
protected void deserializeStateData(final InputStream stream) throws IOException { }
}
private void corruptLog(final FileStatus logFile, final long dropBytes)
@ -192,29 +398,11 @@ public class TestWALProcedureStore {
}
private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
int count = 0;
Iterator<Procedure> loader = storeRestart();
while (loader.hasNext()) {
Procedure proc = loader.next();
LOG.debug("loading procId=" + proc.getProcId());
assertTrue("procId=" + proc.getProcId() + " unexpected", procIds.contains(proc.getProcId()));
count++;
}
assertEquals(procIds.size(), count);
}
private void assertIsEmpty(Iterator<Procedure> iterator) {
assertEquals(0, countProcedures(iterator));
}
private int countProcedures(Iterator<Procedure> iterator) {
int count = 0;
while (iterator.hasNext()) {
Procedure proc = iterator.next();
LOG.trace("loading procId=" + proc.getProcId());
count++;
}
return count;
LOG.debug("expected: " + procIds);
LoadCounter loader = new LoadCounter();
storeRestart(loader);
assertEquals(procIds.size(), loader.getLoadedCount());
assertEquals(0, loader.getCorruptedCount());
}
private void assertEmptyLogDir() {
@ -264,4 +452,78 @@ public class TestWALProcedureStore {
}
}
}
private class LoadCounter implements ProcedureStore.ProcedureLoader {
private final ArrayList<Procedure> corrupted = new ArrayList<Procedure>();
private final ArrayList<Procedure> loaded = new ArrayList<Procedure>();
private Set<Long> procIds;
private long maxProcId = 0;
public LoadCounter() {
this(null);
}
public LoadCounter(final Set<Long> procIds) {
this.procIds = procIds;
}
public void reset() {
reset(null);
}
public void reset(final Set<Long> procIds) {
corrupted.clear();
loaded.clear();
this.procIds = procIds;
this.maxProcId = 0;
}
public long getMaxProcId() {
return maxProcId;
}
public ArrayList<Procedure> getLoaded() {
return loaded;
}
public int getLoadedCount() {
return loaded.size();
}
public ArrayList<Procedure> getCorrupted() {
return corrupted;
}
public int getCorruptedCount() {
return corrupted.size();
}
@Override
public void setMaxProcId(long maxProcId) {
maxProcId = maxProcId;
}
@Override
public void load(ProcedureIterator procIter) throws IOException {
while (procIter.hasNext()) {
Procedure proc = procIter.next();
LOG.debug("loading procId=" + proc.getProcId() + ": " + proc);
if (procIds != null) {
assertTrue("procId=" + proc.getProcId() + " unexpected",
procIds.contains(proc.getProcId()));
}
loaded.add(proc);
}
}
@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
while (procIter.hasNext()) {
Procedure proc = procIter.next();
LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
corrupted.add(proc);
}
}
}
}

View File

@ -1102,8 +1102,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
Math.max(Runtime.getRuntime().availableProcessors(),
MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
final boolean abortOnCorruption = conf.getBoolean(
MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
procedureStore.start(numThreads);
procedureExecutor.start(numThreads);
procedureExecutor.start(numThreads, abortOnCorruption);
}
private void stopProcedureExecutor() {

View File

@ -24,8 +24,21 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
public final class MasterProcedureConstants {
private MasterProcedureConstants() {}
/** Used to construct the name of the log directory for master procedures */
public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
/** Number of threads used by the procedure executor */
public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 4;
/**
* Procedure replay sanity check. In case a WAL is missing or unreadable we
* may lose information about pending/running procedures.
* Set this to true in case you want the Master failing on load if a corrupted
* procedure is encountred.
* (Default is off, because we prefer having the Master up and running and
* fix the "in transition" state "by hand")
*/
public static final String EXECUTOR_ABORT_ON_CORRUPTION = "hbase.procedure.abort.on.corruption";
public static final boolean DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION = false;
}

View File

@ -147,4 +147,11 @@
<description>Skip sanity checks in tests
</description>
</property>
<property>
<name>hbase.procedure.fail.on.corruption</name>
<value>true</value>
<description>
Enable replay sanity checks on procedure tests.
</description>
</property>
</configuration>