diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 338fcad4c94..6abf2c5d6e8 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -519,6 +519,20 @@ public abstract class Procedure implements Comparable { return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } + /** + * Get an hashcode for the specified Procedure ID + * @return the hashcode for the specified procId + */ + public static long getProcIdHashCode(final long procId) { + long h = procId; + h ^= h >> 16; + h *= 0x85ebca6b; + h ^= h >> 13; + h *= 0xc2b2ae35; + h ^= h >> 16; + return h; + } + /* * Helper to lookup the root Procedure ID given a specified procedure. */ diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 6e879971714..59b346ae23a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -28,7 +28,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.HashSet; -import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -43,6 +42,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue; import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever; @@ -264,45 +264,70 @@ public class ProcedureExecutor { this.conf = conf; } - private List> load() throws IOException { + private void load(final boolean abortOnCorruption) throws IOException { Preconditions.checkArgument(completed.isEmpty()); Preconditions.checkArgument(rollbackStack.isEmpty()); Preconditions.checkArgument(procedures.isEmpty()); Preconditions.checkArgument(waitingTimeout.isEmpty()); Preconditions.checkArgument(runnables.size() == 0); - // 1. 
Load the procedures - Iterator loader = store.load(); - if (loader == null) { - lastProcId.set(0); - return null; - } - - long logMaxProcId = 0; - int runnablesCount = 0; - while (loader.hasNext()) { - Procedure proc = loader.next(); - proc.beforeReplay(getEnvironment()); - procedures.put(proc.getProcId(), proc); - logMaxProcId = Math.max(logMaxProcId, proc.getProcId()); - if (LOG.isDebugEnabled()) { - LOG.debug("Loading procedure state=" + proc.getState() + - " isFailed=" + proc.hasException() + ": " + proc); + store.load(new ProcedureStore.ProcedureLoader() { + @Override + public void setMaxProcId(long maxProcId) { + assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()"; + LOG.debug("load procedures maxProcId=" + maxProcId); + lastProcId.set(maxProcId); } + + @Override + public void load(ProcedureIterator procIter) throws IOException { + loadProcedures(procIter, abortOnCorruption); + } + + @Override + public void handleCorrupted(ProcedureIterator procIter) throws IOException { + int corruptedCount = 0; + while (procIter.hasNext()) { + Procedure proc = procIter.next(); + LOG.error("corrupted procedure: " + proc); + corruptedCount++; + } + if (abortOnCorruption && corruptedCount > 0) { + throw new IOException("found " + corruptedCount + " procedures on replay"); + } + } + }); + } + + private void loadProcedures(final ProcedureIterator procIter, + final boolean abortOnCorruption) throws IOException { + // 1. Build the rollback stack + int runnablesCount = 0; + while (procIter.hasNext()) { + Procedure proc = procIter.next(); if (!proc.hasParent() && !proc.isFinished()) { rollbackStack.put(proc.getProcId(), new RootProcedureState()); } + // add the procedure to the map + proc.beforeReplay(getEnvironment()); + procedures.put(proc.getProcId(), proc); + if (proc.getState() == ProcedureState.RUNNABLE) { runnablesCount++; } } - assert lastProcId.get() < 0; - lastProcId.set(logMaxProcId); // 2. Initialize the stacks - TreeSet runnableSet = null; + ArrayList runnableList = new ArrayList(runnablesCount); HashSet waitingSet = null; - for (final Procedure proc: procedures.values()) { + procIter.reset(); + while (procIter.hasNext()) { + Procedure proc = procIter.next(); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s", + proc.getState(), proc.hasException(), proc)); + } + Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { // The 'proc' was ready to run but the root procedure was rolledback? @@ -312,10 +337,11 @@ public class ProcedureExecutor { if (!proc.hasParent() && proc.isFinished()) { if (LOG.isDebugEnabled()) { - LOG.debug("The procedure is completed state=" + proc.getState() + - " isFailed=" + proc.hasException() + ": " + proc); + LOG.debug(String.format("The procedure is completed state=%s isFailed=%s", + proc.getState(), proc.hasException())); } assert !rollbackStack.containsKey(proc.getProcId()); + procedures.remove(proc.getProcId()); completed.put(proc.getProcId(), newResultFromProcedure(proc)); continue; } @@ -333,10 +359,7 @@ public class ProcedureExecutor { switch (proc.getState()) { case RUNNABLE: - if (runnableSet == null) { - runnableSet = new TreeSet(); - } - runnableSet.add(proc); + runnableList.add(proc); break; case WAITING_TIMEOUT: if (waitingSet == null) { @@ -361,7 +384,7 @@ public class ProcedureExecutor { } // 3. 
Validate the stacks - List> corrupted = null; + int corruptedCount = 0; Iterator> itStack = rollbackStack.entrySet().iterator(); while (itStack.hasNext()) { Map.Entry entry = itStack.next(); @@ -369,32 +392,49 @@ if (procStack.isValid()) continue; for (Procedure proc: procStack.getSubprocedures()) { + LOG.error("corrupted procedure: " + proc); procedures.remove(proc.getProcId()); - if (runnableSet != null) runnableSet.remove(proc); + runnableList.remove(proc); if (waitingSet != null) waitingSet.remove(proc); + corruptedCount++; } itStack.remove(); - if (corrupted == null) { - corrupted = new ArrayList>(); - } - corrupted.add(entry); + } + + if (abortOnCorruption && corruptedCount > 0) { + throw new IOException("found " + corruptedCount + " procedures on replay"); } // 4. Push the runnables - if (runnableSet != null) { - // TODO: See ProcedureWALFormatReader.readInitEntry() some procedure - // may be started way before this stuff. - for (Procedure proc: runnableSet) { + if (!runnableList.isEmpty()) { + // TODO: See ProcedureWALFormatReader#hasFastStartSupport + // some procedure may be started way before this stuff. + for (int i = runnableList.size() - 1; i >= 0; --i) { + Procedure proc = runnableList.get(i); if (!proc.hasParent()) { sendProcedureLoadedNotification(proc.getProcId()); } - runnables.addBack(proc); + if (proc.wasExecuted()) { + runnables.addFront(proc); + } else { + // if it was not in execution, it can wait. + runnables.addBack(proc); + } } } - return corrupted; } - public void start(int numThreads) throws IOException { + /** + * Start the procedure executor. + * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to + * recover the lease, ensure a single executor, and start the procedure + * replay to resume and recover the previously pending and in-progress procedures. + * + * @param numThreads number of threads available for procedure execution. + * @param abortOnCorruption true if you want to abort your service in case + * a corrupted procedure is found on replay, otherwise false. + */ + public void start(int numThreads, boolean abortOnCorruption) throws IOException { if (running.getAndSet(true)) { LOG.warn("Already running"); return; @@ -427,11 +467,11 @@ store.recoverLease(); // TODO: Split in two steps. - // TODO: Handle corrupted procedure returned (probably just a WARN) + // TODO: Handle corrupted procedures (currently just a warn) // The first one will make sure that we have the latest id, // so we can start the threads and accept new procedures. // The second step will do the actual load of old procedures. - load(); + load(abortOnCorruption); // Start the executors. Here we must have the lastProcId set.
for (int i = 0; i < threads.length; ++i) { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 06bfa44e476..a05c11571dc 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.procedure2.store; import java.io.IOException; -import java.util.Iterator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -45,6 +44,57 @@ public interface ProcedureStore { void abortProcess(); } + /** + * An Iterator over a collection of Procedure + */ + public interface ProcedureIterator { + /** + * Reset the Iterator by seeking to the beginning of the list. + */ + void reset(); + + /** + * Returns true if the iterator has more elements. + * (In other words, returns true if next() would return a Procedure + * rather than throwing an exception.) + * @return true if the iterator has more procedures + */ + boolean hasNext(); + + /** + * Returns the next procedure in the iteration. + * @throws IOException if there was an error fetching/deserializing the procedure + * @throws NoSuchElementException if the iteration has no more elements + * @return the next procedure in the iteration. + */ + Procedure next() throws IOException; + } + + /** + * Interface passed to the ProcedureStore.load() method to handle the store-load events. + */ + public interface ProcedureLoader { + /** + * Called by ProcedureStore.load() to notify about the maximum proc-id in the store. + * @param maxProcId the highest proc-id in the store + */ + void setMaxProcId(long maxProcId); + + /** + * Called by the ProcedureStore.load() every time a set of procedures are ready to be executed. + * The ProcedureIterator passed to the method, has the procedure sorted in replay-order. + * @param procIter iterator over the procedures ready to be added to the executor. + */ + void load(ProcedureIterator procIter) throws IOException; + + /** + * Called by the ProcedureStore.load() in case we have procedures not-ready to be added to + * the executor, which probably means they are corrupted since some information/link is missing. + * @param procIter iterator over the procedures not ready to be added to the executor, corrupted + */ + void handleCorrupted(ProcedureIterator procIter) throws IOException; + } + /** * Add the listener to the notification list. * @param listener The AssignmentListener to register @@ -87,9 +137,9 @@ public interface ProcedureStore { /** * Load the Procedures in the store. - * @return the set of procedures present in the store + * @param loader the ProcedureLoader that will handle the store-load events */ - Iterator load() throws IOException; + void load(ProcedureLoader loader) throws IOException; /** * When a procedure is submitted to the executor insert(proc, null) will be called. 
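For reference, a minimal caller of the callback-style ProcedureStore.load() API introduced above could look roughly like the sketch below. This is an illustrative fragment only, not part of the patch; the `store` reference and the log messages are placeholders.

```java
// Sketch: consuming the ProcedureLoader callbacks defined in ProcedureStore.
// setMaxProcId() is called once before load()/handleCorrupted().
store.load(new ProcedureStore.ProcedureLoader() {
  @Override
  public void setMaxProcId(long maxProcId) {
    // highest proc-id present in the store
    LOG.debug("max proc-id in the store: " + maxProcId);
  }

  @Override
  public void load(ProcedureStore.ProcedureIterator procIter) throws IOException {
    // procedures arrive sorted in replay order
    while (procIter.hasNext()) {
      Procedure proc = procIter.next();
      LOG.debug("replaying procId=" + proc.getProcId());
    }
  }

  @Override
  public void handleCorrupted(ProcedureStore.ProcedureIterator procIter) throws IOException {
    // procedures with missing links/WALs; decide whether to abort or just warn
    while (procIter.hasNext()) {
      LOG.warn("corrupted procId=" + procIter.next().getProcId());
    }
  }
});
```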
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java index 17432ac24c9..c75c1414e91 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader; @@ -63,14 +64,14 @@ public final class ProcedureWALFormat { } } - interface Loader { + interface Loader extends ProcedureLoader { void removeLog(ProcedureWALFile log); void markCorruptedWAL(ProcedureWALFile log, IOException e); } private ProcedureWALFormat() {} - public static Iterator load(final Iterator logs, + public static void load(final Iterator logs, final ProcedureStoreTracker tracker, final Loader loader) throws IOException { ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker); tracker.setKeepDeletes(true); @@ -84,14 +85,13 @@ public final class ProcedureWALFormat { log.close(); } } + reader.finalize(loader); // The tracker is now updated with all the procedures read from the logs tracker.setPartialFlag(false); tracker.resetUpdates(); } finally { tracker.setKeepDeletes(false); } - // TODO: Write compacted version? 
- return reader.getProcedures(); } public static void writeHeader(OutputStream stream, ProcedureWALHeader header) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index a60b8f5d88b..76c05542e40 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -19,9 +19,6 @@ package org.apache.hadoop.hbase.procedure2.store.wal; import java.io.IOException; -import java.util.Iterator; -import java.util.Map; -import java.util.HashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry; @@ -41,17 +39,74 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEn public class ProcedureWALFormatReader { private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class); - private final ProcedureStoreTracker tracker; - //private final long compactionLogId; - - private final Map procedures = new HashMap(); - private final Map localProcedures = - new HashMap(); + // ============================================================================================== + // We read the WALs in reverse order. from the newest to the oldest. + // We have different entry types: + // - INIT: Procedure submitted by the user (also known as 'root procedure') + // - INSERT: Children added to the procedure :[, ...] + // - UPDATE: The specified procedure was updated + // - DELETE: The procedure was removed (completed/rolledback and result TTL expired) + // + // In the WAL we can find multiple times the same procedure as UPDATE or INSERT. + // We read the WAL from top to bottom, so every time we find an entry of the + // same procedure, that will be the "latest" update. + // + // We keep two in-memory maps: + // - localProcedureMap: is the map containing the entries in the WAL we are processing + // - procedureMap: is the map containing all the procedures we found up to the WAL in process. + // localProcedureMap is merged with the procedureMap once we reach the WAL EOF. + // + // Since we are reading the WALs in reverse order (newest to oldest), + // if we find an entry related to a procedure we already have in 'procedureMap' we can discard it. + // + // The WAL is append-only so the last procedure in the WAL is the one that + // was in execution at the time we crashed/closed the server. + // given that, the procedure replay order can be inferred by the WAL order. + // + // Example: + // WAL-2: [A, B, A, C, D] + // WAL-1: [F, G, A, F, B] + // Replay-Order: [D, C, A, B, F, G] + // + // The "localProcedureMap" keeps a "replayOrder" list. Every time we add the + // record to the map that record is moved to the head of the "replayOrder" list. 
+ // Using the example above: + // WAL-2 localProcedureMap.replayOrder is [D, C, A, B] + // WAL-1 localProcedureMap.replayOrder is [F, G] + // + // Each time we reach the WAL-EOF, the "replayOrder" list is merged/appended in 'procedureMap' + // so using the example above we end up with: [D, C, A, B] + [F, G] as replay order. + // + // Fast Start: INIT/INSERT record and StackIDs + // --------------------------------------------- + // We have two special records, INIT and INSERT, that track the first time + // the procedure was added to the WAL. We can use that information to be able + // to start procedures before reaching the end of the WAL, or before reading all the WALs, + // but in some cases the WAL with that record may already be gone. + // Alternatively, we can use the stackIds on each procedure + // to identify when a procedure is ready to start. + // If there are gaps in the sum of the stackIds we need to read more WALs. + // + // Example (all procs child of A): + // WAL-2: [A, B] A stackIds = [0, 4], B stackIds = [1, 5] + // WAL-1: [A, B, C, D] + // + // In the case above we need to read one more WAL to be able to consider + // the root procedure A and all children as ready. + // ============================================================================================== + private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024); + private final WalProcedureMap procedureMap = new WalProcedureMap(1024); + //private long compactionLogId; private long maxProcId = 0; + private final ProcedureStoreTracker tracker; + private final boolean hasFastStartSupport; + public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) { this.tracker = tracker; + // we support fast-start only if we have a clean shutdown. + this.hasFastStartSupport = !tracker.isEmpty(); } public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException { @@ -91,58 +146,65 @@ loader.markCorruptedWAL(log, e); } - if (localProcedures.isEmpty()) { + if (localProcedureMap.isEmpty()) { LOG.info("No active entry found in state log " + log + ". removing it"); loader.removeLog(log); } else { - Iterator> itd = - localProcedures.entrySet().iterator(); - while (itd.hasNext()) { - Map.Entry entry = itd.next(); - itd.remove(); + procedureMap.mergeTail(localProcedureMap); - // Deserialize the procedure - Procedure proc = Procedure.convert(entry.getValue()); - procedures.put(entry.getKey(), proc); - } - - // TODO: Some procedure may be already runnables (see readInitEntry()) - // (we can also check the "update map" in the log trackers) + //if (hasFastStartSupport) { + // TODO: Some procedure may be already runnables (see readInitEntry()) + // (we can also check the "update map" in the log trackers) + // -------------------------------------------------- + //EntryIterator iter = procedureMap.fetchReady(); + //if (iter != null) loader.load(iter); + // -------------------------------------------------- + //} } } - public Iterator getProcedures() { - return procedures.values().iterator(); + public void finalize(ProcedureWALFormat.Loader loader) throws IOException { + // notify the loader about the max proc ID + loader.setMaxProcId(maxProcId); + + // fetch the procedures ready to run. + ProcedureIterator procIter = procedureMap.fetchReady(); + if (procIter != null) loader.load(procIter); + + // remaining procedures have a missing link or dependencies; + // consider them corrupted, a manual fix is probably required.
+ procIter = procedureMap.fetchAll(); + if (procIter != null) loader.handleCorrupted(procIter); } - private void loadEntries(final ProcedureWALEntry entry) { - for (ProcedureProtos.Procedure proc: entry.getProcedureList()) { - maxProcId = Math.max(maxProcId, proc.getProcId()); - if (isRequired(proc.getProcId())) { - if (LOG.isTraceEnabled()) { - LOG.trace("read " + entry.getType() + " entry " + proc.getProcId()); - } - localProcedures.put(proc.getProcId(), proc); - tracker.setDeleted(proc.getProcId(), false); + private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) { + maxProcId = Math.max(maxProcId, proc.getProcId()); + if (isRequired(proc.getProcId())) { + if (LOG.isTraceEnabled()) { + LOG.trace("read " + entry.getType() + " entry " + proc.getProcId()); } + localProcedureMap.add(proc); + tracker.setDeleted(proc.getProcId(), false); } } private void readInitEntry(final ProcedureWALEntry entry) throws IOException { assert entry.getProcedureCount() == 1 : "Expected only one procedure"; - // TODO: Make it runnable, before reading other files - loadEntries(entry); + loadProcedure(entry, entry.getProcedure(0)); } private void readInsertEntry(final ProcedureWALEntry entry) throws IOException { assert entry.getProcedureCount() >= 1 : "Expected one or more procedures"; - loadEntries(entry); + loadProcedure(entry, entry.getProcedure(0)); + for (int i = 1; i < entry.getProcedureCount(); ++i) { + loadProcedure(entry, entry.getProcedure(i)); + } } private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException { assert entry.getProcedureCount() == 1 : "Expected only one procedure"; - loadEntries(entry); + loadProcedure(entry, entry.getProcedure(0)); } private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException { @@ -152,7 +214,7 @@ public class ProcedureWALFormatReader { LOG.trace("read delete entry " + entry.getProcId()); } maxProcId = Math.max(maxProcId, entry.getProcId()); - localProcedures.remove(entry.getProcId()); + localProcedureMap.remove(entry.getProcId()); tracker.setDeleted(entry.getProcId(), true); } @@ -161,6 +223,458 @@ public class ProcedureWALFormatReader { } private boolean isRequired(final long procId) { - return !isDeleted(procId) && !procedures.containsKey(procId); + return !isDeleted(procId) && !procedureMap.contains(procId); + } + + // ========================================================================== + // We keep an in-memory map of the procedures sorted by replay order. + // (see the details in the beginning of the file) + // _______________________________________________ + // procedureMap = | A | | E | | C | | | | | G | | | + // D B + // replayOrderHead = C <-> B <-> E <-> D <-> A <-> G + // + // We also have a lazy grouping by "root procedure", and a list of + // unlinked procedure. If after reading all the WALs we have unlinked + // procedures it means that we had a missing WAL or a corruption. 
+ // rootHead = A <-> D <-> G + // B E + // C + // unlinkFromLinkList = None + // ========================================================================== + private static class Entry { + // hash-table next + protected Entry hashNext; + // child head + protected Entry childHead; + // double-link for rootHead or childHead + protected Entry linkNext; + protected Entry linkPrev; + // replay double-linked-list + protected Entry replayNext; + protected Entry replayPrev; + // procedure-infos + protected Procedure procedure; + protected ProcedureProtos.Procedure proto; + protected boolean ready = false; + + public Entry(Entry hashNext) { this.hashNext = hashNext; } + + public long getProcId() { return proto.getProcId(); } + public long getParentId() { return proto.getParentId(); } + public boolean hasParent() { return proto.hasParentId(); } + public boolean isReady() { return ready; } + + public Procedure convert() throws IOException { + if (procedure == null) { + procedure = Procedure.convert(proto); + } + return procedure; + } + + @Override + public String toString() { + return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")"; + } + } + + private static class EntryIterator implements ProcedureIterator { + private final Entry replayHead; + private Entry current; + + public EntryIterator(Entry replayHead) { + this.replayHead = replayHead; + this.current = replayHead; + } + + @Override + public void reset() { + this.current = replayHead; + } + + @Override + public boolean hasNext() { + return current != null; + } + + @Override + public Procedure next() throws IOException { + try { + return current.convert(); + } finally { + current = current.replayNext; + } + } + } + + private static class WalProcedureMap { + // procedure hash table + private Entry[] procedureMap; + + // replay-order double-linked-list + private Entry replayOrderHead; + private Entry replayOrderTail; + + // root linked-list + private Entry rootHead; + + // pending unlinked children (root not present yet) + private Entry childUnlinkedHead; + + public WalProcedureMap(int size) { + procedureMap = new Entry[size]; + replayOrderHead = null; + replayOrderTail = null; + rootHead = null; + childUnlinkedHead = null; + } + + public void add(ProcedureProtos.Procedure procProto) { + Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId()); + boolean isNew = entry.proto == null; + entry.proto = procProto; + addToReplayList(entry); + + if (isNew) { + if (procProto.hasParentId()) { + childUnlinkedHead = addToLinkList(entry, childUnlinkedHead); + } else { + rootHead = addToLinkList(entry, rootHead); + } + } + } + + public boolean remove(long procId) { + Entry entry = removeFromMap(procId); + if (entry != null) { + unlinkFromReplayList(entry); + unlinkFromLinkList(entry); + return true; + } + return false; + } + + public boolean contains(long procId) { + return getProcedure(procId) != null; + } + + public boolean isEmpty() { + return replayOrderHead == null; + } + + public void clear() { + for (int i = 0; i < procedureMap.length; ++i) { + procedureMap[i] = null; + } + replayOrderHead = null; + replayOrderTail = null; + rootHead = null; + childUnlinkedHead = null; + } + + /* + * Merges two WalProcedureMap, + * the target is the "global" map, the source is the "local" map. + * - The entries in the hashtables are guaranteed to be unique. + * On replay we don't load procedures that already exist in the "global" + * map (the one we are merging the "local" in to). 
* - The replayOrderList of the "local" map will be appended to the "global" + * map replay list. + * - The "local" map will be cleared at the end of the operation. + */ + public void mergeTail(WalProcedureMap other) { + for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) { + int slotIndex = getMapSlot(p.getProcId()); + p.hashNext = procedureMap[slotIndex]; + procedureMap[slotIndex] = p; + } + + if (replayOrderHead == null) { + replayOrderHead = other.replayOrderHead; + replayOrderTail = other.replayOrderTail; + rootHead = other.rootHead; + childUnlinkedHead = other.childUnlinkedHead; + } else { + // append replay list + assert replayOrderTail.replayNext == null; + assert other.replayOrderHead.replayPrev == null; + replayOrderTail.replayNext = other.replayOrderHead; + other.replayOrderHead.replayPrev = replayOrderTail; + replayOrderTail = other.replayOrderTail; + + // merge rootHead + if (rootHead == null) { + rootHead = other.rootHead; + } else if (other.rootHead != null) { + Entry otherTail = findLinkListTail(other.rootHead); + otherTail.linkNext = rootHead; + rootHead.linkPrev = otherTail; + rootHead = other.rootHead; + } + + // merge childUnlinkedHead + if (childUnlinkedHead == null) { + childUnlinkedHead = other.childUnlinkedHead; + } else if (other.childUnlinkedHead != null) { + Entry otherTail = findLinkListTail(other.childUnlinkedHead); + otherTail.linkNext = childUnlinkedHead; + childUnlinkedHead.linkPrev = otherTail; + childUnlinkedHead = other.childUnlinkedHead; + } + } + + other.clear(); + } + + /* + * Returns an EntryIterator with the list of procedures ready + * to be added to the executor. + * A Procedure is ready if its children and parent are ready. + */ + public EntryIterator fetchReady() { + buildGraph(); + + Entry readyHead = null; + Entry readyTail = null; + Entry p = replayOrderHead; + while (p != null) { + Entry next = p.replayNext; + if (p.isReady()) { + unlinkFromReplayList(p); + if (readyTail != null) { + readyTail.replayNext = p; + p.replayPrev = readyTail; + } else { + p.replayPrev = null; + readyHead = p; + } + readyTail = p; + p.replayNext = null; + } + p = next; + } + // we need the hash-table lookups for parents, so this must be done + // out of the loop where we check isReadyToRun() + for (p = readyHead; p != null; p = p.replayNext) { + removeFromMap(p.getProcId()); + unlinkFromLinkList(p); + } + return readyHead != null ? new EntryIterator(readyHead) : null; + } + + /* + * Drain this map and return all procedures in it. + */ + public EntryIterator fetchAll() { + Entry head = replayOrderHead; + for (Entry p = head; p != null; p = p.replayNext) { + removeFromMap(p.getProcId()); + } + for (int i = 0; i < procedureMap.length; ++i) { + assert procedureMap[i] == null : "map not empty i=" + i; + } + replayOrderHead = null; + replayOrderTail = null; + childUnlinkedHead = null; + rootHead = null; + return head != null ?
new EntryIterator(head) : null; + } + + private void buildGraph() { + Entry p = childUnlinkedHead; + while (p != null) { + Entry next = p.linkNext; + Entry rootProc = getRootProcedure(p); + if (rootProc != null) { + rootProc.childHead = addToLinkList(p, rootProc.childHead); + } + p = next; + } + + for (p = rootHead; p != null; p = p.linkNext) { + checkReadyToRun(p); + } + } + + private Entry getRootProcedure(Entry entry) { + while (entry != null && entry.hasParent()) { + entry = getProcedure(entry.getParentId()); + } + return entry; + } + + /* + * (see the comprehensive explanation in the beginning of the file) + * A Procedure is ready when parent and children are ready. + * "ready" means that we have all the information that we need in-memory. + * + * Example-1: + * We have two WALs, we start reading from the newest (wal-2) + * wal-2 | C B | + * wal-1 | A B C | + * + * If C and B don't depend on A (A is not the parent), we can start them + * before reading wal-1. If B is the only one with parent A we can start C + * and read one more WAL before being able to start B. + * + * How do we know, with only the information in B, that we are not ready? + * - easy case, the parent is missing from the global map + * - more complex case, we look at the Stack IDs + * + * The Stack-IDs are added to the procedure as an incremental index, + * tracking how many times that procedure was executed, which is equivalent + * to the number of times we wrote the procedure to the WAL. + * In the example above: + * wal-2: B has stackId = [1, 2] + * wal-1: B has stackId = [1] + * wal-1: A has stackId = [0] + * + * Since we know that the Stack-IDs are incremental for a Procedure, + * we notice that there is a gap in the stackIds of B, so something was + * executed before. + * To identify when a Procedure is ready we do the sum of the stackIds of + * the procedure and the parent. If the stackIdSum is equal to the + * sum of {1..maxStackId} then everything we need is available. + * + * Example-2 + * wal-2 | A | A stackIds = [0, 2] + * wal-1 | A B | B stackIds = [1] + * + * There is a gap between A's stackIds, so something was executed in between.
+ */ + private boolean checkReadyToRun(Entry rootEntry) { + int stackIdSum = 0; + int maxStackId = 0; + for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) { + int stackId = 1 + rootEntry.proto.getStackId(i); + maxStackId = Math.max(maxStackId, stackId); + stackIdSum += stackId; + } + + for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { + for (int i = 0; i < p.proto.getStackIdCount(); ++i) { + int stackId = 1 + p.proto.getStackId(i); + maxStackId = Math.max(maxStackId, stackId); + stackIdSum += stackId; + } + } + final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2); + if (cmpStackIdSum == stackIdSum) { + rootEntry.ready = true; + for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { + p.ready = true; + } + return true; + } + return false; + } + + private void unlinkFromReplayList(Entry entry) { + if (replayOrderHead == entry) { + replayOrderHead = entry.replayNext; + } + if (replayOrderTail == entry) { + replayOrderTail = entry.replayPrev; + } + if (entry.replayPrev != null) { + entry.replayPrev.replayNext = entry.replayNext; + } + if (entry.replayNext != null) { + entry.replayNext.replayPrev = entry.replayPrev; + } + } + + private void addToReplayList(final Entry entry) { + unlinkFromReplayList(entry); + entry.replayNext = replayOrderHead; + entry.replayPrev = null; + if (replayOrderHead != null) { + replayOrderHead.replayPrev = entry; + } else { + replayOrderTail = entry; + } + replayOrderHead = entry; + } + + private void unlinkFromLinkList(Entry entry) { + if (entry == rootHead) { + rootHead = entry.linkNext; + } else if (entry == childUnlinkedHead) { + childUnlinkedHead = entry.linkNext; + } + if (entry.linkPrev != null) { + entry.linkPrev.linkNext = entry.linkNext; + } + if (entry.linkNext != null) { + entry.linkNext.linkPrev = entry.linkPrev; + } + } + + private Entry addToLinkList(Entry entry, Entry linkHead) { + unlinkFromLinkList(entry); + entry.linkNext = linkHead; + entry.linkPrev = null; + if (linkHead != null) { + linkHead.linkPrev = entry; + } + return entry; + } + + private Entry findLinkListTail(Entry linkHead) { + Entry tail = linkHead; + while (tail.linkNext != null) { + tail = tail.linkNext; + } + return tail; + } + + private Entry addToMap(final long procId, final boolean hasParent) { + int slotIndex = getMapSlot(procId); + Entry entry = getProcedure(slotIndex, procId); + if (entry != null) return entry; + + entry = new Entry(procedureMap[slotIndex]); + procedureMap[slotIndex] = entry; + return entry; + } + + private Entry removeFromMap(final long procId) { + int slotIndex = getMapSlot(procId); + Entry prev = null; + Entry entry = procedureMap[slotIndex]; + while (entry != null) { + if (procId == entry.getProcId()) { + if (prev != null) { + prev.hashNext = entry.hashNext; + } else { + procedureMap[slotIndex] = entry.hashNext; + } + entry.hashNext = null; + return entry; + } + prev = entry; + entry = entry.hashNext; + } + return null; + } + + private Entry getProcedure(final long procId) { + return getProcedure(getMapSlot(procId), procId); + } + + private Entry getProcedure(final int slotIndex, final long procId) { + Entry entry = procedureMap[slotIndex]; + while (entry != null) { + if (procId == entry.getProcId()) { + return entry; + } + entry = entry.hashNext; + } + return null; + } + + private int getMapSlot(final long procId) { + return (int)(Procedure.getProcIdHashCode(procId) % procedureMap.length); + } } } \ No newline at end of file diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 1884adcd606..f4a52b12dce 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -215,13 +215,12 @@ public class WALProcedureStore implements ProcedureStore { FileStatus[] oldLogs = getLogFiles(); while (running.get()) { // Get Log-MaxID and recover lease on old logs - flushLogId = initOldLogs(oldLogs) + 1; + flushLogId = initOldLogs(oldLogs); // Create new state-log - if (!rollWriter(flushLogId)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Someone else has already created log: " + flushLogId); - } + if (!rollWriter(flushLogId + 1)) { + // someone else has already created this log + LOG.debug("someone else has already created log " + flushLogId); continue; } @@ -241,7 +240,7 @@ public class WALProcedureStore implements ProcedureStore { } @Override - public Iterator load() throws IOException { + public void load(final ProcedureLoader loader) throws IOException { if (logs.isEmpty()) { throw new RuntimeException("recoverLease() must be called before loading data"); } @@ -251,7 +250,8 @@ public class WALProcedureStore implements ProcedureStore { if (LOG.isDebugEnabled()) { LOG.debug("No state logs to replay."); } - return null; + loader.setMaxProcId(0); + return; } // Load the old logs @@ -259,7 +259,22 @@ public class WALProcedureStore implements ProcedureStore { Iterator it = logs.descendingIterator(); it.next(); // Skip the current log try { - return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { + ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { + @Override + public void setMaxProcId(long maxProcId) { + loader.setMaxProcId(maxProcId); + } + + @Override + public void load(ProcedureIterator procIter) throws IOException { + loader.load(procIter); + } + + @Override + public void handleCorrupted(ProcedureIterator procIter) throws IOException { + loader.handleCorrupted(procIter); + } + @Override public void removeLog(ProcedureWALFile log) { toRemove.add(log); @@ -301,7 +316,7 @@ public class WALProcedureStore implements ProcedureStore { } // Push the transaction data and wait until it is persisted - logId = pushData(slot); + pushData(slot); } catch (IOException e) { // We are not able to serialize the procedure. // this is a code error, and we are not able to go on. 
@@ -383,7 +398,7 @@ public class WALProcedureStore implements ProcedureStore { storeTracker.delete(procId); if (logId == flushLogId) { if (storeTracker.isEmpty() && totalSynced.get() > rollThreshold) { - removeOldLogs = rollWriterOrDie(logId + 1); + removeOldLogs = rollWriterOrDie(); } } } @@ -541,9 +556,9 @@ public class WALProcedureStore implements ProcedureStore { } } - private boolean rollWriterOrDie(final long logId) { + private boolean rollWriterOrDie() { try { - return rollWriter(logId); + return rollWriter(); } catch (IOException e) { LOG.warn("Unable to roll the log", e); sendAbortProcessSignal(); @@ -551,7 +566,13 @@ public class WALProcedureStore implements ProcedureStore { } } + protected boolean rollWriter() throws IOException { + return rollWriter(flushLogId + 1); + } + private boolean rollWriter(final long logId) throws IOException { + assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId; + ProcedureWALHeader header = ProcedureWALHeader.newBuilder() .setVersion(ProcedureWALFormat.HEADER_VERSION) .setType(ProcedureWALFormat.LOG_TYPE_STREAM) diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 7b9fc69ed1a..ddea9d2174b 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -57,11 +57,11 @@ public class ProcedureTestingUtility { public static void restart(ProcedureExecutor procExecutor) throws Exception { - restart(procExecutor, null); + restart(procExecutor, null, true); } public static void restart(ProcedureExecutor procExecutor, - Runnable beforeStartAction) throws Exception { + Runnable beforeStartAction, boolean failOnCorrupted) throws Exception { ProcedureStore procStore = procExecutor.getStore(); int storeThreads = procExecutor.getNumThreads(); int execThreads = procExecutor.getNumThreads(); @@ -75,7 +75,7 @@ public class ProcedureTestingUtility { } // re-start procStore.start(storeThreads); - procExecutor.start(execThreads); + procExecutor.start(execThreads, failOnCorrupted); } public static void setKillBeforeStoreUpdate(ProcedureExecutor procExecutor, diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java index 022f8ad51ee..0b2a3646504 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java @@ -70,7 +70,7 @@ public class TestProcedureExecution { procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS); + procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); } @After diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index f21b6fa08e7..7735b63f435 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -74,7 +74,7 @@ public class TestProcedureRecovery { procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS); + procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); procSleepInterval = 0; } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java index 8a7c1a1a05c..61c58e188c2 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java @@ -19,13 +19,17 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -45,7 +49,7 @@ import static org.junit.Assert.fail; public class TestProcedureReplayOrder { private static final Log LOG = LogFactory.getLog(TestProcedureReplayOrder.class); - private static final Procedure NULL_PROC = null; + private static final int NUM_THREADS = 16; private ProcedureExecutor procExecutor; private TestProcedureEnv procEnv; @@ -59,7 +63,7 @@ public class TestProcedureReplayOrder { @Before public void setUp() throws IOException { htu = new HBaseCommonTestingUtility(); - htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 10); + htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 25); testDir = htu.getDataTestDir(); fs = testDir.getFileSystem(htu.getConfiguration()); @@ -69,8 +73,8 @@ public class TestProcedureReplayOrder { procEnv = new TestProcedureEnv(); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); - procStore.start(24); - procExecutor.start(1); + procStore.start(NUM_THREADS); + procExecutor.start(1, true); } @After @@ -81,47 +85,45 @@ public class TestProcedureReplayOrder { } @Test(timeout=90000) - public void testSingleStepReplyOrder() throws Exception { - // avoid the procedure to be runnable - procEnv.setAcquireLock(false); + public void testSingleStepReplayOrder() throws Exception { + final int NUM_PROC_XTHREAD = 32; + final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD; // submit the procedures - submitProcedures(16, 25, TestSingleStepProcedure.class); + submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class); + + while (procEnv.getExecId() < NUM_PROCS) { + Thread.sleep(100); + } // restart the executor and allow the procedures to run - ProcedureTestingUtility.restart(procExecutor, new Runnable() { - @Override - public void run() { - procEnv.setAcquireLock(true); - } - }); + ProcedureTestingUtility.restart(procExecutor); // wait the execution of all the procedures and // assert 
that the execution order was sorted by procId ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); - procEnv.assertSortedExecList(); - - // TODO: FIXME: This should be revisited + procEnv.assertSortedExecList(NUM_PROCS); } - @Ignore @Test(timeout=90000) - public void testMultiStepReplyOrder() throws Exception { - // avoid the procedure to be runnable - procEnv.setAcquireLock(false); + public void testMultiStepReplayOrder() throws Exception { + final int NUM_PROC_XTHREAD = 24; + final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2); // submit the procedures - submitProcedures(16, 10, TestTwoStepProcedure.class); + submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class); + + while (procEnv.getExecId() < NUM_PROCS) { + Thread.sleep(100); + } // restart the executor and allow the procedures to run - ProcedureTestingUtility.restart(procExecutor, new Runnable() { - @Override - public void run() { - procEnv.setAcquireLock(true); - } - }); + ProcedureTestingUtility.restart(procExecutor); - fail("TODO: FIXME: NOT IMPLEMENT REPLAY ORDER"); + // wait the execution of all the procedures and + // assert that the execution order was sorted by procId + ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); + procEnv.assertSortedExecList(NUM_PROCS); } private void submitProcedures(final int nthreads, final int nprocPerThread, @@ -153,73 +155,101 @@ public class TestProcedureReplayOrder { } private static class TestProcedureEnv { - private ArrayList execList = new ArrayList(); - private boolean acquireLock = true; + private ArrayList execList = new ArrayList(); + private AtomicLong execTimestamp = new AtomicLong(0); - public void setAcquireLock(boolean acquireLock) { - this.acquireLock = acquireLock; + public long getExecId() { + return execTimestamp.get(); } - public boolean canAcquireLock() { - return acquireLock; + public long nextExecId() { + return execTimestamp.incrementAndGet(); } - public void addToExecList(final Procedure proc) { - execList.add(proc.getProcId()); + public void addToExecList(final TestProcedure proc) { + execList.add(proc); } - public ArrayList getExecList() { - return execList; - } - - public void assertSortedExecList() { + public void assertSortedExecList(int numProcs) { + assertEquals(numProcs, execList.size()); LOG.debug("EXEC LIST: " + execList); - for (int i = 1; i < execList.size(); ++i) { - assertTrue("exec list not sorted: " + execList.get(i-1) + " >= " + execList.get(i), - execList.get(i-1) < execList.get(i)); + for (int i = 0; i < execList.size() - 1; ++i) { + TestProcedure a = execList.get(i); + TestProcedure b = execList.get(i + 1); + assertTrue("exec list not sorted: " + a + " < " + b, a.getExecId() > b.getExecId()); } } } - public static class TestSingleStepProcedure extends SequentialProcedure { + public static abstract class TestProcedure extends Procedure { + protected long execId = 0; + protected int step = 0; + + public long getExecId() { + return execId; + } + + @Override + protected void rollback(TestProcedureEnv env) { } + + @Override + protected boolean abort(TestProcedureEnv env) { return true; } + + @Override + protected void serializeStateData(final OutputStream stream) throws IOException { + StreamUtils.writeLong(stream, execId); + } + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException { + execId = StreamUtils.readLong(stream); + step = 2; + } + } + + public static class TestSingleStepProcedure extends TestProcedure { public TestSingleStepProcedure() { } @Override - 
protected Procedure[] execute(TestProcedureEnv env) { - LOG.debug("execute procedure " + this); - env.addToExecList(this); - return null; - } - - protected boolean acquireLock(final TestProcedureEnv env) { - return env.canAcquireLock(); + protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException { + LOG.trace("execute procedure step=" + step + ": " + this); + if (step == 0) { + step = 1; + execId = env.nextExecId(); + return new Procedure[] { this }; + } else if (step == 2) { + env.addToExecList(this); + return null; + } + throw new ProcedureYieldException(); } @Override - protected void rollback(TestProcedureEnv env) { } - - @Override - protected boolean abort(TestProcedureEnv env) { return true; } + public String toString() { + return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")"; + } } - public static class TestTwoStepProcedure extends SequentialProcedure { + public static class TestTwoStepProcedure extends TestProcedure { public TestTwoStepProcedure() { } @Override - protected Procedure[] execute(TestProcedureEnv env) { - LOG.debug("execute procedure " + this); - env.addToExecList(this); - return new Procedure[] { new TestSingleStepProcedure() }; - } - - protected boolean acquireLock(final TestProcedureEnv env) { - return true; + protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException { + LOG.trace("execute procedure step=" + step + ": " + this); + if (step == 0) { + step = 1; + execId = env.nextExecId(); + return new Procedure[] { new TestSingleStepProcedure() }; + } else if (step == 2) { + env.addToExecList(this); + return null; + } + throw new ProcedureYieldException(); } @Override - protected void rollback(TestProcedureEnv env) { } - - @Override - protected boolean abort(TestProcedureEnv env) { return true; } + public String toString() { + return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")"; + } } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 1829d4b96d0..19a9ea41b32 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -22,6 +22,10 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; import java.util.Iterator; import java.util.HashSet; import java.util.Set; @@ -35,6 +39,8 @@ import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.SequentialProcedure; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IOUtils; @@ -83,17 +89,20 @@ public class TestWALProcedureStore { fs.delete(logDir, true); } - private Iterator storeRestart() throws Exception { + private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception { procStore.stop(false); 
procStore.start(PROCEDURE_STORE_SLOTS); procStore.recoverLease(); - return procStore.load(); + procStore.load(loader); } @Test public void testEmptyLogLoad() throws Exception { - Iterator loader = storeRestart(); - assertEquals(0, countProcedures(loader)); + LoadCounter loader = new LoadCounter(); + storeRestart(loader); + assertEquals(0, loader.getMaxProcId()); + assertEquals(0, loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); } @Test @@ -152,8 +161,10 @@ public class TestWALProcedureStore { assertEquals(1, logs.length); corruptLog(logs[0], 4); - int count = countProcedures(storeRestart()); - assertEquals(100, count); + LoadCounter loader = new LoadCounter(); + storeRestart(loader); + assertEquals(100, loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); } @Test @@ -172,10 +183,205 @@ public class TestWALProcedureStore { assertEquals(1, logs.length); corruptLog(logs[0], 1823); - int count = countProcedures(storeRestart()); + LoadCounter loader = new LoadCounter(); + storeRestart(loader); assertTrue(procStore.getCorruptedLogs() != null); assertEquals(1, procStore.getCorruptedLogs().size()); - assertEquals(85, count); + assertEquals(85, loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); + } + + @Test + public void testCorruptedProcedures() throws Exception { + // Insert root-procedures + TestProcedure[] rootProcs = new TestProcedure[10]; + for (int i = 1; i <= rootProcs.length; i++) { + rootProcs[i-1] = new TestProcedure(i, 0); + procStore.insert(rootProcs[i-1], null); + rootProcs[i-1].addStackId(0); + procStore.update(rootProcs[i-1]); + } + // insert root-child txn + procStore.rollWriter(); + for (int i = 1; i <= rootProcs.length; i++) { + TestProcedure b = new TestProcedure(rootProcs.length + i, i); + rootProcs[i-1].addStackId(1); + procStore.insert(rootProcs[i-1], new Procedure[] { b }); + } + // insert child updates + procStore.rollWriter(); + for (int i = 1; i <= rootProcs.length; i++) { + procStore.update(new TestProcedure(rootProcs.length + i, i)); + } + + // Stop the store + procStore.stop(false); + + // Remove 4 byte from the trailer + FileStatus[] logs = fs.listStatus(logDir); + assertEquals(3, logs.length); + Arrays.sort(logs, new Comparator() { + @Override + public int compare(FileStatus o1, FileStatus o2) { + return o1.getPath().getName().compareTo(o2.getPath().getName()); + } + }); + + // Remove the first log, we have insert-txn and updates in the others so everything is fine. 
+ fs.delete(logs[0].getPath(), false); + LoadCounter loader = new LoadCounter(); + storeRestart(loader); + assertEquals(rootProcs.length * 2, loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); + + // Remove the second log, we have lost any root/parent references + fs.delete(logs[1].getPath(), false); + loader.reset(); + storeRestart(loader); + assertEquals(0, loader.getLoadedCount()); + assertEquals(rootProcs.length, loader.getCorruptedCount()); + for (Procedure proc: loader.getCorrupted()) { + assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length); + assertTrue(proc.toString(), + proc.getProcId() > rootProcs.length && + proc.getProcId() <= (rootProcs.length * 2)); + } + } + + @Test(timeout=60000) + public void testWalReplayOrder_AB_A() throws Exception { + /* + * | A B | -> | A | + */ + TestProcedure a = new TestProcedure(1, 0); + TestProcedure b = new TestProcedure(2, 1); + + procStore.insert(a, null); + a.addStackId(0); + procStore.update(a); + + procStore.insert(a, new Procedure[] { b }); + b.addStackId(1); + procStore.update(b); + + procStore.rollWriter(); + + a.addStackId(2); + procStore.update(a); + + storeRestart(new ProcedureStore.ProcedureLoader() { + @Override + public void setMaxProcId(long maxProcId) { + assertEquals(2, maxProcId); + } + + @Override + public void load(ProcedureIterator procIter) throws IOException { + assertTrue(procIter.hasNext()); + assertEquals(1, procIter.next().getProcId()); + assertTrue(procIter.hasNext()); + assertEquals(2, procIter.next().getProcId()); + assertFalse(procIter.hasNext()); + } + + @Override + public void handleCorrupted(ProcedureIterator procIter) throws IOException { + assertFalse(procIter.hasNext()); + } + }); + } + + @Test(timeout=60000) + public void testWalReplayOrder_ABC_BAD() throws Exception { + /* + * | A B C | -> | B A D | + */ + TestProcedure a = new TestProcedure(1, 0); + TestProcedure b = new TestProcedure(2, 1); + TestProcedure c = new TestProcedure(3, 2); + TestProcedure d = new TestProcedure(4, 0); + + procStore.insert(a, null); + a.addStackId(0); + procStore.update(a); + + procStore.insert(a, new Procedure[] { b }); + b.addStackId(1); + procStore.update(b); + + procStore.insert(b, new Procedure[] { c }); + b.addStackId(2); + procStore.update(b); + + procStore.rollWriter(); + + b.addStackId(3); + procStore.update(b); + + a.addStackId(4); + procStore.update(a); + + procStore.insert(d, null); + d.addStackId(0); + procStore.update(d); + + storeRestart(new ProcedureStore.ProcedureLoader() { + @Override + public void setMaxProcId(long maxProcId) { + assertEquals(4, maxProcId); + } + + @Override + public void load(ProcedureIterator procIter) throws IOException { + assertTrue(procIter.hasNext()); + assertEquals(4, procIter.next().getProcId()); + // TODO: This will be multiple call once we do fast-start + //assertFalse(procIter.hasNext()); + + assertTrue(procIter.hasNext()); + assertEquals(1, procIter.next().getProcId()); + assertTrue(procIter.hasNext()); + assertEquals(2, procIter.next().getProcId()); + assertTrue(procIter.hasNext()); + assertEquals(3, procIter.next().getProcId()); + assertFalse(procIter.hasNext()); + } + + @Override + public void handleCorrupted(ProcedureIterator procIter) throws IOException { + assertFalse(procIter.hasNext()); + } + }); + } + + public static class TestProcedure extends Procedure { + public TestProcedure() {} + + public TestProcedure(long procId, long parentId) { + setProcId(procId); + if (parentId > 0) { + setParentProcId(parentId); + } + } + + public void 
+
+  public static class TestProcedure extends Procedure<Void> {
+    public TestProcedure() {}
+
+    public TestProcedure(long procId, long parentId) {
+      setProcId(procId);
+      if (parentId > 0) {
+        setParentProcId(parentId);
+      }
+    }
+
+    public void addStackId(final int index) {
+      addStackIndex(index);
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) { return null; }
+
+    @Override
+    protected void rollback(Void env) { }
+
+    @Override
+    protected boolean abort(Void env) { return false; }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws IOException { }
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws IOException { }
+  }
 
   private void corruptLog(final FileStatus logFile, final long dropBytes)
@@ -191,29 +397,11 @@ public class TestWALProcedureStore {
   }
 
   private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
-    int count = 0;
-    Iterator<Procedure> loader = storeRestart();
-    while (loader.hasNext()) {
-      Procedure proc = loader.next();
-      LOG.debug("loading procId=" + proc.getProcId());
-      assertTrue("procId=" + proc.getProcId() + " unexpected", procIds.contains(proc.getProcId()));
-      count++;
-    }
-    assertEquals(procIds.size(), count);
-  }
-
-  private void assertIsEmpty(Iterator<Procedure> iterator) {
-    assertEquals(0, countProcedures(iterator));
-  }
-
-  private int countProcedures(Iterator<Procedure> iterator) {
-    int count = 0;
-    while (iterator.hasNext()) {
-      Procedure proc = iterator.next();
-      LOG.trace("loading procId=" + proc.getProcId());
-      count++;
-    }
-    return count;
+    LOG.debug("expected: " + procIds);
+    LoadCounter loader = new LoadCounter();
+    storeRestart(loader);
+    assertEquals(procIds.size(), loader.getLoadedCount());
+    assertEquals(0, loader.getCorruptedCount());
   }
 
   private void assertEmptyLogDir() {
@@ -263,4 +451,78 @@ public class TestWALProcedureStore {
       }
     }
   }
+
+  private class LoadCounter implements ProcedureStore.ProcedureLoader {
+    private final ArrayList<Procedure> corrupted = new ArrayList<Procedure>();
+    private final ArrayList<Procedure> loaded = new ArrayList<Procedure>();
+
+    private Set<Long> procIds;
+    private long maxProcId = 0;
+
+    public LoadCounter() {
+      this(null);
+    }
+
+    public LoadCounter(final Set<Long> procIds) {
+      this.procIds = procIds;
+    }
+
+    public void reset() {
+      reset(null);
+    }
+
+    public void reset(final Set<Long> procIds) {
+      corrupted.clear();
+      loaded.clear();
+      this.procIds = procIds;
+      this.maxProcId = 0;
+    }
+
+    public long getMaxProcId() {
+      return maxProcId;
+    }
+
+    public ArrayList<Procedure> getLoaded() {
+      return loaded;
+    }
+
+    public int getLoadedCount() {
+      return loaded.size();
+    }
+
+    public ArrayList<Procedure> getCorrupted() {
+      return corrupted;
+    }
+
+    public int getCorruptedCount() {
+      return corrupted.size();
+    }
+
+    @Override
+    public void setMaxProcId(long maxProcId) {
+      this.maxProcId = maxProcId;
+    }
+
+    @Override
+    public void load(ProcedureIterator procIter) throws IOException {
+      while (procIter.hasNext()) {
+        Procedure proc = procIter.next();
+        LOG.debug("loading procId=" + proc.getProcId() + ": " + proc);
+        if (procIds != null) {
+          assertTrue("procId=" + proc.getProcId() + " unexpected",
+            procIds.contains(proc.getProcId()));
+        }
+        loaded.add(proc);
+      }
+    }
+
+    @Override
+    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+      while (procIter.hasNext()) {
+        Procedure proc = procIter.next();
+        LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
+        corrupted.add(proc);
+      }
+    }
+  }
 }
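
The tests above exercise the new ProcedureStore.ProcedureLoader callback through LoadCounter and two anonymous loaders. As a quick reference, here is a minimal standalone sketch of the same contract; it is not part of this patch and the class name PrintingProcedureLoader is made up. The assumed contract, as exercised by the tests: setMaxProcId() is called once with the highest proc-id found in the logs, load() hands over the procedures whose state was fully recovered, and handleCorrupted() hands over the ones that lost root/parent information.

import java.io.IOException;

import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;

// Illustrative only: a bare-bones loader that just prints what the store replays.
public class PrintingProcedureLoader implements ProcedureStore.ProcedureLoader {
  @Override
  public void setMaxProcId(long maxProcId) {
    // Called once per load with the highest proc-id seen in the logs.
    System.out.println("max procId seen in the logs: " + maxProcId);
  }

  @Override
  public void load(ProcedureIterator procIter) throws IOException {
    // Procedures whose state (including root/parent links) was fully recovered.
    while (procIter.hasNext()) {
      Procedure proc = procIter.next();
      System.out.println("replayable procId=" + proc.getProcId());
    }
  }

  @Override
  public void handleCorrupted(ProcedureIterator procIter) throws IOException {
    // Procedures that could not be fully recovered (e.g. after a lost WAL).
    // The executor's own loader throws here when abort-on-corruption is enabled.
    while (procIter.hasNext()) {
      System.err.println("unrecoverable procedure: " + procIter.next());
    }
  }
}

A caller would hand this to the store via store.load(new PrintingProcedureLoader()), which is presumably how the storeRestart(loader) helper drives LoadCounter in the tests above.
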
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 2b180ad486a..7c99abb8d3b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1131,8 +1131,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
         Math.max(Runtime.getRuntime().availableProcessors(),
           MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
+    final boolean abortOnCorruption = conf.getBoolean(
+        MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
+        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
     procedureStore.start(numThreads);
-    procedureExecutor.start(numThreads);
+    procedureExecutor.start(numThreads, abortOnCorruption);
   }
 
   private void stopProcedureExecutor() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
index 90ed4eeb769..c21137d949e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
@@ -24,8 +24,21 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 public final class MasterProcedureConstants {
   private MasterProcedureConstants() {}
 
+  /** Used to construct the name of the log directory for master procedures */
   public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
 
+  /** Number of threads used by the procedure executor */
   public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
   public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 4;
+
+  /**
+   * Procedure replay sanity check. If a WAL is missing or unreadable we may
+   * lose information about pending/running procedures.
+   * Set this to true if you want the Master to fail on startup when a
+   * corrupted procedure is encountered during replay.
+   * (Default is off, because we prefer to keep the Master up and running and
+   * fix the "in transition" state by hand.)
+   */
+  public static final String EXECUTOR_ABORT_ON_CORRUPTION = "hbase.procedure.abort.on.corruption";
+  public static final boolean DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION = false;
 }
diff --git a/hbase-server/src/test/resources/hbase-site.xml b/hbase-server/src/test/resources/hbase-site.xml
index 8c8312cb108..34a1b20474b 100644
--- a/hbase-server/src/test/resources/hbase-site.xml
+++ b/hbase-server/src/test/resources/hbase-site.xml
@@ -147,4 +147,11 @@
       Skip sanity checks in tests
     </description>
   </property>
+  <property>
+    <name>hbase.procedure.abort.on.corruption</name>
+    <value>true</value>
+    <description>
+      Enable replay sanity checks on procedure tests.
+    </description>
+  </property>
 </configuration>
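
A note on the new knob: the property key must match EXECUTOR_ABORT_ON_CORRUPTION ("hbase.procedure.abort.on.corruption") for the setting to take effect. The test resource above enables it so replay problems fail loudly, while the production default stays false. Below is a minimal sketch, not part of this patch, that mirrors how HMaster reads the flag and forwards it to the executor; the standalone class and main() are purely illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;

public class AbortOnCorruptionExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Fail fast on replay instead of starting with partially recovered procedures.
    conf.setBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, true);

    // The Master reads the flag back the same way and hands it to the executor:
    boolean abortOnCorruption = conf.getBoolean(
        MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
    // procedureStore.start(numThreads);
    // procedureExecutor.start(numThreads, abortOnCorruption);
    System.out.println("abortOnCorruption=" + abortOnCorruption);
  }
}
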