diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 0e2d9b8e5a9..91a305b1012 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -505,8 +505,10 @@ public class ProcedureExecutor { private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption) throws IOException { // 1. Build the rollback stack - int runnablesCount = 0; + int runnableCount = 0; int failedCount = 0; + int waitingCount = 0; + int waitingTimeoutCount = 0; while (procIter.hasNext()) { boolean finished = procIter.isNextFinished(); @SuppressWarnings("unchecked") @@ -526,11 +528,21 @@ public class ProcedureExecutor { // add the procedure to the map proc.beforeReplay(getEnvironment()); procedures.put(proc.getProcId(), proc); - - if (proc.getState() == ProcedureState.RUNNABLE) { - runnablesCount++; - } else if (proc.getState() == ProcedureState.FAILED) { - failedCount++; + switch (proc.getState()) { + case RUNNABLE: + runnableCount++; + break; + case FAILED: + failedCount++; + break; + case WAITING: + waitingCount++; + break; + case WAITING_TIMEOUT: + waitingTimeoutCount++; + break; + default: + break; } } @@ -551,9 +563,10 @@ public class ProcedureExecutor { // have been polled out already, so when loading we can not add the procedure to scheduler first // and then call acquireLock, since the procedure is still in the queue, and since we will // remove the queue from runQueue, then no one can poll it out, then there is a dead lock - List> runnableList = new ArrayList<>(runnablesCount); + List> runnableList = new ArrayList<>(runnableCount); List> failedList = new ArrayList<>(failedCount); - Set> waitingSet = null; + List> waitingList = new ArrayList<>(waitingCount); + List> waitingTimeoutList = new 
ArrayList<>(waitingTimeoutCount); procIter.reset(); while (procIter.hasNext()) { if (procIter.isNextFinished()) { @@ -591,26 +604,10 @@ public class ProcedureExecutor { runnableList.add(proc); break; case WAITING: - if (!proc.hasChildren()) { - // Normally, WAITING procedures should be waken by its children. - // But, there is a case that, all the children are successful and before - // they can wake up their parent procedure, the master was killed. - // So, during recovering the procedures from ProcedureWal, its children - // are not loaded because of their SUCCESS state. - // So we need to continue to run this WAITING procedure. But before - // executing, we need to set its state to RUNNABLE, otherwise, a exception - // will throw: - // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, - // "NOT RUNNABLE! " + procedure.toString()); - proc.setState(ProcedureState.RUNNABLE); - runnableList.add(proc); - } + waitingList.add(proc); break; case WAITING_TIMEOUT: - if (waitingSet == null) { - waitingSet = new HashSet<>(); - } - waitingSet.add(proc); + waitingTimeoutList.add(proc); break; case FAILED: failedList.add(proc); @@ -625,39 +622,32 @@ public class ProcedureExecutor { } } - // 3. Validate the stacks - int corruptedCount = 0; - Iterator>> itStack = - rollbackStack.entrySet().iterator(); - while (itStack.hasNext()) { - Map.Entry> entry = itStack.next(); - RootProcedureState procStack = entry.getValue(); - if (procStack.isValid()) continue; - - for (Procedure proc : procStack.getSubproceduresStack()) { - LOG.error("Corrupted " + proc); - procedures.remove(proc.getProcId()); - runnableList.remove(proc); - if (waitingSet != null) waitingSet.remove(proc); - corruptedCount++; + // 4. Check the waiting procedures to see if some of them can be added to runnable. + waitingList.forEach(proc -> { + if (!proc.hasChildren()) { + // Normally, WAITING procedures should be waken by its children. 
+ // But, there is a case that, all the children are successful and before + // they can wake up their parent procedure, the master was killed. + // So, during recovering the procedures from ProcedureWal, its children + // are not loaded because of their SUCCESS state. + // So we need to continue to run this WAITING procedure. But before + // executing, we need to set its state to RUNNABLE, otherwise, a exception + // will throw: + // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, + // "NOT RUNNABLE! " + procedure.toString()); + proc.setState(ProcedureState.RUNNABLE); + runnableList.add(proc); } - itStack.remove(); - } + }); - if (abortOnCorruption && corruptedCount > 0) { - throw new IOException("found " + corruptedCount + " procedures on replay"); - } - - // 4. Push the procedures to the timeout executor - if (waitingSet != null && !waitingSet.isEmpty()) { - for (Procedure proc: waitingSet) { - proc.afterReplay(getEnvironment()); - timeoutExecutor.add(proc); - } - } - // 5. restore locks + // 5. Push the procedures to the timeout executor + waitingTimeoutList.forEach(proc -> { + proc.afterReplay(getEnvironment()); + timeoutExecutor.add(proc); + }); + // 6. restore locks restoreLocks(); - // 6. Push the procedure to the scheduler + // 7. 
Push the procedure to the scheduler failedList.forEach(scheduler::addBack); runnableList.forEach(p -> { p.afterReplay(getEnvironment()); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 0599acfcc85..d737a7a6c3d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -85,12 +85,18 @@ public interface ProcedureStore { boolean hasNext(); /** + * Calling this method does not need to converting the protobuf message to the Procedure class, + * so if it returns true we can call {@link #skipNext()} to skip the procedure without + * deserializing. This could increase the performance. * @return true if the iterator next element is a completed procedure. */ boolean isNextFinished(); /** * Skip the next procedure + *

+ * This method is used to skip the deserializing of the procedure to increase performance, as + * when calling next we need to convert the protobuf message to the Procedure class. */ void skipNext(); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 1ac8e01f3ab..2e1e06ce054 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.procedure2.store.wal; import java.io.IOException; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -31,70 +30,25 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; /** - * Helper class that loads the procedures stored in a WAL + * Helper class that loads the procedures stored in a WAL. */ @InterfaceAudience.Private public class ProcedureWALFormatReader { private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class); - // ============================================================================================== - // We read the WALs in reverse order from the newest to the oldest. - // We have different entry types: - // - INIT: Procedure submitted by the user (also known as 'root procedure') - // - INSERT: Children added to the procedure :[, ...] 
- // - UPDATE: The specified procedure was updated - // - DELETE: The procedure was removed (finished/rolledback and result TTL expired) - // - // In the WAL we can find multiple times the same procedure as UPDATE or INSERT. - // We read the WAL from top to bottom, so every time we find an entry of the - // same procedure, that will be the "latest" update (Caveat: with multiple threads writing - // the store, this assumption does not hold). - // - // We keep two in-memory maps: - // - localProcedureMap: is the map containing the entries in the WAL we are processing - // - procedureMap: is the map containing all the procedures we found up to the WAL in process. - // localProcedureMap is merged with the procedureMap once we reach the WAL EOF. - // - // Since we are reading the WALs in reverse order (newest to oldest), - // if we find an entry related to a procedure we already have in 'procedureMap' we can discard it. - // - // The WAL is append-only so the last procedure in the WAL is the one that - // was in execution at the time we crashed/closed the server. - // Given that, the procedure replay order can be inferred by the WAL order. - // - // Example: - // WAL-2: [A, B, A, C, D] - // WAL-1: [F, G, A, F, B] - // Replay-Order: [D, C, A, B, F, G] - // - // The "localProcedureMap" keeps a "replayOrder" list. Every time we add the - // record to the map that record is moved to the head of the "replayOrder" list. - // Using the example above: - // WAL-2 localProcedureMap.replayOrder is [D, C, A, B] - // WAL-1 localProcedureMap.replayOrder is [F, G] - // - // Each time we reach the WAL-EOF, the "replayOrder" list is merged/appended in 'procedureMap' - // so using the example above we end up with: [D, C, A, B] + [F, G] as replay order. - // - // Fast Start: INIT/INSERT record and StackIDs - // --------------------------------------------- - // We have two special records, INIT and INSERT, that track the first time - // the procedure was added to the WAL. 
We can use this information to be able - // to start procedures before reaching the end of the WAL, or before reading all WALs. - // But in some cases, the WAL with that record can be already gone. - // As an alternative, we can use the stackIds on each procedure, - // to identify when a procedure is ready to start. - // If there are gaps in the sum of the stackIds we need to read more WALs. - // - // Example (all procs child of A): - // WAL-2: [A, B] A stackIds = [0, 4], B stackIds = [1, 5] - // WAL-1: [A, B, C, D] - // - // In the case above we need to read one more WAL to be able to consider - // the root procedure A and all children as ready. - // ============================================================================================== - private final WALProcedureMap localProcedureMap = new WALProcedureMap(1024); - private final WALProcedureMap procedureMap = new WALProcedureMap(1024); + /** + * We will use the localProcedureMap to track the active procedures for the current proc wal file, + * and when we finish reading one proc wal file, we will merge the localProcedureMap into the + * procedureMap, which tracks the global active procedures. + *

+ * See the comments of {@link WALProcedureMap} for more details. + *

+ * After reading all the proc wal files, we will use the procedures in the procedureMap to build a + * {@link WALProcedureTree}, and then give the result to the upper layer. See the comments of + * {@link WALProcedureTree} and the code in {@link #finish()} for more details. + */ + private final WALProcedureMap localProcedureMap = new WALProcedureMap(); + private final WALProcedureMap procedureMap = new WALProcedureMap(); private final ProcedureWALFormat.Loader loader; @@ -178,7 +132,7 @@ public class ProcedureWALFormatReader { localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(), localProcedureMap.getMaxModifiedProcId()); } - procedureMap.mergeTail(localProcedureMap); + procedureMap.merge(localProcedureMap); } if (localTracker.isPartial()) { localTracker.setPartialFlag(false); @@ -189,18 +143,11 @@ public class ProcedureWALFormatReader { // notify the loader about the max proc ID loader.setMaxProcId(maxProcId); - // fetch the procedure ready to run. - ProcedureIterator procIter = procedureMap.fetchReady(); - if (procIter != null) { - loader.load(procIter); - } - - // remaining procedures have missing link or dependencies - // consider them as corrupted, manual fix is probably required. - procIter = procedureMap.fetchAll(); - if (procIter != null) { - loader.handleCorrupted(procIter); - } + // build the procedure execution tree. When building we will verify that whether a procedure is + // valid. 
+ WALProcedureTree tree = WALProcedureTree.build(procedureMap.getProcedures()); + loader.load(tree.getValidProcs()); + loader.handleCorrupted(tree.getCorruptedProcs()); } private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java index 18d7823cba6..9cda1bcc555 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java @@ -17,193 +17,50 @@ */ package org.apache.hadoop.hbase.procedure2.store.wal; -import java.io.IOException; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureUtil; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; /** - * We keep an in-memory map of the procedures sorted by replay order. (see the details in the - * beginning of {@link ProcedureWALFormatReader}). - * - *

- *      procedureMap = | A |   | E |   | C |   |   |   |   | G |   |   |
- *                       D               B
- *      replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
- *
- *  We also have a lazy grouping by "root procedure", and a list of
- *  unlinked procedures. If after reading all the WALs we have unlinked
- *  procedures it means that we had a missing WAL or a corruption.
- *      rootHead = A <-> D <-> G
- *                 B     E
- *                 C
- *      unlinkFromLinkList = None
- * 
+ * This class is used to track the active procedures when loading procedures from proc wal file. + *

+ * We will read proc wal files from new to old, but when reading a proc wal file, we will still read + * from top to bottom, so there are two groups of methods for this class. + *

+ * The first group is {@link #add(ProcedureProtos.Procedure)} and {@link #remove(long)}. It is used + * when reading a proc wal file. In these methods, for the same procedure, typically the one that + * comes later wins; see the comment for + * {@link #isIncreasing(ProcedureProtos.Procedure, ProcedureProtos.Procedure)} for the + * exceptions. + *

+ * The second group is {@link #merge(WALProcedureMap)}. We will have a global + * {@link WALProcedureMap} to hold global the active procedures, and a local {@link WALProcedureMap} + * to hold the active procedures for the current proc wal file. And when we finish reading a proc + * wal file, we will merge the local one into the global one, by calling the + * {@link #merge(WALProcedureMap)} method of the global one and pass the local one in. In this + * method, for the same procedure, the one comes earlier will win, as we read the proc wal files + * from new to old(the reverse order). */ +@InterfaceAudience.Private class WALProcedureMap { private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class); - private static class Entry { - // For bucketed linked lists in hash-table. - private Entry hashNext; - // child head - private Entry childHead; - // double-link for rootHead or childHead - private Entry linkNext; - private Entry linkPrev; - // replay double-linked-list - private Entry replayNext; - private Entry replayPrev; - // procedure-infos - private Procedure procedure; - private ProcedureProtos.Procedure proto; - private boolean ready = false; + private final Map procMap = new HashMap<>(); - public Entry(Entry hashNext) { - this.hashNext = hashNext; - } - - public long getProcId() { - return proto.getProcId(); - } - - public long getParentId() { - return proto.getParentId(); - } - - public boolean hasParent() { - return proto.hasParentId(); - } - - public boolean isReady() { - return ready; - } - - public boolean isFinished() { - if (!hasParent()) { - // we only consider 'root' procedures. because for the user 'finished' - // means when everything up to the 'root' is finished. 
- switch (proto.getState()) { - case ROLLEDBACK: - case SUCCESS: - return true; - default: - break; - } - } - return false; - } - - public Procedure convert() throws IOException { - if (procedure == null) { - procedure = ProcedureUtil.convertToProcedure(proto); - } - return procedure; - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("Entry("); - sb.append(getProcId()); - sb.append(", parentId="); - sb.append(getParentId()); - sb.append(", class="); - sb.append(proto.getClassName()); - sb.append(")"); - return sb.toString(); - } - } - - private static class EntryIterator implements ProcedureIterator { - private final Entry replayHead; - private Entry current; - - public EntryIterator(Entry replayHead) { - this.replayHead = replayHead; - this.current = replayHead; - } - - @Override - public void reset() { - this.current = replayHead; - } - - @Override - public boolean hasNext() { - return current != null; - } - - @Override - public boolean isNextFinished() { - return current != null && current.isFinished(); - } - - @Override - public void skipNext() { - current = current.replayNext; - } - - @Override - public Procedure next() throws IOException { - try { - return current.convert(); - } finally { - current = current.replayNext; - } - } - } - - // procedure hash table - private Entry[] procedureMap; - - // replay-order double-linked-list - private Entry replayOrderHead; - private Entry replayOrderTail; - - // root linked-list - private Entry rootHead; - - // pending unlinked children (root not present yet) - private Entry childUnlinkedHead; - - // Track ProcId range private long minModifiedProcId = Long.MAX_VALUE; + private long maxModifiedProcId = Long.MIN_VALUE; - public WALProcedureMap(int size) { - procedureMap = new Entry[size]; - replayOrderHead = null; - replayOrderTail = null; - rootHead = null; - childUnlinkedHead = null; - } - - public void add(ProcedureProtos.Procedure procProto) { - 
trackProcIds(procProto.getProcId()); - Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId()); - boolean newEntry = entry.proto == null; - // We have seen procedure WALs where the entries are out of order; see HBASE-18152. - // To compensate, only replace the Entry procedure if for sure this new procedure - // is indeed an entry that came later. - // TODO: Fix the writing of procedure info so it does not violate basic expectation, that WALs - // contain procedure changes goingfrom start to finish in sequence. - if (newEntry || isIncreasing(entry.proto, procProto)) { - entry.proto = procProto; - } - addToReplayList(entry); - if (newEntry) { - if (procProto.hasParentId()) { - childUnlinkedHead = addToLinkList(entry, childUnlinkedHead); - } else { - rootHead = addToLinkList(entry, rootHead); - } - } + private void trackProcId(long procId) { + minModifiedProcId = Math.min(minModifiedProcId, procId); + maxModifiedProcId = Math.max(maxModifiedProcId, procId); } /** @@ -225,20 +82,44 @@ class WALProcedureMap { return increasing; } - public boolean remove(long procId) { - trackProcIds(procId); - Entry entry = removeFromMap(procId); - if (entry != null) { - unlinkFromReplayList(entry); - unlinkFromLinkList(entry); - return true; - } - return false; + public void add(ProcedureProtos.Procedure proc) { + procMap.compute(proc.getProcId(), (procId, existingProc) -> { + if (existingProc == null || isIncreasing(existingProc, proc)) { + return proc; + } else { + return existingProc; + } + }); + trackProcId(proc.getProcId()); } - private void trackProcIds(long procId) { - minModifiedProcId = Math.min(minModifiedProcId, procId); - maxModifiedProcId = Math.max(maxModifiedProcId, procId); + public void remove(long procId) { + procMap.remove(procId); + } + + public boolean isEmpty() { + return procMap.isEmpty(); + } + + public boolean contains(long procId) { + return procMap.containsKey(procId); + } + + /** + * Merge the given {@link WALProcedureMap} into this one. 
The {@link WALProcedureMap} passed in + * will be cleared after merging. + */ + public void merge(WALProcedureMap other) { + other.procMap.forEach(procMap::putIfAbsent); + maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId); + minModifiedProcId = Math.min(minModifiedProcId, other.minModifiedProcId); + other.procMap.clear(); + other.maxModifiedProcId = Long.MIN_VALUE; + other.minModifiedProcId = Long.MAX_VALUE; + } + + public Collection getProcedures() { + return Collections.unmodifiableCollection(procMap.values()); } public long getMinModifiedProcId() { @@ -248,360 +129,4 @@ class WALProcedureMap { public long getMaxModifiedProcId() { return maxModifiedProcId; } - - public boolean contains(long procId) { - return getProcedure(procId) != null; - } - - public boolean isEmpty() { - return replayOrderHead == null; - } - - public void clear() { - for (int i = 0; i < procedureMap.length; ++i) { - procedureMap[i] = null; - } - replayOrderHead = null; - replayOrderTail = null; - rootHead = null; - childUnlinkedHead = null; - minModifiedProcId = Long.MAX_VALUE; - maxModifiedProcId = Long.MIN_VALUE; - } - - /* - * Merges two WalProcedureMap, the target is the "global" map, the source is the "local" map. - - * The entries in the hashtables are guaranteed to be unique. On replay we don't load procedures - * that already exist in the "global" map (the one we are merging the "local" in to). - The - * replayOrderList of the "local" nao will be appended to the "global" map replay list. - The - * "local" map will be cleared at the end of the operation. 
- */ - public void mergeTail(WALProcedureMap other) { - for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) { - int slotIndex = getMapSlot(p.getProcId()); - p.hashNext = procedureMap[slotIndex]; - procedureMap[slotIndex] = p; - } - - if (replayOrderHead == null) { - replayOrderHead = other.replayOrderHead; - replayOrderTail = other.replayOrderTail; - rootHead = other.rootHead; - childUnlinkedHead = other.childUnlinkedHead; - } else { - // append replay list - assert replayOrderTail.replayNext == null; - assert other.replayOrderHead.replayPrev == null; - replayOrderTail.replayNext = other.replayOrderHead; - other.replayOrderHead.replayPrev = replayOrderTail; - replayOrderTail = other.replayOrderTail; - - // merge rootHead - if (rootHead == null) { - rootHead = other.rootHead; - } else if (other.rootHead != null) { - Entry otherTail = findLinkListTail(other.rootHead); - otherTail.linkNext = rootHead; - rootHead.linkPrev = otherTail; - rootHead = other.rootHead; - } - - // merge childUnlinkedHead - if (childUnlinkedHead == null) { - childUnlinkedHead = other.childUnlinkedHead; - } else if (other.childUnlinkedHead != null) { - Entry otherTail = findLinkListTail(other.childUnlinkedHead); - otherTail.linkNext = childUnlinkedHead; - childUnlinkedHead.linkPrev = otherTail; - childUnlinkedHead = other.childUnlinkedHead; - } - } - maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId); - minModifiedProcId = Math.max(minModifiedProcId, other.minModifiedProcId); - - other.clear(); - } - - /** - * Returns an EntryIterator with the list of procedures ready to be added to the executor. A - * Procedure is ready if its children and parent are ready. 
- */ - public ProcedureIterator fetchReady() { - buildGraph(); - - Entry readyHead = null; - Entry readyTail = null; - Entry p = replayOrderHead; - while (p != null) { - Entry next = p.replayNext; - if (p.isReady()) { - unlinkFromReplayList(p); - if (readyTail != null) { - readyTail.replayNext = p; - p.replayPrev = readyTail; - } else { - p.replayPrev = null; - readyHead = p; - } - readyTail = p; - p.replayNext = null; - } - p = next; - } - // we need the hash-table lookups for parents, so this must be done - // out of the loop where we check isReadyToRun() - for (p = readyHead; p != null; p = p.replayNext) { - removeFromMap(p.getProcId()); - unlinkFromLinkList(p); - } - return readyHead != null ? new EntryIterator(readyHead) : null; - } - - /** - * Drain this map and return all procedures in it. - */ - public ProcedureIterator fetchAll() { - Entry head = replayOrderHead; - for (Entry p = head; p != null; p = p.replayNext) { - removeFromMap(p.getProcId()); - } - for (int i = 0; i < procedureMap.length; ++i) { - assert procedureMap[i] == null : "map not empty i=" + i; - } - replayOrderHead = null; - replayOrderTail = null; - childUnlinkedHead = null; - rootHead = null; - return head != null ? new EntryIterator(head) : null; - } - - private void buildGraph() { - Entry p = childUnlinkedHead; - while (p != null) { - Entry next = p.linkNext; - Entry rootProc = getRootProcedure(p); - if (rootProc != null) { - rootProc.childHead = addToLinkList(p, rootProc.childHead); - } - p = next; - } - - for (p = rootHead; p != null; p = p.linkNext) { - checkReadyToRun(p); - } - } - - private Entry getRootProcedure(Entry entry) { - while (entry != null && entry.hasParent()) { - entry = getProcedure(entry.getParentId()); - } - return entry; - } - - /** - * (see the comprehensive explanation in the beginning of {@link ProcedureWALFormatReader}). A - * Procedure is ready when parent and children are ready. "ready" means that we all the - * information that we need in-memory. - *

- * Example-1:
- * We have two WALs, we start reading from the newest (wal-2) - * - *

-   *    wal-2 | C B |
-   *    wal-1 | A B C |
-   * 
- * - * If C and B don't depend on A (A is not the parent), we can start them before reading wal-1. If - * B is the only one with parent A we can start C. We have to read one more WAL before being able - * to start B. - *

- * How do we know with the only information in B that we are not ready. - *

    - *
  • easy case, the parent is missing from the global map
  • - *
  • more complex case we look at the Stack IDs.
  • - *
- * The Stack-IDs are added to the procedure order as an incremental index tracking how many times - * that procedure was executed, which is equivalent to the number of times we wrote the procedure - * to the WAL.
- * In the example above: - * - *
-   *   wal-2: B has stackId = [1, 2]
-   *   wal-1: B has stackId = [1]
-   *   wal-1: A has stackId = [0]
-   * 
- * - * Since we know that the Stack-IDs are incremental for a Procedure, we notice that there is a gap - * in the stackIds of B, so something was executed before. - *

- * To identify when a Procedure is ready we do the sum of the stackIds of the procedure and the - * parent. if the stackIdSum is equal to the sum of {1..maxStackId} then everything we need is - * available. - *

- * Example-2 - * - *

-   *    wal-2 | A |              A stackIds = [0, 2]
-   *    wal-1 | A B |            B stackIds = [1]
-   * 
- * - * There is a gap between A stackIds so something was executed in between. - */ - private boolean checkReadyToRun(Entry rootEntry) { - assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry; - - if (rootEntry.isFinished()) { - // If the root procedure is finished, sub-procedures should be gone - if (rootEntry.childHead != null) { - LOG.error("unexpected active children for root-procedure: {}", rootEntry); - for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { - LOG.error("unexpected active children: {}", p); - } - } - - assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry; - rootEntry.ready = true; - return true; - } - - int stackIdSum = 0; - int maxStackId = 0; - for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) { - int stackId = 1 + rootEntry.proto.getStackId(i); - maxStackId = Math.max(maxStackId, stackId); - stackIdSum += stackId; - LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId, - rootEntry); - } - - for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { - for (int i = 0; i < p.proto.getStackIdCount(); ++i) { - int stackId = 1 + p.proto.getStackId(i); - maxStackId = Math.max(maxStackId, stackId); - stackIdSum += stackId; - LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId, p); - } - } - // The cmpStackIdSum is this formula for finding the sum of a series of numbers: - // http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg - final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2); - if (cmpStackIdSum == stackIdSum) { - rootEntry.ready = true; - for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { - p.ready = true; - } - return true; - } - return false; - } - - private void unlinkFromReplayList(Entry entry) { - if (replayOrderHead == entry) { - replayOrderHead = entry.replayNext; - } - if (replayOrderTail == entry) { 
- replayOrderTail = entry.replayPrev; - } - if (entry.replayPrev != null) { - entry.replayPrev.replayNext = entry.replayNext; - } - if (entry.replayNext != null) { - entry.replayNext.replayPrev = entry.replayPrev; - } - } - - private void addToReplayList(final Entry entry) { - unlinkFromReplayList(entry); - entry.replayNext = replayOrderHead; - entry.replayPrev = null; - if (replayOrderHead != null) { - replayOrderHead.replayPrev = entry; - } else { - replayOrderTail = entry; - } - replayOrderHead = entry; - } - - private void unlinkFromLinkList(Entry entry) { - if (entry == rootHead) { - rootHead = entry.linkNext; - } else if (entry == childUnlinkedHead) { - childUnlinkedHead = entry.linkNext; - } - if (entry.linkPrev != null) { - entry.linkPrev.linkNext = entry.linkNext; - } - if (entry.linkNext != null) { - entry.linkNext.linkPrev = entry.linkPrev; - } - } - - private Entry addToLinkList(Entry entry, Entry linkHead) { - unlinkFromLinkList(entry); - entry.linkNext = linkHead; - entry.linkPrev = null; - if (linkHead != null) { - linkHead.linkPrev = entry; - } - return entry; - } - - private Entry findLinkListTail(Entry linkHead) { - Entry tail = linkHead; - while (tail.linkNext != null) { - tail = tail.linkNext; - } - return tail; - } - - private Entry addToMap(long procId, boolean hasParent) { - int slotIndex = getMapSlot(procId); - Entry entry = getProcedure(slotIndex, procId); - if (entry != null) { - return entry; - } - - entry = new Entry(procedureMap[slotIndex]); - procedureMap[slotIndex] = entry; - return entry; - } - - private Entry removeFromMap(final long procId) { - int slotIndex = getMapSlot(procId); - Entry prev = null; - Entry entry = procedureMap[slotIndex]; - while (entry != null) { - if (procId == entry.getProcId()) { - if (prev != null) { - prev.hashNext = entry.hashNext; - } else { - procedureMap[slotIndex] = entry.hashNext; - } - entry.hashNext = null; - return entry; - } - prev = entry; - entry = entry.hashNext; - } - return null; - } - - 
private Entry getProcedure(long procId) { - return getProcedure(getMapSlot(procId), procId); - } - - private Entry getProcedure(int slotIndex, long procId) { - Entry entry = procedureMap[slotIndex]; - while (entry != null) { - if (procId == entry.getProcId()) { - return entry; - } - entry = entry.hashNext; - } - return null; - } - - private int getMapSlot(long procId) { - return (int) (Procedure.getProcIdHashCode(procId) % procedureMap.length); - } -} \ No newline at end of file +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java new file mode 100644 index 00000000000..c32bd7f6d8c --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.procedure2.store.wal; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * Used to build the tree for procedures. + *

+ * We will group the procedures with the root procedure, and then validate each group. For each + * group of procedures (with the same root procedure), we will collect all the stack ids. If the max + * stack id is n, then the stack ids must be exactly 0 to n, with no repetition and no omission. If + * not, we will consider all the procedures in this group as corrupted. Please see the code in + * {@link #checkReady(Entry, Map)} method. + * 

+ * For the procedures not in any group, i.e, can not find the root procedure for these procedures, + * we will also consider them as corrupted. Please see the code in {@link #checkOrphan(Map)} method. + */ +@InterfaceAudience.Private +public final class WALProcedureTree { + + private static final Logger LOG = LoggerFactory.getLogger(WALProcedureTree.class); + + private static final class Entry { + + private final ProcedureProtos.Procedure proc; + + private final List subProcs = new ArrayList<>(); + + public Entry(ProcedureProtos.Procedure proc) { + this.proc = proc; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Procedure(pid="); + sb.append(proc.getProcId()); + sb.append(", ppid="); + sb.append(proc.hasParentId() ? proc.getParentId() : Procedure.NO_PROC_ID); + sb.append(", class="); + sb.append(proc.getClassName()); + sb.append(")"); + return sb.toString(); + } + } + + // when loading we will iterator the procedures twice, so use this class to cache the deserialized + // result to prevent deserializing multiple times. 
+  private static final class ProtoAndProc {
+    private final ProcedureProtos.Procedure proto;
+
+    // Deserialized lazily by getProc() and then reused.
+    private Procedure<?> proc;
+
+    public ProtoAndProc(ProcedureProtos.Procedure proto) {
+      this.proto = proto;
+    }
+
+    // Converts the proto on the first call only; later calls return the cached instance.
+    public Procedure<?> getProc() throws IOException {
+      if (proc == null) {
+        proc = ProcedureUtil.convertToProcedure(proto);
+      }
+      return proc;
+    }
+  }
+
+  private final List<ProtoAndProc> validProcs = new ArrayList<>();
+
+  private final List<ProtoAndProc> corruptedProcs = new ArrayList<>();
+
+  // Only a procedure without a parent id can be reported as finished, and only when its state is
+  // ROLLEDBACK or SUCCESS. Everything else, including every sub procedure, returns false.
+  private static boolean isFinished(ProcedureProtos.Procedure proc) {
+    if (!proc.hasParentId()) {
+      switch (proc.getState()) {
+        case ROLLEDBACK:
+        case SUCCESS:
+          return true;
+        default:
+          break;
+      }
+    }
+    return false;
+  }
+
+  // Links the entries into trees, validates every root, reports whatever is left in procMap as
+  // orphans, and finally sorts both result lists by procedure id.
+  private WALProcedureTree(Map<Long, Entry> procMap) {
+    List<Entry> rootEntries = buildTree(procMap);
+    for (Entry rootEntry : rootEntries) {
+      checkReady(rootEntry, procMap);
+    }
+    checkOrphan(procMap);
+    Comparator<ProtoAndProc> cmp =
+      (p1, p2) -> Long.compare(p1.proto.getProcId(), p2.proto.getProcId());
+    Collections.sort(validProcs, cmp);
+    Collections.sort(corruptedProcs, cmp);
+  }
+
+  // Attaches every entry that has a parent id to its parent's subProcs, and returns the entries
+  // that have no parent id.
+  private List<Entry> buildTree(Map<Long, Entry> procMap) {
+    List<Entry> rootEntries = new ArrayList<>();
+    procMap.values().forEach(entry -> {
+      if (!entry.proc.hasParentId()) {
+        rootEntries.add(entry);
+      } else {
+        Entry parentEntry = procMap.get(entry.proc.getParentId());
+        // For a valid procedure this should not be null. We will log the error later if it is null,
+        // as it will not be referenced by any root procedures.
+        if (parentEntry != null) {
+          parentEntry.subProcs.add(entry);
+        }
+      }
+    });
+    return rootEntries;
+  }
+
+  // Records every stack id of the entry, and recursively of all its sub procedures, in
+  // stackId2Proc, and raises maxStackId to the largest stack id seen.
+  private void collectStackId(Entry entry, Map<Integer, List<Entry>> stackId2Proc,
+    MutableInt maxStackId) {
+    for (int i = 0, n = entry.proc.getStackIdCount(); i < n; i++) {
+      int stackId = entry.proc.getStackId(i);
+      if (stackId > maxStackId.intValue()) {
+        maxStackId.setValue(stackId);
+      }
+      stackId2Proc.computeIfAbsent(stackId, k -> new ArrayList<>()).add(entry);
+    }
+    entry.subProcs.forEach(e -> collectStackId(e, stackId2Proc, maxStackId));
+  }
+
+  // Moves the entry and, recursively, all its sub procedures from remainingProcMap to
+  // corruptedProcs.
+  private void addAllToCorruptedAndRemoveFromProcMap(Entry entry,
+    Map<Long, Entry> remainingProcMap) {
+    corruptedProcs.add(new ProtoAndProc(entry.proc));
+    remainingProcMap.remove(entry.proc.getProcId());
+    for (Entry e : entry.subProcs) {
+      addAllToCorruptedAndRemoveFromProcMap(e, remainingProcMap);
+    }
+  }
+
+  // Moves the entry and, recursively, all its sub procedures from remainingProcMap to validProcs.
+  private void addAllToValidAndRemoveFromProcMap(Entry entry, Map<Long, Entry> remainingProcMap) {
+    validProcs.add(new ProtoAndProc(entry.proc));
+    remainingProcMap.remove(entry.proc.getProcId());
+    for (Entry e : entry.subProcs) {
+      addAllToValidAndRemoveFromProcMap(e, remainingProcMap);
+    }
+  }
+
+  // In this method first we will check whether the given root procedure and all its sub procedures
+  // are valid, through the procedure stack. And we will also remove all these procedures from the
+  // remainingProcMap, so at last, if there are still procedures in the map, we know that there are
+  // orphan procedures.
+ private void checkReady(Entry rootEntry, Map remainingProcMap) { + if (isFinished(rootEntry.proc)) { + if (!rootEntry.subProcs.isEmpty()) { + LOG.error("unexpected active children for root-procedure: {}", rootEntry); + rootEntry.subProcs.forEach(e -> LOG.error("unexpected active children: {}", e)); + addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap); + } else { + addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap); + } + return; + } + Map> stackId2Proc = new HashMap<>(); + MutableInt maxStackId = new MutableInt(Integer.MIN_VALUE); + collectStackId(rootEntry, stackId2Proc, maxStackId); + // the stack ids should start from 0 and increase by one every time + boolean valid = true; + for (int i = 0; i <= maxStackId.intValue(); i++) { + List entries = stackId2Proc.get(i); + if (entries == null) { + LOG.error("Missing stack id {}, max stack id is {}, root procedure is {}", i, maxStackId, + rootEntry); + valid = false; + } else if (entries.size() > 1) { + LOG.error("Multiple procedures {} have the same stack id {}, max stack id is {}," + + " root procedure is {}", entries, i, maxStackId, rootEntry); + valid = false; + } + } + if (valid) { + addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap); + } else { + addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap); + } + } + + private void checkOrphan(Map procMap) { + procMap.values().forEach(entry -> { + LOG.error("Orphan procedure: {}", entry); + corruptedProcs.add(new ProtoAndProc(entry.proc)); + }); + } + + private static final class Iter implements ProcedureIterator { + + private final List procs; + + private Iterator iter; + + private ProtoAndProc current; + + public Iter(List procs) { + this.procs = procs; + reset(); + } + + @Override + public void reset() { + iter = procs.iterator(); + if (iter.hasNext()) { + current = iter.next(); + } else { + current = null; + } + } + + @Override + public boolean hasNext() { + return current != null; + } + + private void checkNext() 
{ + if (!hasNext()) { + throw new NoSuchElementException(); + } + } + + @Override + public boolean isNextFinished() { + checkNext(); + return isFinished(current.proto); + } + + private void moveToNext() { + if (iter.hasNext()) { + current = iter.next(); + } else { + current = null; + } + } + + @Override + public void skipNext() { + checkNext(); + moveToNext(); + } + + @Override + public Procedure next() throws IOException { + checkNext(); + Procedure proc = current.getProc(); + moveToNext(); + return proc; + } + } + + public ProcedureIterator getValidProcs() { + return new Iter(validProcs); + } + + public ProcedureIterator getCorruptedProcs() { + return new Iter(corruptedProcs); + } + + public static WALProcedureTree build(Collection procedures) { + Map procMap = new HashMap<>(); + for (ProcedureProtos.Procedure proc : procedures) { + procMap.put(proc.getProcId(), new Entry(proc)); + } + return new WALProcedureTree(procMap); + } +} diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java index 443386d0845..da53fa5d0bb 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hbase.procedure2.store.wal; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.IOException; import java.util.Random; diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index d682481e886..0f598b0df8e 100644 --- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.procedure2.store.wal; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.FileNotFoundException; import java.io.IOException; @@ -44,7 +43,6 @@ import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; import org.apache.hadoop.hbase.procedure2.SequentialProcedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -72,7 +70,6 @@ public class TestWALProcedureStore { private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class); private static final int PROCEDURE_STORE_SLOTS = 1; - private static final Procedure NULL_PROC = null; private WALProcedureStore procStore; @@ -153,7 +150,7 @@ public class TestWALProcedureStore { @Test public void testWalCleanerSequentialClean() throws Exception { - final Procedure[] procs = new Procedure[5]; + final Procedure[] procs = new Procedure[5]; ArrayList logs = null; // Insert procedures and roll wal after every insert. @@ -182,7 +179,7 @@ public class TestWALProcedureStore { // they are in the starting of the list. 
@Test public void testWalCleanerNoHoles() throws Exception { - final Procedure[] procs = new Procedure[5]; + final Procedure[] procs = new Procedure[5]; ArrayList logs = null; // Insert procedures and roll wal after every insert. for (int i = 0; i < procs.length; i++) { @@ -242,7 +239,7 @@ public class TestWALProcedureStore { @Test public void testWalCleanerWithEmptyRolls() throws Exception { - final Procedure[] procs = new Procedure[3]; + final Procedure[] procs = new Procedure[3]; for (int i = 0; i < procs.length; ++i) { procs[i] = new TestSequentialProcedure(); procStore.insert(procs[i], null); @@ -284,12 +281,12 @@ public class TestWALProcedureStore { Set procIds = new HashSet<>(); // Insert something in the log - Procedure proc1 = new TestSequentialProcedure(); + Procedure proc1 = new TestSequentialProcedure(); procIds.add(proc1.getProcId()); procStore.insert(proc1, null); - Procedure proc2 = new TestSequentialProcedure(); - Procedure[] child2 = new Procedure[2]; + Procedure proc2 = new TestSequentialProcedure(); + Procedure[] child2 = new Procedure[2]; child2[0] = new TestSequentialProcedure(); child2[1] = new TestSequentialProcedure(); @@ -323,11 +320,11 @@ public class TestWALProcedureStore { @Test public void testNoTrailerDoubleRestart() throws Exception { // log-0001: proc 0, 1 and 2 are inserted - Procedure proc0 = new TestSequentialProcedure(); + Procedure proc0 = new TestSequentialProcedure(); procStore.insert(proc0, null); - Procedure proc1 = new TestSequentialProcedure(); + Procedure proc1 = new TestSequentialProcedure(); procStore.insert(proc1, null); - Procedure proc2 = new TestSequentialProcedure(); + Procedure proc2 = new TestSequentialProcedure(); procStore.insert(proc2, null); procStore.rollWriterForTesting(); @@ -420,7 +417,7 @@ public class TestWALProcedureStore { } private static void assertUpdated(final ProcedureStoreTracker tracker, - final Procedure[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) { + final Procedure[] 
procs, final int[] updatedProcs, final int[] nonUpdatedProcs) { for (int index : updatedProcs) { long procId = procs[index].getProcId(); assertTrue("Procedure id : " + procId, tracker.isModified(procId)); @@ -432,7 +429,7 @@ public class TestWALProcedureStore { } private static void assertDeleted(final ProcedureStoreTracker tracker, - final Procedure[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) { + final Procedure[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) { for (int index : deletedProcs) { long procId = procs[index].getProcId(); assertEquals("Procedure id : " + procId, @@ -447,7 +444,7 @@ public class TestWALProcedureStore { @Test public void testCorruptedTrailersRebuild() throws Exception { - final Procedure[] procs = new Procedure[6]; + final Procedure[] procs = new Procedure[6]; for (int i = 0; i < procs.length; ++i) { procs[i] = new TestSequentialProcedure(); } @@ -575,127 +572,20 @@ public class TestWALProcedureStore { storeRestart(loader); assertEquals(0, loader.getLoadedCount()); assertEquals(rootProcs.length, loader.getCorruptedCount()); - for (Procedure proc: loader.getCorrupted()) { + for (Procedure proc : loader.getCorrupted()) { assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length); assertTrue(proc.toString(), - proc.getProcId() > rootProcs.length && - proc.getProcId() <= (rootProcs.length * 2)); + proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2)); } } - @Test - public void testWalReplayOrder_AB_A() throws Exception { - /* - * | A B | -> | A | - */ - TestProcedure a = new TestProcedure(1, 0); - TestProcedure b = new TestProcedure(2, 1); - - procStore.insert(a, null); - a.addStackId(0); - procStore.update(a); - - procStore.insert(a, new Procedure[] { b }); - b.addStackId(1); - procStore.update(b); - - procStore.rollWriterForTesting(); - - a.addStackId(2); - procStore.update(a); - - storeRestart(new ProcedureStore.ProcedureLoader() { - @Override - public void 
setMaxProcId(long maxProcId) { - assertEquals(2, maxProcId); - } - - @Override - public void load(ProcedureIterator procIter) throws IOException { - assertTrue(procIter.hasNext()); - assertEquals(1, procIter.next().getProcId()); - assertTrue(procIter.hasNext()); - assertEquals(2, procIter.next().getProcId()); - assertFalse(procIter.hasNext()); - } - - @Override - public void handleCorrupted(ProcedureIterator procIter) throws IOException { - assertFalse(procIter.hasNext()); - } - }); - } - - @Test - public void testWalReplayOrder_ABC_BAD() throws Exception { - /* - * | A B C | -> | B A D | - */ - TestProcedure a = new TestProcedure(1, 0); - TestProcedure b = new TestProcedure(2, 1); - TestProcedure c = new TestProcedure(3, 2); - TestProcedure d = new TestProcedure(4, 0); - - procStore.insert(a, null); - a.addStackId(0); - procStore.update(a); - - procStore.insert(a, new Procedure[] { b }); - b.addStackId(1); - procStore.update(b); - - procStore.insert(b, new Procedure[] { c }); - b.addStackId(2); - procStore.update(b); - - procStore.rollWriterForTesting(); - - b.addStackId(3); - procStore.update(b); - - a.addStackId(4); - procStore.update(a); - - procStore.insert(d, null); - d.addStackId(0); - procStore.update(d); - - storeRestart(new ProcedureStore.ProcedureLoader() { - @Override - public void setMaxProcId(long maxProcId) { - assertEquals(4, maxProcId); - } - - @Override - public void load(ProcedureIterator procIter) throws IOException { - assertTrue(procIter.hasNext()); - assertEquals(4, procIter.next().getProcId()); - // TODO: This will be multiple call once we do fast-start - //assertFalse(procIter.hasNext()); - - assertTrue(procIter.hasNext()); - assertEquals(1, procIter.next().getProcId()); - assertTrue(procIter.hasNext()); - assertEquals(2, procIter.next().getProcId()); - assertTrue(procIter.hasNext()); - assertEquals(3, procIter.next().getProcId()); - assertFalse(procIter.hasNext()); - } - - @Override - public void handleCorrupted(ProcedureIterator procIter) 
throws IOException { - assertFalse(procIter.hasNext()); - } - }); - } - @Test public void testRollAndRemove() throws IOException { // Insert something in the log - Procedure proc1 = new TestSequentialProcedure(); + Procedure proc1 = new TestSequentialProcedure(); procStore.insert(proc1, null); - Procedure proc2 = new TestSequentialProcedure(); + Procedure proc2 = new TestSequentialProcedure(); procStore.insert(proc2, null); // roll the log, now we have 2 @@ -942,17 +832,6 @@ public class TestWALProcedureStore { assertEquals(0, loader.getCorruptedCount()); } - private void assertEmptyLogDir() { - try { - FileStatus[] status = fs.listStatus(logDir); - assertTrue("expected empty state-log dir", status == null || status.length == 0); - } catch (FileNotFoundException e) { - fail("expected the state-log dir to be present: " + logDir); - } catch (IOException e) { - fail("got en exception on state-log dir list: " + e.getMessage()); - } - } - public static class TestSequentialProcedure extends SequentialProcedure { private static long seqid = 0; @@ -961,13 +840,18 @@ public class TestWALProcedureStore { } @Override - protected Procedure[] execute(Void env) { return null; } + protected Procedure[] execute(Void env) { + return null; + } @Override - protected void rollback(Void env) { } + protected void rollback(Void env) { + } @Override - protected boolean abort(Void env) { return false; } + protected boolean abort(Void env) { + return false; + } @Override protected void serializeStateData(ProcedureStateSerializer serializer) diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java new file mode 100644 index 00000000000..890d0e3e9c0 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2.store.wal; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +@Category({ MasterTests.class, SmallTests.class }) +public class TestWALProcedureTree { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + 
HBaseClassTestRule.forClass(TestWALProcedureTree.class); + + public static final class TestProcedure extends Procedure { + + @Override + public void setProcId(long procId) { + super.setProcId(procId); + } + + @Override + public void setParentProcId(long parentProcId) { + super.setParentProcId(parentProcId); + } + + @Override + public synchronized void addStackIndex(int index) { + super.addStackIndex(index); + } + + @Override + protected Procedure[] execute(Void env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + return null; + } + + @Override + protected void rollback(Void env) throws IOException, InterruptedException { + } + + @Override + protected boolean abort(Void env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + } + + private TestProcedure createProc(long procId, long parentProcId) { + TestProcedure proc = new TestProcedure(); + proc.setProcId(procId); + if (parentProcId != Procedure.NO_PROC_ID) { + proc.setParentProcId(parentProcId); + } + return proc; + } + + private List toProtos(TestProcedure... 
procs) { + return Arrays.stream(procs).map(p -> { + try { + return ProcedureUtil.convertToProtoProcedure(p); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).collect(Collectors.toList()); + } + + private List getProcs(ProcedureIterator iter) throws IOException { + List procs = new ArrayList<>(); + while (iter.hasNext()) { + procs.add((TestProcedure) iter.next()); + } + return procs; + } + + @Test + public void testMissingStackId() throws IOException { + TestProcedure proc0 = createProc(1, Procedure.NO_PROC_ID); + proc0.addStackIndex(0); + TestProcedure proc1 = createProc(2, 1); + proc1.addStackIndex(1); + TestProcedure proc2 = createProc(3, 2); + proc2.addStackIndex(3); + WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2)); + List validProcs = getProcs(tree.getValidProcs()); + assertEquals(0, validProcs.size()); + List corruptedProcs = getProcs(tree.getCorruptedProcs()); + assertEquals(3, corruptedProcs.size()); + assertEquals(1, corruptedProcs.get(0).getProcId()); + assertEquals(2, corruptedProcs.get(1).getProcId()); + assertEquals(3, corruptedProcs.get(2).getProcId()); + } + + @Test + public void testDuplicatedStackId() throws IOException { + TestProcedure proc0 = createProc(1, Procedure.NO_PROC_ID); + proc0.addStackIndex(0); + TestProcedure proc1 = createProc(2, 1); + proc1.addStackIndex(1); + TestProcedure proc2 = createProc(3, 2); + proc2.addStackIndex(1); + WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2)); + List validProcs = getProcs(tree.getValidProcs()); + assertEquals(0, validProcs.size()); + List corruptedProcs = getProcs(tree.getCorruptedProcs()); + assertEquals(3, corruptedProcs.size()); + assertEquals(1, corruptedProcs.get(0).getProcId()); + assertEquals(2, corruptedProcs.get(1).getProcId()); + assertEquals(3, corruptedProcs.get(2).getProcId()); + } + + @Test + public void testOrphan() throws IOException { + TestProcedure proc0 = createProc(1, Procedure.NO_PROC_ID); + 
proc0.addStackIndex(0); + TestProcedure proc1 = createProc(2, 1); + proc1.addStackIndex(1); + TestProcedure proc2 = createProc(3, Procedure.NO_PROC_ID); + proc2.addStackIndex(0); + TestProcedure proc3 = createProc(5, 4); + proc3.addStackIndex(1); + WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2, proc3)); + List validProcs = getProcs(tree.getValidProcs()); + assertEquals(3, validProcs.size()); + List corruptedProcs = getProcs(tree.getCorruptedProcs()); + assertEquals(1, corruptedProcs.size()); + assertEquals(5, corruptedProcs.get(0).getProcId()); + assertEquals(4, corruptedProcs.get(0).getParentProcId()); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index e2e4aec00b6..ab9a7992ed5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; * avoiding port contention if another local HBase instance is already running). *

To preserve test data directories, pass the system property "hbase.testing.preserve.testdir" * setting it to true. + * Trigger pre commit. */ @InterfaceAudience.Public @SuppressWarnings("deprecation")