diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index ee61841d025..b401871a50c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -589,6 +589,11 @@ public abstract class Procedure implements Comparable { return --childrenLatch == 0; } + @InterfaceAudience.Private + protected synchronized boolean hasChildren() { + return childrenLatch > 0; + } + /** * Called by the RootProcedureState on procedure execution. * Each procedure store its stack-index positions. @@ -606,7 +611,7 @@ public abstract class Procedure implements Comparable { @InterfaceAudience.Private protected synchronized boolean removeStackIndex() { - if (stackIndexes.length > 1) { + if (stackIndexes != null && stackIndexes.length > 1) { stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1); return false; } else { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 9d71f6561af..198623d8cb6 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -18,16 +18,18 @@ package org.apache.hadoop.hbase.procedure2; +import com.google.common.base.Preconditions; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.HashSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -55,8 +57,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.Pair; -import com.google.common.base.Preconditions; - /** * Thread Pool that executes the submitted procedures. * The executor has a ProcedureStore associated. @@ -314,7 +314,7 @@ public class ProcedureExecutor { corruptedCount++; } if (abortOnCorruption && corruptedCount > 0) { - throw new IOException("found " + corruptedCount + " procedures on replay"); + throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay"); } } }); @@ -388,10 +388,10 @@ public class ProcedureExecutor { continue; } - if (proc.hasParent() && !proc.isFinished()) { + if (proc.hasParent()) { Procedure parent = procedures.get(proc.getParentProcId()); // corrupted procedures are handled later at step 3 - if (parent != null) { + if (parent != null && !proc.isFinished()) { parent.incChildrenLatch(); } } @@ -403,6 +403,11 @@ public class ProcedureExecutor { case RUNNABLE: runnableList.add(proc); break; + case WAITING: + if (!proc.hasChildren()) { + runnableList.add(proc); + } + break; case WAITING_TIMEOUT: if (waitingSet == null) { waitingSet = new HashSet(); @@ -413,8 +418,8 @@ public class ProcedureExecutor { if (proc.hasException()) { // add the proc to the runnables to perform the rollback runnables.addBack(proc); - break; } + break; case ROLLEDBACK: case INITIALIZING: String msg = "Unexpected " + proc.getState() + " state for " + proc; @@ -433,7 +438,7 @@ public class ProcedureExecutor { RootProcedureState procStack = entry.getValue(); if (procStack.isValid()) continue; - for (Procedure proc: procStack.getSubprocedures()) { + for (Procedure proc: procStack.getSubproceduresStack()) { LOG.error("corrupted procedure: " + proc); procedures.remove(proc.getProcId()); runnableList.remove(proc); @@ -940,7 +945,7 @@ public class ProcedureExecutor { store.update(rootProc); } - List subprocStack = procStack.getSubprocedures(); + List subprocStack = procStack.getSubproceduresStack(); assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; int stackTail = subprocStack.size(); @@ -1022,7 +1027,12 @@ public class ProcedureExecutor { store.delete(proc.getProcId()); procedures.remove(proc.getProcId()); } else { - store.update(proc); + final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds(); + if (childProcIds != null) { + store.delete(proc, childProcIds); + } else { + store.update(proc); + } } } else { store.update(proc); @@ -1102,6 +1112,7 @@ public class ProcedureExecutor { assert subproc.getState() == ProcedureState.INITIALIZING : subproc; subproc.setParentProcId(procedure.getProcId()); subproc.setProcId(nextProcId()); + procStack.addSubProcedure(subproc); } if (!procedure.isFailed()) { @@ -1138,17 +1149,7 @@ public class ProcedureExecutor { } // Commit the transaction - if (subprocs != null && !procedure.isFailed()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs)); - } - store.insert(procedure, subprocs); - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("Store update " + procedure); - } - store.update(procedure); - } + updateStoreOnExec(procStack, procedure, subprocs); // if the store is not running we are aborting if (!store.isRunning()) { @@ -1198,6 +1199,34 @@ public class ProcedureExecutor { } } + private void updateStoreOnExec(final RootProcedureState procStack, + final Procedure procedure, final Procedure[] subprocs) { + if (subprocs != null && !procedure.isFailed()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs)); + } + store.insert(procedure, subprocs); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Store update " + procedure); + } + if (procedure.isFinished() && !procedure.hasParent()) { + // remove child procedures + final long[] childProcIds = procStack.getSubprocedureIds(); + if (childProcIds != null) { + store.delete(procedure, childProcIds); + for (int i = 0; i < childProcIds.length; ++i) { + procedures.remove(childProcIds[i]); + } + } else { + store.update(procedure); + } + } else { + store.update(procedure); + } + } + } + private void handleInterruptedException(final Procedure proc, final InterruptedException e) { if (LOG.isTraceEnabled()) { LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java index bc1af207b0b..b679cb1c771 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.procedure2; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,7 +51,8 @@ class RootProcedureState { ROLLINGBACK, // The Procedure failed and the execution was rolledback } - private ArrayList subprocedures = null; + private Set subprocs = null; + private ArrayList subprocStack = null; private State state = State.RUNNING; private int running = 0; @@ -87,13 +90,23 @@ class RootProcedureState { state = State.FAILED; } - protected synchronized List getSubprocedures() { - return subprocedures; + protected synchronized long[] getSubprocedureIds() { + if (subprocs == null) return null; + int index = 0; + final long[] subIds = new long[subprocs.size()]; + for (Procedure proc: subprocs) { + subIds[index++] = proc.getProcId(); + } + return subIds; + } + + protected synchronized List getSubproceduresStack() { + return subprocStack; } protected synchronized RemoteProcedureException getException() { - if (subprocedures != null) { - for (Procedure proc: subprocedures) { + if (subprocStack != null) { + for (Procedure proc: subprocStack) { if (proc.hasException()) { return proc.getException(); } @@ -133,11 +146,19 @@ class RootProcedureState { if (proc.isFailed()) { state = State.FAILED; } - if (subprocedures == null) { - subprocedures = new ArrayList(); + if (subprocStack == null) { + subprocStack = new ArrayList(); } - proc.addStackIndex(subprocedures.size()); - subprocedures.add(proc); + proc.addStackIndex(subprocStack.size()); + subprocStack.add(proc); + } + + protected synchronized void addSubProcedure(final Procedure proc) { + if (!proc.hasParent()) return; + if (subprocs == null) { + subprocs = new HashSet(); + } + subprocs.add(proc); } /** @@ -148,18 +169,19 @@ class RootProcedureState { * on load we recreate the full stack by aggregating each procedure stack-positions. */ protected synchronized void loadStack(final Procedure proc) { + addSubProcedure(proc); int[] stackIndexes = proc.getStackIndexes(); if (stackIndexes != null) { - if (subprocedures == null) { - subprocedures = new ArrayList(); + if (subprocStack == null) { + subprocStack = new ArrayList(); } - int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocedures.size(); + int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocStack.size(); if (diff > 0) { - subprocedures.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]); - while (diff-- > 0) subprocedures.add(null); + subprocStack.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]); + while (diff-- > 0) subprocStack.add(null); } for (int i = 0; i < stackIndexes.length; ++i) { - subprocedures.set(stackIndexes[i], proc); + subprocStack.set(stackIndexes[i], proc); } } if (proc.getState() == ProcedureState.ROLLEDBACK) { @@ -173,8 +195,8 @@ class RootProcedureState { * Called on store load by the ProcedureExecutor to validate the procedure stack. */ protected synchronized boolean isValid() { - if (subprocedures != null) { - for (Procedure proc: subprocedures) { + if (subprocStack != null) { + for (Procedure proc: subprocStack) { if (proc == null) { return false; } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java index 62448fb381b..c9808a1cce0 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -70,4 +70,9 @@ public class NoopProcedureStore extends ProcedureStoreBase { public void delete(long procId) { // no-op } + + @Override + public void delete(Procedure proc, long[] subprocs) { + // no-op + } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 5308c1bc4f8..11216d80dc3 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.procedure2.store; import java.io.IOException; +import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.procedure2.Procedure; /** @@ -188,4 +188,12 @@ public interface ProcedureStore { * @param procId the ID of the procedure to remove. */ void delete(long procId); + + /** + * The parent procedure completed. + * Update the state and mark all the child deleted. + * @param parentProc the parent procedure to serialize and write to the store. + * @param subProcIds the IDs of the sub-procedure to remove. + */ + void delete(Procedure parentProc, long[] subProcIds); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index fe2904b3dd8..f152ecfb207 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.procedure2.store; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; @@ -394,6 +395,14 @@ public class ProcedureStoreTracker { trackProcIds(procId); } + public void delete(long[] procIds) { + // TODO: optimize + Arrays.sort(procIds); + for (int i = 0; i < procIds.length; ++i) { + delete(procIds[i]); + } + } + private void trackProcIds(long procId) { minUpdatedProcId = Math.min(minUpdatedProcId, procId); maxUpdatedProcId = Math.max(maxUpdatedProcId, procId); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java index add7d03cd9d..1a44871b167 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.procedure2.store.wal; +import com.google.protobuf.InvalidProtocolBufferException; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -36,8 +38,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEn import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; -import com.google.protobuf.InvalidProtocolBufferException; - /** * Helper class that contains the WAL serialization utils. */ @@ -231,4 +231,18 @@ public final class ProcedureWALFormat { builder.setProcId(procId); builder.build().writeDelimitedTo(slot); } + + public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs) + throws IOException { + ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); + builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); + builder.setProcId(proc.getProcId()); + if (subprocs != null) { + builder.addProcedure(Procedure.convert(proc)); + for (int i = 0; i < subprocs.length; ++i) { + builder.addChildId(subprocs[i]); + } + } + builder.build().writeDelimitedTo(slot); + } } \ No newline at end of file diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index b1b201bd292..8678c864963 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.procedure2.store.wal; +import com.google.protobuf.InvalidProtocolBufferException; + import java.io.IOException; import org.apache.commons.logging.Log; @@ -33,8 +35,6 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry; -import com.google.protobuf.InvalidProtocolBufferException; - /** * Helper class that loads the procedures stored in a WAL */ @@ -209,15 +209,34 @@ public class ProcedureWALFormatReader { } private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException { - assert entry.getProcedureCount() == 0 : "Expected no procedures"; assert entry.hasProcId() : "expected ProcID"; - if (LOG.isTraceEnabled()) { - LOG.trace("read delete entry " + entry.getProcId()); + + if (entry.getChildIdCount() > 0) { + assert entry.getProcedureCount() == 1 : "Expected only one procedure"; + + // update the parent procedure + loadProcedure(entry, entry.getProcedure(0)); + + // remove the child procedures of entry.getProcId() + for (int i = 0, count = entry.getChildIdCount(); i < count; ++i) { + deleteEntry(entry.getChildId(i)); + } + } else { + assert entry.getProcedureCount() == 0 : "Expected no procedures"; + + // delete the procedure + deleteEntry(entry.getProcId()); } - maxProcId = Math.max(maxProcId, entry.getProcId()); - localProcedureMap.remove(entry.getProcId()); - assert !procedureMap.contains(entry.getProcId()); - tracker.setDeleted(entry.getProcId(), true); + } + + private void deleteEntry(final long procId) { + if (LOG.isTraceEnabled()) { + LOG.trace("delete entry " + procId); + } + maxProcId = Math.max(maxProcId, procId); + localProcedureMap.remove(procId); + assert !procedureMap.contains(procId); + tracker.setDeleted(procId, true); } private boolean isDeleted(final long procId) { @@ -269,6 +288,8 @@ public class ProcedureWALFormatReader { public boolean isCompleted() { if (!hasParent()) { + // we only consider 'root' procedures. because for the user 'completed' + // means when everything up to the 'root' is complete. switch (proto.getState()) { case ROLLEDBACK: return true; @@ -294,7 +315,15 @@ public class ProcedureWALFormatReader { @Override public String toString() { - return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")"; + final StringBuilder sb = new StringBuilder(); + sb.append("Entry("); + sb.append(getProcId()); + sb.append(", parentId="); + sb.append(getParentId()); + sb.append(", class="); + sb.append(proto.getClassName()); + sb.append(")"); + return sb.toString(); } } @@ -603,6 +632,22 @@ public class ProcedureWALFormatReader { * There is a gap between A stackIds so something was executed in between. */ private boolean checkReadyToRun(Entry rootEntry) { + assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry; + + if (rootEntry.isCompleted()) { + // if the root procedure is completed, sub-procedures should be gone + if (rootEntry.childHead != null) { + LOG.error("unexpected active children for root-procedure: " + rootEntry); + for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { + LOG.error("unexpected active children: " + p); + } + } + + assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry; + rootEntry.ready = true; + return true; + } + int stackIdSum = 0; int maxStackId = 0; for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 560072f1dbc..f06270a17b8 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.procedure2.store.wal; +import com.google.common.annotations.VisibleForTesting; + import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -57,8 +59,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHe import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; -import com.google.common.annotations.VisibleForTesting; - /** * WAL implementation of the ProcedureStore. */ @@ -461,6 +461,29 @@ public class WALProcedureStore extends ProcedureStoreBase { } } + @Override + public void delete(final Procedure proc, final long[] subProcIds) { + if (LOG.isTraceEnabled()) { + LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds)); + } + + ByteSlot slot = acquireSlot(); + try { + // Serialize the delete + ProcedureWALFormat.writeDelete(slot, proc, subProcIds); + + // Push the transaction data and wait until it is persisted + pushData(PushType.DELETE, slot, proc.getProcId(), subProcIds); + } catch (IOException e) { + // We are not able to serialize the procedure. + // this is a code error, and we are not able to go on. + LOG.fatal("Unable to serialize the procedure: " + proc, e); + throw new RuntimeException(e); + } finally { + releaseSlot(slot); + } + } + private ByteSlot acquireSlot() { ByteSlot slot = slotsCache.poll(); return slot != null ? slot : new ByteSlot(); @@ -544,7 +567,11 @@ public class WALProcedureStore extends ProcedureStoreBase { storeTracker.update(procId); break; case DELETE: - storeTracker.delete(procId); + if (subProcIds != null && subProcIds.length > 0) { + storeTracker.delete(subProcIds); + } else { + storeTracker.delete(procId); + } break; default: throw new RuntimeException("invalid push type " + type); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 03fa5167085..cafe1417ed0 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage; +import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -232,6 +233,10 @@ public class ProcedureTestingUtility { addStackIndex(index); } + public void setFinishedState() { + setState(ProcedureState.FINISHED); + } + @Override protected Procedure[] execute(Void env) { return null; } @@ -250,7 +255,8 @@ public class ProcedureTestingUtility { public static class LoadCounter implements ProcedureStore.ProcedureLoader { private final ArrayList corrupted = new ArrayList(); - private final ArrayList loaded = new ArrayList(); + private final ArrayList completed = new ArrayList(); + private final ArrayList runnable = new ArrayList(); private Set procIds; private long maxProcId = 0; @@ -269,7 +275,8 @@ public class ProcedureTestingUtility { public void reset(final Set procIds) { corrupted.clear(); - loaded.clear(); + completed.clear(); + runnable.clear(); this.procIds = procIds; this.maxProcId = 0; } @@ -278,12 +285,24 @@ public class ProcedureTestingUtility { return maxProcId; } - public ArrayList getLoaded() { - return loaded; + public ArrayList getRunnables() { + return runnable; + } + + public int getRunnableCount() { + return runnable.size(); + } + + public ArrayList getCompleted() { + return completed; + } + + public int getCompletedCount() { + return completed.size(); } public int getLoadedCount() { - return loaded.size(); + return runnable.size() + completed.size(); } public ArrayList getCorrupted() { @@ -302,13 +321,21 @@ public class ProcedureTestingUtility { @Override public void load(ProcedureIterator procIter) throws IOException { while (procIter.hasNext()) { - Procedure proc = procIter.nextAsProcedure(); - LOG.debug("loading procId=" + proc.getProcId() + ": " + proc); - if (procIds != null) { - assertTrue("procId=" + proc.getProcId() + " unexpected", - procIds.contains(proc.getProcId())); + long procId; + if (procIter.isNextCompleted()) { + ProcedureInfo proc = procIter.nextAsProcedureInfo(); + procId = proc.getProcId(); + LOG.debug("loading completed procId=" + procId + ": " + proc); + completed.add(proc); + } else { + Procedure proc = procIter.nextAsProcedure(); + procId = proc.getProcId(); + LOG.debug("loading runnable procId=" + procId + ": " + proc); + runnable.add(proc); + } + if (procIds != null) { + assertTrue("procId=" + procId + " unexpected", procIds.contains(procId)); } - loaded.add(proc); } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java new file mode 100644 index 00000000000..4dd280beb46 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.ProcedureInfo; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, SmallTests.class}) +public class TestChildProcedures { + private static final Log LOG = LogFactory.getLog(TestChildProcedures.class); + + private static final int PROCEDURE_EXECUTOR_SLOTS = 1; + + private static TestProcEnv procEnv; + private static ProcedureExecutor procExecutor; + private static ProcedureStore procStore; + private static int procSleepInterval; + + private HBaseCommonTestingUtility htu; + private FileSystem fs; + private Path testDir; + private Path logDir; + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + testDir = htu.getDataTestDir(); + fs = testDir.getFileSystem(htu.getConfiguration()); + assertTrue(testDir.depth() > 1); + + logDir = new Path(testDir, "proc-logs"); + procEnv = new TestProcEnv(); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor.testing = new ProcedureExecutor.Testing(); + procStore.start(PROCEDURE_EXECUTOR_SLOTS); + procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + procSleepInterval = 0; + } + + @After + public void tearDown() throws IOException { + procExecutor.stop(); + procStore.stop(false); + fs.delete(logDir, true); + } + + @Test + public void testChildLoad() throws Exception { + procEnv.toggleKillBeforeStoreUpdate = false; + + TestRootProcedure proc = new TestRootProcedure(); + long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + ProcedureTestingUtility.restart(procExecutor); + assertTrue("expected completed proc", procExecutor.isFinished(procId)); + ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId); + } + + @Test + public void testChildLoadWithSteppedRestart() throws Exception { + procEnv.toggleKillBeforeStoreUpdate = true; + + TestRootProcedure proc = new TestRootProcedure(); + long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + int restartCount = 0; + while (!procExecutor.isFinished(procId)) { + ProcedureTestingUtility.restart(procExecutor); + restartCount++; + } + assertEquals(7, restartCount); + assertTrue("expected completed proc", procExecutor.isFinished(procId)); + ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId); + } + + @Test + public void testChildRollbackLoad() throws Exception { + procEnv.toggleKillBeforeStoreUpdate = false; + procEnv.triggerRollbackOnChild = true; + + TestRootProcedure proc = new TestRootProcedure(); + long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + ProcedureTestingUtility.restart(procExecutor); + + assertProcFailed(procId); + } + + @Test + public void testChildRollbackLoadWithSteppedRestart() throws Exception { + procEnv.toggleKillBeforeStoreUpdate = true; + procEnv.triggerRollbackOnChild = true; + + TestRootProcedure proc = new TestRootProcedure(); + long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + int restartCount = 0; + while (!procExecutor.isFinished(procId)) { + ProcedureTestingUtility.restart(procExecutor); + restartCount++; + } + assertEquals(6, restartCount); + assertProcFailed(procId); + } + + private void assertProcFailed(long procId) { + assertTrue("expected completed proc", procExecutor.isFinished(procId)); + ProcedureInfo result = procExecutor.getResult(procId); + assertEquals(true, result.isFailed()); + LOG.info(result.getExceptionFullMessage()); + } + + public static class TestRootProcedure extends SequentialProcedure { + public TestRootProcedure() {} + + public Procedure[] execute(TestProcEnv env) { + if (env.toggleKillBeforeStoreUpdate) { + ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); + } + return new Procedure[] { new TestChildProcedure(), new TestChildProcedure() }; + } + + public void rollback(TestProcEnv env) { + } + + @Override + public boolean abort(TestProcEnv env) { + return false; + } + } + + public static class TestChildProcedure extends SequentialProcedure { + public TestChildProcedure() {} + + public Procedure[] execute(TestProcEnv env) { + if (env.toggleKillBeforeStoreUpdate) { + ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); + } + if (env.triggerRollbackOnChild) { + setFailure("test", new Exception("test")); + } + return null; + } + + public void rollback(TestProcEnv env) { + } + + @Override + public boolean abort(TestProcEnv env) { + return false; + } + } + + private static class TestProcEnv { + public boolean toggleKillBeforeStoreUpdate = false; + public boolean triggerRollbackOnChild = false; + } +} diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java index 023354c0cff..cc63e2eb89a 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java @@ -106,7 +106,9 @@ public class TestStressWALProcedureStore { Random rand = new Random(); TestProcedure proc; do { - proc = new TestProcedure(procCounter.addAndGet(1)); + // After HBASE- there may be gap in the procId sequence, trying to simulate that. + long procId = procCounter.addAndGet(1 + rand.nextInt(3)); + proc = new TestProcedure(procId); // Insert procStore.insert(proc, null); // Update diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 8a244ddc78c..143a5733fc5 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; @@ -506,10 +507,56 @@ public class TestWALProcedureStore { procStore.recoverLease(); procStore.load(loader); assertEquals(procs.length, loader.getMaxProcId()); - assertEquals(procs.length - 1, loader.getLoadedCount()); + assertEquals(procs.length - 1, loader.getRunnableCount()); + assertEquals(0, loader.getCompletedCount()); assertEquals(0, loader.getCorruptedCount()); } + public void testLoadChildren() throws Exception { + TestProcedure a = new TestProcedure(1, 0); + TestProcedure b = new TestProcedure(2, 1); + TestProcedure c = new TestProcedure(3, 1); + + // INIT + procStore.insert(a, null); + + // Run A first step + a.addStackId(0); + procStore.update(a); + + // Run A second step + a.addStackId(1); + procStore.insert(a, new Procedure[] { b, c }); + + // Run B first step + b.addStackId(2); + procStore.update(b); + + // Run C first and last step + c.addStackId(3); + procStore.update(c); + + // Run B second setp + b.addStackId(4); + procStore.update(b); + + // back to A + a.addStackId(5); + a.setFinishedState(); + procStore.delete(a, new long[] { b.getProcId(), c.getProcId() }); + restartAndAssert(3, 0, 1, 0); + } + + private void restartAndAssert(long maxProcId, long runnableCount, + int completedCount, int corruptedCount) throws Exception { + final LoadCounter loader = new LoadCounter(); + storeRestart(loader); + assertEquals(maxProcId, loader.getMaxProcId()); + assertEquals(runnableCount, loader.getRunnableCount()); + assertEquals(completedCount, loader.getCompletedCount()); + assertEquals(corruptedCount, loader.getCorruptedCount()); + } + private void corruptLog(final FileStatus logFile, final long dropBytes) throws IOException { assertTrue(logFile.getLen() > dropBytes); diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ProcedureProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ProcedureProtos.java index 9b368a22dd4..2aa71365d8a 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ProcedureProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ProcedureProtos.java @@ -6295,6 +6295,20 @@ public final class ProcedureProtos { * optional uint64 proc_id = 3; */ long getProcId(); + + // repeated uint64 child_id = 4; + /** + * repeated uint64 child_id = 4; + */ + java.util.List getChildIdList(); + /** + * repeated uint64 child_id = 4; + */ + int getChildIdCount(); + /** + * repeated uint64 child_id = 4; + */ + long getChildId(int index); } /** * Protobuf type {@code hbase.pb.ProcedureWALEntry} @@ -6371,6 +6385,27 @@ public final class ProcedureProtos { procId_ = input.readUInt64(); break; } + case 32: { + if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) { + childId_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000008; + } + childId_.add(input.readUInt64()); + break; + } + case 34: { + int length = input.readRawVarint32(); + int limit = input.pushLimit(length); + if (!((mutable_bitField0_ & 0x00000008) == 0x00000008) && input.getBytesUntilLimit() > 0) { + childId_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000008; + } + while (input.getBytesUntilLimit() > 0) { + childId_.add(input.readUInt64()); + } + input.popLimit(limit); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -6382,6 +6417,9 @@ public final class ProcedureProtos { if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) { procedure_ = java.util.Collections.unmodifiableList(procedure_); } + if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) { + childId_ = java.util.Collections.unmodifiableList(childId_); + } this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } @@ -6600,10 +6638,34 @@ public final class ProcedureProtos { return procId_; } + // repeated uint64 child_id = 4; + public static final int CHILD_ID_FIELD_NUMBER = 4; + private java.util.List childId_; + /** + * repeated uint64 child_id = 4; + */ + public java.util.List + getChildIdList() { + return childId_; + } + /** + * repeated uint64 child_id = 4; + */ + public int getChildIdCount() { + return childId_.size(); + } + /** + * repeated uint64 child_id = 4; + */ + public long getChildId(int index) { + return childId_.get(index); + } + private void initFields() { type_ = org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry.Type.PROCEDURE_WAL_EOF; procedure_ = java.util.Collections.emptyList(); procId_ = 0L; + childId_ = java.util.Collections.emptyList(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -6636,6 +6698,9 @@ public final class ProcedureProtos { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeUInt64(3, procId_); } + for (int i = 0; i < childId_.size(); i++) { + output.writeUInt64(4, childId_.get(i)); + } getUnknownFields().writeTo(output); } @@ -6657,6 +6722,15 @@ public final class ProcedureProtos { size += com.google.protobuf.CodedOutputStream .computeUInt64Size(3, procId_); } + { + int dataSize = 0; + for (int i = 0; i < childId_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeUInt64SizeNoTag(childId_.get(i)); + } + size += dataSize; + size += 1 * getChildIdList().size(); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -6692,6 +6766,8 @@ public final class ProcedureProtos { result = result && (getProcId() == other.getProcId()); } + result = result && getChildIdList() + .equals(other.getChildIdList()); result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -6717,6 +6793,10 @@ public final class ProcedureProtos { hash = (37 * hash) + PROC_ID_FIELD_NUMBER; hash = (53 * hash) + hashLong(getProcId()); } + if (getChildIdCount() > 0) { + hash = (37 * hash) + CHILD_ID_FIELD_NUMBER; + hash = (53 * hash) + getChildIdList().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -6837,6 +6917,8 @@ public final class ProcedureProtos { } procId_ = 0L; bitField0_ = (bitField0_ & ~0x00000004); + childId_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -6882,6 +6964,11 @@ public final class ProcedureProtos { to_bitField0_ |= 0x00000002; } result.procId_ = procId_; + if (((bitField0_ & 0x00000008) == 0x00000008)) { + childId_ = java.util.Collections.unmodifiableList(childId_); + bitField0_ = (bitField0_ & ~0x00000008); + } + result.childId_ = childId_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -6930,6 +7017,16 @@ public final class ProcedureProtos { if (other.hasProcId()) { setProcId(other.getProcId()); } + if (!other.childId_.isEmpty()) { + if (childId_.isEmpty()) { + childId_ = other.childId_; + bitField0_ = (bitField0_ & ~0x00000008); + } else { + ensureChildIdIsMutable(); + childId_.addAll(other.childId_); + } + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -7276,6 +7373,72 @@ public final class ProcedureProtos { return this; } + // repeated uint64 child_id = 4; + private java.util.List childId_ = java.util.Collections.emptyList(); + private void ensureChildIdIsMutable() { + if (!((bitField0_ & 0x00000008) == 0x00000008)) { + childId_ = new java.util.ArrayList(childId_); + bitField0_ |= 0x00000008; + } + } + /** + * repeated uint64 child_id = 4; + */ + public java.util.List + getChildIdList() { + return java.util.Collections.unmodifiableList(childId_); + } + /** + * repeated uint64 child_id = 4; + */ + public int getChildIdCount() { + return childId_.size(); + } + /** + * repeated uint64 child_id = 4; + */ + public long getChildId(int index) { + return childId_.get(index); + } + /** + * repeated uint64 child_id = 4; + */ + public Builder setChildId( + int index, long value) { + ensureChildIdIsMutable(); + childId_.set(index, value); + onChanged(); + return this; + } + /** + * repeated uint64 child_id = 4; + */ + public Builder addChildId(long value) { + ensureChildIdIsMutable(); + childId_.add(value); + onChanged(); + return this; + } + /** + * repeated uint64 child_id = 4; + */ + public Builder addAllChildId( + java.lang.Iterable values) { + ensureChildIdIsMutable(); + super.addAll(values, childId_); + onChanged(); + return this; + } + /** + * repeated uint64 child_id = 4; + */ + public Builder clearChildId() { + childId_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000008); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.ProcedureWALEntry) } @@ -7355,19 +7518,19 @@ public final class ProcedureProtos { "eTracker\0229\n\004node\030\001 \003(\0132+.hbase.pb.Proced" + "ureStoreTracker.TrackerNode\032A\n\013TrackerNo" + "de\022\020\n\010start_id\030\001 \002(\004\022\017\n\007updated\030\002 \003(\004\022\017\n" + - "\007deleted\030\003 \003(\004\"\235\002\n\021ProcedureWALEntry\022.\n\004", + "\007deleted\030\003 \003(\004\"\257\002\n\021ProcedureWALEntry\022.\n\004", "type\030\001 \002(\0162 .hbase.pb.ProcedureWALEntry." + "Type\022&\n\tprocedure\030\002 \003(\0132\023.hbase.pb.Proce" + - "dure\022\017\n\007proc_id\030\003 \001(\004\"\236\001\n\004Type\022\025\n\021PROCED" + - "URE_WAL_EOF\020\001\022\026\n\022PROCEDURE_WAL_INIT\020\002\022\030\n" + - "\024PROCEDURE_WAL_INSERT\020\003\022\030\n\024PROCEDURE_WAL" + - "_UPDATE\020\004\022\030\n\024PROCEDURE_WAL_DELETE\020\005\022\031\n\025P" + - "ROCEDURE_WAL_COMPACT\020\006*p\n\016ProcedureState" + - "\022\020\n\014INITIALIZING\020\001\022\014\n\010RUNNABLE\020\002\022\013\n\007WAIT" + - "ING\020\003\022\023\n\017WAITING_TIMEOUT\020\004\022\016\n\nROLLEDBACK" + - "\020\005\022\014\n\010FINISHED\020\006BE\n*org.apache.hadoop.hb", - "ase.protobuf.generatedB\017ProcedureProtosH" + - "\001\210\001\001\240\001\001" + "dure\022\017\n\007proc_id\030\003 \001(\004\022\020\n\010child_id\030\004 \003(\004\"" + + "\236\001\n\004Type\022\025\n\021PROCEDURE_WAL_EOF\020\001\022\026\n\022PROCE" + + "DURE_WAL_INIT\020\002\022\030\n\024PROCEDURE_WAL_INSERT\020" + + "\003\022\030\n\024PROCEDURE_WAL_UPDATE\020\004\022\030\n\024PROCEDURE" + + "_WAL_DELETE\020\005\022\031\n\025PROCEDURE_WAL_COMPACT\020\006" + + "*p\n\016ProcedureState\022\020\n\014INITIALIZING\020\001\022\014\n\010" + + "RUNNABLE\020\002\022\013\n\007WAITING\020\003\022\023\n\017WAITING_TIMEO" + + "UT\020\004\022\016\n\nROLLEDBACK\020\005\022\014\n\010FINISHED\020\006BE\n*or", + "g.apache.hadoop.hbase.protobuf.generated" + + "B\017ProcedureProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -7421,7 +7584,7 @@ public final class ProcedureProtos { internal_static_hbase_pb_ProcedureWALEntry_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ProcedureWALEntry_descriptor, - new java.lang.String[] { "Type", "Procedure", "ProcId", }); + new java.lang.String[] { "Type", "Procedure", "ProcId", "ChildId", }); return null; } }; diff --git a/hbase-protocol/src/main/protobuf/Procedure.proto b/hbase-protocol/src/main/protobuf/Procedure.proto index 55e44a494bb..ae1cf387858 100644 --- a/hbase-protocol/src/main/protobuf/Procedure.proto +++ b/hbase-protocol/src/main/protobuf/Procedure.proto @@ -116,4 +116,5 @@ message ProcedureWALEntry { required Type type = 1; repeated Procedure procedure = 2; optional uint64 proc_id = 3; + repeated uint64 child_id = 4; }