HBASE-16092 Procedure v2 - complete child procedure support
This commit is contained in:
parent
99dc300d37
commit
96c4054e3b
|
@ -589,6 +589,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
||||||
return --childrenLatch == 0;
|
return --childrenLatch == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
protected synchronized boolean hasChildren() {
|
||||||
|
return childrenLatch > 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called by the RootProcedureState on procedure execution.
|
* Called by the RootProcedureState on procedure execution.
|
||||||
* Each procedure store its stack-index positions.
|
* Each procedure store its stack-index positions.
|
||||||
|
@ -606,7 +611,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
protected synchronized boolean removeStackIndex() {
|
protected synchronized boolean removeStackIndex() {
|
||||||
if (stackIndexes.length > 1) {
|
if (stackIndexes != null && stackIndexes.length > 1) {
|
||||||
stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
|
stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -18,16 +18,18 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.procedure2;
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
@ -55,8 +57,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.NonceKey;
|
import org.apache.hadoop.hbase.util.NonceKey;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thread Pool that executes the submitted procedures.
|
* Thread Pool that executes the submitted procedures.
|
||||||
* The executor has a ProcedureStore associated.
|
* The executor has a ProcedureStore associated.
|
||||||
|
@ -314,7 +314,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
corruptedCount++;
|
corruptedCount++;
|
||||||
}
|
}
|
||||||
if (abortOnCorruption && corruptedCount > 0) {
|
if (abortOnCorruption && corruptedCount > 0) {
|
||||||
throw new IOException("found " + corruptedCount + " procedures on replay");
|
throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -388,10 +388,10 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (proc.hasParent() && !proc.isFinished()) {
|
if (proc.hasParent()) {
|
||||||
Procedure parent = procedures.get(proc.getParentProcId());
|
Procedure parent = procedures.get(proc.getParentProcId());
|
||||||
// corrupted procedures are handled later at step 3
|
// corrupted procedures are handled later at step 3
|
||||||
if (parent != null) {
|
if (parent != null && !proc.isFinished()) {
|
||||||
parent.incChildrenLatch();
|
parent.incChildrenLatch();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -403,6 +403,11 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
case RUNNABLE:
|
case RUNNABLE:
|
||||||
runnableList.add(proc);
|
runnableList.add(proc);
|
||||||
break;
|
break;
|
||||||
|
case WAITING:
|
||||||
|
if (!proc.hasChildren()) {
|
||||||
|
runnableList.add(proc);
|
||||||
|
}
|
||||||
|
break;
|
||||||
case WAITING_TIMEOUT:
|
case WAITING_TIMEOUT:
|
||||||
if (waitingSet == null) {
|
if (waitingSet == null) {
|
||||||
waitingSet = new HashSet<Procedure>();
|
waitingSet = new HashSet<Procedure>();
|
||||||
|
@ -413,8 +418,8 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
if (proc.hasException()) {
|
if (proc.hasException()) {
|
||||||
// add the proc to the runnables to perform the rollback
|
// add the proc to the runnables to perform the rollback
|
||||||
runnables.addBack(proc);
|
runnables.addBack(proc);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
case ROLLEDBACK:
|
case ROLLEDBACK:
|
||||||
case INITIALIZING:
|
case INITIALIZING:
|
||||||
String msg = "Unexpected " + proc.getState() + " state for " + proc;
|
String msg = "Unexpected " + proc.getState() + " state for " + proc;
|
||||||
|
@ -433,7 +438,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
RootProcedureState procStack = entry.getValue();
|
RootProcedureState procStack = entry.getValue();
|
||||||
if (procStack.isValid()) continue;
|
if (procStack.isValid()) continue;
|
||||||
|
|
||||||
for (Procedure proc: procStack.getSubprocedures()) {
|
for (Procedure proc: procStack.getSubproceduresStack()) {
|
||||||
LOG.error("corrupted procedure: " + proc);
|
LOG.error("corrupted procedure: " + proc);
|
||||||
procedures.remove(proc.getProcId());
|
procedures.remove(proc.getProcId());
|
||||||
runnableList.remove(proc);
|
runnableList.remove(proc);
|
||||||
|
@ -940,7 +945,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
store.update(rootProc);
|
store.update(rootProc);
|
||||||
}
|
}
|
||||||
|
|
||||||
List<Procedure> subprocStack = procStack.getSubprocedures();
|
List<Procedure> subprocStack = procStack.getSubproceduresStack();
|
||||||
assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
|
assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
|
||||||
|
|
||||||
int stackTail = subprocStack.size();
|
int stackTail = subprocStack.size();
|
||||||
|
@ -1021,9 +1026,14 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
if (proc.hasParent()) {
|
if (proc.hasParent()) {
|
||||||
store.delete(proc.getProcId());
|
store.delete(proc.getProcId());
|
||||||
procedures.remove(proc.getProcId());
|
procedures.remove(proc.getProcId());
|
||||||
|
} else {
|
||||||
|
final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds();
|
||||||
|
if (childProcIds != null) {
|
||||||
|
store.delete(proc, childProcIds);
|
||||||
} else {
|
} else {
|
||||||
store.update(proc);
|
store.update(proc);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
store.update(proc);
|
store.update(proc);
|
||||||
}
|
}
|
||||||
|
@ -1102,6 +1112,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
|
assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
|
||||||
subproc.setParentProcId(procedure.getProcId());
|
subproc.setParentProcId(procedure.getProcId());
|
||||||
subproc.setProcId(nextProcId());
|
subproc.setProcId(nextProcId());
|
||||||
|
procStack.addSubProcedure(subproc);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!procedure.isFailed()) {
|
if (!procedure.isFailed()) {
|
||||||
|
@ -1138,17 +1149,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit the transaction
|
// Commit the transaction
|
||||||
if (subprocs != null && !procedure.isFailed()) {
|
updateStoreOnExec(procStack, procedure, subprocs);
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
|
|
||||||
}
|
|
||||||
store.insert(procedure, subprocs);
|
|
||||||
} else {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Store update " + procedure);
|
|
||||||
}
|
|
||||||
store.update(procedure);
|
|
||||||
}
|
|
||||||
|
|
||||||
// if the store is not running we are aborting
|
// if the store is not running we are aborting
|
||||||
if (!store.isRunning()) {
|
if (!store.isRunning()) {
|
||||||
|
@ -1198,6 +1199,34 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void updateStoreOnExec(final RootProcedureState procStack,
|
||||||
|
final Procedure procedure, final Procedure[] subprocs) {
|
||||||
|
if (subprocs != null && !procedure.isFailed()) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
|
||||||
|
}
|
||||||
|
store.insert(procedure, subprocs);
|
||||||
|
} else {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Store update " + procedure);
|
||||||
|
}
|
||||||
|
if (procedure.isFinished() && !procedure.hasParent()) {
|
||||||
|
// remove child procedures
|
||||||
|
final long[] childProcIds = procStack.getSubprocedureIds();
|
||||||
|
if (childProcIds != null) {
|
||||||
|
store.delete(procedure, childProcIds);
|
||||||
|
for (int i = 0; i < childProcIds.length; ++i) {
|
||||||
|
procedures.remove(childProcIds[i]);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
store.update(procedure);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
store.update(procedure);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
|
private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e);
|
LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e);
|
||||||
|
|
|
@ -19,7 +19,9 @@
|
||||||
package org.apache.hadoop.hbase.procedure2;
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -49,7 +51,8 @@ class RootProcedureState {
|
||||||
ROLLINGBACK, // The Procedure failed and the execution was rolledback
|
ROLLINGBACK, // The Procedure failed and the execution was rolledback
|
||||||
}
|
}
|
||||||
|
|
||||||
private ArrayList<Procedure> subprocedures = null;
|
private Set<Procedure> subprocs = null;
|
||||||
|
private ArrayList<Procedure> subprocStack = null;
|
||||||
private State state = State.RUNNING;
|
private State state = State.RUNNING;
|
||||||
private int running = 0;
|
private int running = 0;
|
||||||
|
|
||||||
|
@ -87,13 +90,23 @@ class RootProcedureState {
|
||||||
state = State.FAILED;
|
state = State.FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized List<Procedure> getSubprocedures() {
|
protected synchronized long[] getSubprocedureIds() {
|
||||||
return subprocedures;
|
if (subprocs == null) return null;
|
||||||
|
int index = 0;
|
||||||
|
final long[] subIds = new long[subprocs.size()];
|
||||||
|
for (Procedure proc: subprocs) {
|
||||||
|
subIds[index++] = proc.getProcId();
|
||||||
|
}
|
||||||
|
return subIds;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected synchronized List<Procedure> getSubproceduresStack() {
|
||||||
|
return subprocStack;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized RemoteProcedureException getException() {
|
protected synchronized RemoteProcedureException getException() {
|
||||||
if (subprocedures != null) {
|
if (subprocStack != null) {
|
||||||
for (Procedure proc: subprocedures) {
|
for (Procedure proc: subprocStack) {
|
||||||
if (proc.hasException()) {
|
if (proc.hasException()) {
|
||||||
return proc.getException();
|
return proc.getException();
|
||||||
}
|
}
|
||||||
|
@ -133,11 +146,19 @@ class RootProcedureState {
|
||||||
if (proc.isFailed()) {
|
if (proc.isFailed()) {
|
||||||
state = State.FAILED;
|
state = State.FAILED;
|
||||||
}
|
}
|
||||||
if (subprocedures == null) {
|
if (subprocStack == null) {
|
||||||
subprocedures = new ArrayList<Procedure>();
|
subprocStack = new ArrayList<Procedure>();
|
||||||
}
|
}
|
||||||
proc.addStackIndex(subprocedures.size());
|
proc.addStackIndex(subprocStack.size());
|
||||||
subprocedures.add(proc);
|
subprocStack.add(proc);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected synchronized void addSubProcedure(final Procedure proc) {
|
||||||
|
if (!proc.hasParent()) return;
|
||||||
|
if (subprocs == null) {
|
||||||
|
subprocs = new HashSet<Procedure>();
|
||||||
|
}
|
||||||
|
subprocs.add(proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -148,18 +169,19 @@ class RootProcedureState {
|
||||||
* on load we recreate the full stack by aggregating each procedure stack-positions.
|
* on load we recreate the full stack by aggregating each procedure stack-positions.
|
||||||
*/
|
*/
|
||||||
protected synchronized void loadStack(final Procedure proc) {
|
protected synchronized void loadStack(final Procedure proc) {
|
||||||
|
addSubProcedure(proc);
|
||||||
int[] stackIndexes = proc.getStackIndexes();
|
int[] stackIndexes = proc.getStackIndexes();
|
||||||
if (stackIndexes != null) {
|
if (stackIndexes != null) {
|
||||||
if (subprocedures == null) {
|
if (subprocStack == null) {
|
||||||
subprocedures = new ArrayList<Procedure>();
|
subprocStack = new ArrayList<Procedure>();
|
||||||
}
|
}
|
||||||
int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocedures.size();
|
int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocStack.size();
|
||||||
if (diff > 0) {
|
if (diff > 0) {
|
||||||
subprocedures.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]);
|
subprocStack.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]);
|
||||||
while (diff-- > 0) subprocedures.add(null);
|
while (diff-- > 0) subprocStack.add(null);
|
||||||
}
|
}
|
||||||
for (int i = 0; i < stackIndexes.length; ++i) {
|
for (int i = 0; i < stackIndexes.length; ++i) {
|
||||||
subprocedures.set(stackIndexes[i], proc);
|
subprocStack.set(stackIndexes[i], proc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (proc.getState() == ProcedureState.ROLLEDBACK) {
|
if (proc.getState() == ProcedureState.ROLLEDBACK) {
|
||||||
|
@ -173,8 +195,8 @@ class RootProcedureState {
|
||||||
* Called on store load by the ProcedureExecutor to validate the procedure stack.
|
* Called on store load by the ProcedureExecutor to validate the procedure stack.
|
||||||
*/
|
*/
|
||||||
protected synchronized boolean isValid() {
|
protected synchronized boolean isValid() {
|
||||||
if (subprocedures != null) {
|
if (subprocStack != null) {
|
||||||
for (Procedure proc: subprocedures) {
|
for (Procedure proc: subprocStack) {
|
||||||
if (proc == null) {
|
if (proc == null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,4 +70,9 @@ public class NoopProcedureStore extends ProcedureStoreBase {
|
||||||
public void delete(long procId) {
|
public void delete(long procId) {
|
||||||
// no-op
|
// no-op
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void delete(Procedure proc, long[] subprocs) {
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.procedure2.store;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.ProcedureInfo;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -188,4 +188,12 @@ public interface ProcedureStore {
|
||||||
* @param procId the ID of the procedure to remove.
|
* @param procId the ID of the procedure to remove.
|
||||||
*/
|
*/
|
||||||
void delete(long procId);
|
void delete(long procId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The parent procedure completed.
|
||||||
|
* Update the state and mark all the child deleted.
|
||||||
|
* @param parentProc the parent procedure to serialize and write to the store.
|
||||||
|
* @param subProcIds the IDs of the sub-procedure to remove.
|
||||||
|
*/
|
||||||
|
void delete(Procedure parentProc, long[] subProcIds);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.procedure2.store;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
@ -394,6 +395,14 @@ public class ProcedureStoreTracker {
|
||||||
trackProcIds(procId);
|
trackProcIds(procId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void delete(long[] procIds) {
|
||||||
|
// TODO: optimize
|
||||||
|
Arrays.sort(procIds);
|
||||||
|
for (int i = 0; i < procIds.length; ++i) {
|
||||||
|
delete(procIds[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void trackProcIds(long procId) {
|
private void trackProcIds(long procId) {
|
||||||
minUpdatedProcId = Math.min(minUpdatedProcId, procId);
|
minUpdatedProcId = Math.min(minUpdatedProcId, procId);
|
||||||
maxUpdatedProcId = Math.max(maxUpdatedProcId, procId);
|
maxUpdatedProcId = Math.max(maxUpdatedProcId, procId);
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.procedure2.store.wal;
|
package org.apache.hadoop.hbase.procedure2.store.wal;
|
||||||
|
|
||||||
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
@ -36,8 +38,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEn
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
|
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
|
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
|
||||||
|
|
||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class that contains the WAL serialization utils.
|
* Helper class that contains the WAL serialization utils.
|
||||||
*/
|
*/
|
||||||
|
@ -231,4 +231,18 @@ public final class ProcedureWALFormat {
|
||||||
builder.setProcId(procId);
|
builder.setProcId(procId);
|
||||||
builder.build().writeDelimitedTo(slot);
|
builder.build().writeDelimitedTo(slot);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs)
|
||||||
|
throws IOException {
|
||||||
|
ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
|
||||||
|
builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
|
||||||
|
builder.setProcId(proc.getProcId());
|
||||||
|
if (subprocs != null) {
|
||||||
|
builder.addProcedure(Procedure.convert(proc));
|
||||||
|
for (int i = 0; i < subprocs.length; ++i) {
|
||||||
|
builder.addChildId(subprocs[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
builder.build().writeDelimitedTo(slot);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.procedure2.store.wal;
|
package org.apache.hadoop.hbase.procedure2.store.wal;
|
||||||
|
|
||||||
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -33,8 +35,6 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
|
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
|
||||||
|
|
||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class that loads the procedures stored in a WAL
|
* Helper class that loads the procedures stored in a WAL
|
||||||
*/
|
*/
|
||||||
|
@ -209,15 +209,34 @@ public class ProcedureWALFormatReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
|
private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
|
||||||
assert entry.getProcedureCount() == 0 : "Expected no procedures";
|
|
||||||
assert entry.hasProcId() : "expected ProcID";
|
assert entry.hasProcId() : "expected ProcID";
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("read delete entry " + entry.getProcId());
|
if (entry.getChildIdCount() > 0) {
|
||||||
|
assert entry.getProcedureCount() == 1 : "Expected only one procedure";
|
||||||
|
|
||||||
|
// update the parent procedure
|
||||||
|
loadProcedure(entry, entry.getProcedure(0));
|
||||||
|
|
||||||
|
// remove the child procedures of entry.getProcId()
|
||||||
|
for (int i = 0, count = entry.getChildIdCount(); i < count; ++i) {
|
||||||
|
deleteEntry(entry.getChildId(i));
|
||||||
}
|
}
|
||||||
maxProcId = Math.max(maxProcId, entry.getProcId());
|
} else {
|
||||||
localProcedureMap.remove(entry.getProcId());
|
assert entry.getProcedureCount() == 0 : "Expected no procedures";
|
||||||
assert !procedureMap.contains(entry.getProcId());
|
|
||||||
tracker.setDeleted(entry.getProcId(), true);
|
// delete the procedure
|
||||||
|
deleteEntry(entry.getProcId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteEntry(final long procId) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("delete entry " + procId);
|
||||||
|
}
|
||||||
|
maxProcId = Math.max(maxProcId, procId);
|
||||||
|
localProcedureMap.remove(procId);
|
||||||
|
assert !procedureMap.contains(procId);
|
||||||
|
tracker.setDeleted(procId, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isDeleted(final long procId) {
|
private boolean isDeleted(final long procId) {
|
||||||
|
@ -269,6 +288,8 @@ public class ProcedureWALFormatReader {
|
||||||
|
|
||||||
public boolean isCompleted() {
|
public boolean isCompleted() {
|
||||||
if (!hasParent()) {
|
if (!hasParent()) {
|
||||||
|
// we only consider 'root' procedures. because for the user 'completed'
|
||||||
|
// means when everything up to the 'root' is complete.
|
||||||
switch (proto.getState()) {
|
switch (proto.getState()) {
|
||||||
case ROLLEDBACK:
|
case ROLLEDBACK:
|
||||||
return true;
|
return true;
|
||||||
|
@ -294,7 +315,15 @@ public class ProcedureWALFormatReader {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")";
|
final StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("Entry(");
|
||||||
|
sb.append(getProcId());
|
||||||
|
sb.append(", parentId=");
|
||||||
|
sb.append(getParentId());
|
||||||
|
sb.append(", class=");
|
||||||
|
sb.append(proto.getClassName());
|
||||||
|
sb.append(")");
|
||||||
|
return sb.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -603,6 +632,22 @@ public class ProcedureWALFormatReader {
|
||||||
* There is a gap between A stackIds so something was executed in between.
|
* There is a gap between A stackIds so something was executed in between.
|
||||||
*/
|
*/
|
||||||
private boolean checkReadyToRun(Entry rootEntry) {
|
private boolean checkReadyToRun(Entry rootEntry) {
|
||||||
|
assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry;
|
||||||
|
|
||||||
|
if (rootEntry.isCompleted()) {
|
||||||
|
// if the root procedure is completed, sub-procedures should be gone
|
||||||
|
if (rootEntry.childHead != null) {
|
||||||
|
LOG.error("unexpected active children for root-procedure: " + rootEntry);
|
||||||
|
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
|
||||||
|
LOG.error("unexpected active children: " + p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry;
|
||||||
|
rootEntry.ready = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
int stackIdSum = 0;
|
int stackIdSum = 0;
|
||||||
int maxStackId = 0;
|
int maxStackId = 0;
|
||||||
for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) {
|
for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) {
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.procedure2.store.wal;
|
package org.apache.hadoop.hbase.procedure2.store.wal;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -57,8 +59,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHe
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* WAL implementation of the ProcedureStore.
|
* WAL implementation of the ProcedureStore.
|
||||||
*/
|
*/
|
||||||
|
@ -461,6 +461,29 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void delete(final Procedure proc, final long[] subProcIds) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds));
|
||||||
|
}
|
||||||
|
|
||||||
|
ByteSlot slot = acquireSlot();
|
||||||
|
try {
|
||||||
|
// Serialize the delete
|
||||||
|
ProcedureWALFormat.writeDelete(slot, proc, subProcIds);
|
||||||
|
|
||||||
|
// Push the transaction data and wait until it is persisted
|
||||||
|
pushData(PushType.DELETE, slot, proc.getProcId(), subProcIds);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// We are not able to serialize the procedure.
|
||||||
|
// this is a code error, and we are not able to go on.
|
||||||
|
LOG.fatal("Unable to serialize the procedure: " + proc, e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
} finally {
|
||||||
|
releaseSlot(slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private ByteSlot acquireSlot() {
|
private ByteSlot acquireSlot() {
|
||||||
ByteSlot slot = slotsCache.poll();
|
ByteSlot slot = slotsCache.poll();
|
||||||
return slot != null ? slot : new ByteSlot();
|
return slot != null ? slot : new ByteSlot();
|
||||||
|
@ -544,7 +567,11 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
storeTracker.update(procId);
|
storeTracker.update(procId);
|
||||||
break;
|
break;
|
||||||
case DELETE:
|
case DELETE:
|
||||||
|
if (subProcIds != null && subProcIds.length > 0) {
|
||||||
|
storeTracker.delete(subProcIds);
|
||||||
|
} else {
|
||||||
storeTracker.delete(procId);
|
storeTracker.delete(procId);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("invalid push type " + type);
|
throw new RuntimeException("invalid push type " + type);
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator
|
||||||
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
|
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
|
import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
@ -232,6 +233,10 @@ public class ProcedureTestingUtility {
|
||||||
addStackIndex(index);
|
addStackIndex(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setFinishedState() {
|
||||||
|
setState(ProcedureState.FINISHED);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Procedure[] execute(Void env) { return null; }
|
protected Procedure[] execute(Void env) { return null; }
|
||||||
|
|
||||||
|
@ -250,7 +255,8 @@ public class ProcedureTestingUtility {
|
||||||
|
|
||||||
public static class LoadCounter implements ProcedureStore.ProcedureLoader {
|
public static class LoadCounter implements ProcedureStore.ProcedureLoader {
|
||||||
private final ArrayList<Procedure> corrupted = new ArrayList<Procedure>();
|
private final ArrayList<Procedure> corrupted = new ArrayList<Procedure>();
|
||||||
private final ArrayList<Procedure> loaded = new ArrayList<Procedure>();
|
private final ArrayList<ProcedureInfo> completed = new ArrayList<ProcedureInfo>();
|
||||||
|
private final ArrayList<Procedure> runnable = new ArrayList<Procedure>();
|
||||||
|
|
||||||
private Set<Long> procIds;
|
private Set<Long> procIds;
|
||||||
private long maxProcId = 0;
|
private long maxProcId = 0;
|
||||||
|
@ -269,7 +275,8 @@ public class ProcedureTestingUtility {
|
||||||
|
|
||||||
public void reset(final Set<Long> procIds) {
|
public void reset(final Set<Long> procIds) {
|
||||||
corrupted.clear();
|
corrupted.clear();
|
||||||
loaded.clear();
|
completed.clear();
|
||||||
|
runnable.clear();
|
||||||
this.procIds = procIds;
|
this.procIds = procIds;
|
||||||
this.maxProcId = 0;
|
this.maxProcId = 0;
|
||||||
}
|
}
|
||||||
|
@ -278,12 +285,24 @@ public class ProcedureTestingUtility {
|
||||||
return maxProcId;
|
return maxProcId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ArrayList<Procedure> getLoaded() {
|
public ArrayList<Procedure> getRunnables() {
|
||||||
return loaded;
|
return runnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getRunnableCount() {
|
||||||
|
return runnable.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArrayList<ProcedureInfo> getCompleted() {
|
||||||
|
return completed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getCompletedCount() {
|
||||||
|
return completed.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getLoadedCount() {
|
public int getLoadedCount() {
|
||||||
return loaded.size();
|
return runnable.size() + completed.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ArrayList<Procedure> getCorrupted() {
|
public ArrayList<Procedure> getCorrupted() {
|
||||||
|
@ -302,13 +321,21 @@ public class ProcedureTestingUtility {
|
||||||
@Override
|
@Override
|
||||||
public void load(ProcedureIterator procIter) throws IOException {
|
public void load(ProcedureIterator procIter) throws IOException {
|
||||||
while (procIter.hasNext()) {
|
while (procIter.hasNext()) {
|
||||||
|
long procId;
|
||||||
|
if (procIter.isNextCompleted()) {
|
||||||
|
ProcedureInfo proc = procIter.nextAsProcedureInfo();
|
||||||
|
procId = proc.getProcId();
|
||||||
|
LOG.debug("loading completed procId=" + procId + ": " + proc);
|
||||||
|
completed.add(proc);
|
||||||
|
} else {
|
||||||
Procedure proc = procIter.nextAsProcedure();
|
Procedure proc = procIter.nextAsProcedure();
|
||||||
LOG.debug("loading procId=" + proc.getProcId() + ": " + proc);
|
procId = proc.getProcId();
|
||||||
if (procIds != null) {
|
LOG.debug("loading runnable procId=" + procId + ": " + proc);
|
||||||
assertTrue("procId=" + proc.getProcId() + " unexpected",
|
runnable.add(proc);
|
||||||
procIds.contains(proc.getProcId()));
|
}
|
||||||
|
if (procIds != null) {
|
||||||
|
assertTrue("procId=" + procId + " unexpected", procIds.contains(procId));
|
||||||
}
|
}
|
||||||
loaded.add(proc);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,196 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
@Category({MasterTests.class, SmallTests.class})
|
||||||
|
public class TestChildProcedures {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestChildProcedures.class);
|
||||||
|
|
||||||
|
private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
|
||||||
|
|
||||||
|
private static TestProcEnv procEnv;
|
||||||
|
private static ProcedureExecutor<TestProcEnv> procExecutor;
|
||||||
|
private static ProcedureStore procStore;
|
||||||
|
private static int procSleepInterval;
|
||||||
|
|
||||||
|
private HBaseCommonTestingUtility htu;
|
||||||
|
private FileSystem fs;
|
||||||
|
private Path testDir;
|
||||||
|
private Path logDir;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
htu = new HBaseCommonTestingUtility();
|
||||||
|
testDir = htu.getDataTestDir();
|
||||||
|
fs = testDir.getFileSystem(htu.getConfiguration());
|
||||||
|
assertTrue(testDir.depth() > 1);
|
||||||
|
|
||||||
|
logDir = new Path(testDir, "proc-logs");
|
||||||
|
procEnv = new TestProcEnv();
|
||||||
|
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
|
||||||
|
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
|
||||||
|
procExecutor.testing = new ProcedureExecutor.Testing();
|
||||||
|
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||||
|
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
|
||||||
|
procSleepInterval = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
procExecutor.stop();
|
||||||
|
procStore.stop(false);
|
||||||
|
fs.delete(logDir, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChildLoad() throws Exception {
|
||||||
|
procEnv.toggleKillBeforeStoreUpdate = false;
|
||||||
|
|
||||||
|
TestRootProcedure proc = new TestRootProcedure();
|
||||||
|
long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
|
||||||
|
ProcedureTestingUtility.restart(procExecutor);
|
||||||
|
assertTrue("expected completed proc", procExecutor.isFinished(procId));
|
||||||
|
ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChildLoadWithSteppedRestart() throws Exception {
|
||||||
|
procEnv.toggleKillBeforeStoreUpdate = true;
|
||||||
|
|
||||||
|
TestRootProcedure proc = new TestRootProcedure();
|
||||||
|
long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
|
||||||
|
int restartCount = 0;
|
||||||
|
while (!procExecutor.isFinished(procId)) {
|
||||||
|
ProcedureTestingUtility.restart(procExecutor);
|
||||||
|
restartCount++;
|
||||||
|
}
|
||||||
|
assertEquals(7, restartCount);
|
||||||
|
assertTrue("expected completed proc", procExecutor.isFinished(procId));
|
||||||
|
ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChildRollbackLoad() throws Exception {
|
||||||
|
procEnv.toggleKillBeforeStoreUpdate = false;
|
||||||
|
procEnv.triggerRollbackOnChild = true;
|
||||||
|
|
||||||
|
TestRootProcedure proc = new TestRootProcedure();
|
||||||
|
long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
|
||||||
|
ProcedureTestingUtility.restart(procExecutor);
|
||||||
|
|
||||||
|
assertProcFailed(procId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChildRollbackLoadWithSteppedRestart() throws Exception {
|
||||||
|
procEnv.toggleKillBeforeStoreUpdate = true;
|
||||||
|
procEnv.triggerRollbackOnChild = true;
|
||||||
|
|
||||||
|
TestRootProcedure proc = new TestRootProcedure();
|
||||||
|
long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
|
||||||
|
int restartCount = 0;
|
||||||
|
while (!procExecutor.isFinished(procId)) {
|
||||||
|
ProcedureTestingUtility.restart(procExecutor);
|
||||||
|
restartCount++;
|
||||||
|
}
|
||||||
|
assertEquals(6, restartCount);
|
||||||
|
assertProcFailed(procId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertProcFailed(long procId) {
|
||||||
|
assertTrue("expected completed proc", procExecutor.isFinished(procId));
|
||||||
|
ProcedureInfo result = procExecutor.getResult(procId);
|
||||||
|
assertEquals(true, result.isFailed());
|
||||||
|
LOG.info(result.getExceptionFullMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TestRootProcedure extends SequentialProcedure<TestProcEnv> {
|
||||||
|
public TestRootProcedure() {}
|
||||||
|
|
||||||
|
public Procedure[] execute(TestProcEnv env) {
|
||||||
|
if (env.toggleKillBeforeStoreUpdate) {
|
||||||
|
ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
|
||||||
|
}
|
||||||
|
return new Procedure[] { new TestChildProcedure(), new TestChildProcedure() };
|
||||||
|
}
|
||||||
|
|
||||||
|
public void rollback(TestProcEnv env) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean abort(TestProcEnv env) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TestChildProcedure extends SequentialProcedure<TestProcEnv> {
|
||||||
|
public TestChildProcedure() {}
|
||||||
|
|
||||||
|
public Procedure[] execute(TestProcEnv env) {
|
||||||
|
if (env.toggleKillBeforeStoreUpdate) {
|
||||||
|
ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
|
||||||
|
}
|
||||||
|
if (env.triggerRollbackOnChild) {
|
||||||
|
setFailure("test", new Exception("test"));
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void rollback(TestProcEnv env) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean abort(TestProcEnv env) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestProcEnv {
|
||||||
|
public boolean toggleKillBeforeStoreUpdate = false;
|
||||||
|
public boolean triggerRollbackOnChild = false;
|
||||||
|
}
|
||||||
|
}
|
|
@ -106,7 +106,9 @@ public class TestStressWALProcedureStore {
|
||||||
Random rand = new Random();
|
Random rand = new Random();
|
||||||
TestProcedure proc;
|
TestProcedure proc;
|
||||||
do {
|
do {
|
||||||
proc = new TestProcedure(procCounter.addAndGet(1));
|
// After HBASE- there may be gap in the procId sequence, trying to simulate that.
|
||||||
|
long procId = procCounter.addAndGet(1 + rand.nextInt(3));
|
||||||
|
proc = new TestProcedure(procId);
|
||||||
// Insert
|
// Insert
|
||||||
procStore.insert(proc, null);
|
procStore.insert(proc, null);
|
||||||
// Update
|
// Update
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
|
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
|
||||||
|
@ -506,10 +507,56 @@ public class TestWALProcedureStore {
|
||||||
procStore.recoverLease();
|
procStore.recoverLease();
|
||||||
procStore.load(loader);
|
procStore.load(loader);
|
||||||
assertEquals(procs.length, loader.getMaxProcId());
|
assertEquals(procs.length, loader.getMaxProcId());
|
||||||
assertEquals(procs.length - 1, loader.getLoadedCount());
|
assertEquals(procs.length - 1, loader.getRunnableCount());
|
||||||
|
assertEquals(0, loader.getCompletedCount());
|
||||||
assertEquals(0, loader.getCorruptedCount());
|
assertEquals(0, loader.getCorruptedCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testLoadChildren() throws Exception {
|
||||||
|
TestProcedure a = new TestProcedure(1, 0);
|
||||||
|
TestProcedure b = new TestProcedure(2, 1);
|
||||||
|
TestProcedure c = new TestProcedure(3, 1);
|
||||||
|
|
||||||
|
// INIT
|
||||||
|
procStore.insert(a, null);
|
||||||
|
|
||||||
|
// Run A first step
|
||||||
|
a.addStackId(0);
|
||||||
|
procStore.update(a);
|
||||||
|
|
||||||
|
// Run A second step
|
||||||
|
a.addStackId(1);
|
||||||
|
procStore.insert(a, new Procedure[] { b, c });
|
||||||
|
|
||||||
|
// Run B first step
|
||||||
|
b.addStackId(2);
|
||||||
|
procStore.update(b);
|
||||||
|
|
||||||
|
// Run C first and last step
|
||||||
|
c.addStackId(3);
|
||||||
|
procStore.update(c);
|
||||||
|
|
||||||
|
// Run B second setp
|
||||||
|
b.addStackId(4);
|
||||||
|
procStore.update(b);
|
||||||
|
|
||||||
|
// back to A
|
||||||
|
a.addStackId(5);
|
||||||
|
a.setFinishedState();
|
||||||
|
procStore.delete(a, new long[] { b.getProcId(), c.getProcId() });
|
||||||
|
restartAndAssert(3, 0, 1, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void restartAndAssert(long maxProcId, long runnableCount,
|
||||||
|
int completedCount, int corruptedCount) throws Exception {
|
||||||
|
final LoadCounter loader = new LoadCounter();
|
||||||
|
storeRestart(loader);
|
||||||
|
assertEquals(maxProcId, loader.getMaxProcId());
|
||||||
|
assertEquals(runnableCount, loader.getRunnableCount());
|
||||||
|
assertEquals(completedCount, loader.getCompletedCount());
|
||||||
|
assertEquals(corruptedCount, loader.getCorruptedCount());
|
||||||
|
}
|
||||||
|
|
||||||
private void corruptLog(final FileStatus logFile, final long dropBytes)
|
private void corruptLog(final FileStatus logFile, final long dropBytes)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
assertTrue(logFile.getLen() > dropBytes);
|
assertTrue(logFile.getLen() > dropBytes);
|
||||||
|
|
|
@ -6295,6 +6295,20 @@ public final class ProcedureProtos {
|
||||||
* <code>optional uint64 proc_id = 3;</code>
|
* <code>optional uint64 proc_id = 3;</code>
|
||||||
*/
|
*/
|
||||||
long getProcId();
|
long getProcId();
|
||||||
|
|
||||||
|
// repeated uint64 child_id = 4;
|
||||||
|
/**
|
||||||
|
* <code>repeated uint64 child_id = 4;</code>
|
||||||
|
*/
|
||||||
|
java.util.List<java.lang.Long> getChildIdList();
|
||||||
|
/**
|
||||||
|
* <code>repeated uint64 child_id = 4;</code>
|
||||||
|
*/
|
||||||
|
int getChildIdCount();
|
||||||
|
/**
|
||||||
|
* <code>repeated uint64 child_id = 4;</code>
|
||||||
|
*/
|
||||||
|
long getChildId(int index);
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Protobuf type {@code hbase.pb.ProcedureWALEntry}
|
* Protobuf type {@code hbase.pb.ProcedureWALEntry}
|
||||||
|
@ -6371,6 +6385,27 @@ public final class ProcedureProtos {
|
||||||
procId_ = input.readUInt64();
|
procId_ = input.readUInt64();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case 32: {
|
||||||
|
if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
|
||||||
|
childId_ = new java.util.ArrayList<java.lang.Long>();
|
||||||
|
mutable_bitField0_ |= 0x00000008;
|
||||||
|
}
|
||||||
|
childId_.add(input.readUInt64());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 34: {
|
||||||
|
int length = input.readRawVarint32();
|
||||||
|
int limit = input.pushLimit(length);
|
||||||
|
if (!((mutable_bitField0_ & 0x00000008) == 0x00000008) && input.getBytesUntilLimit() > 0) {
|
||||||
|
childId_ = new java.util.ArrayList<java.lang.Long>();
|
||||||
|
mutable_bitField0_ |= 0x00000008;
|
||||||
|
}
|
||||||
|
while (input.getBytesUntilLimit() > 0) {
|
||||||
|
childId_.add(input.readUInt64());
|
||||||
|
}
|
||||||
|
input.popLimit(limit);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
||||||
|
@ -6382,6 +6417,9 @@ public final class ProcedureProtos {
|
||||||
if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
|
if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
|
||||||
procedure_ = java.util.Collections.unmodifiableList(procedure_);
|
procedure_ = java.util.Collections.unmodifiableList(procedure_);
|
||||||
}
|
}
|
||||||
|
if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
|
||||||
|
childId_ = java.util.Collections.unmodifiableList(childId_);
|
||||||
|
}
|
||||||
this.unknownFields = unknownFields.build();
|
this.unknownFields = unknownFields.build();
|
||||||
makeExtensionsImmutable();
|
makeExtensionsImmutable();
|
||||||
}
|
}
|
||||||
|
@ -6600,10 +6638,34 @@ public final class ProcedureProtos {
|
||||||
return procId_;
|
return procId_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// repeated uint64 child_id = 4;
|
||||||
|
public static final int CHILD_ID_FIELD_NUMBER = 4;
|
||||||
|
private java.util.List<java.lang.Long> childId_;
|
||||||
|
/**
|
||||||
|
* <code>repeated uint64 child_id = 4;</code>
|
||||||
|
*/
|
||||||
|
public java.util.List<java.lang.Long>
|
||||||
|
getChildIdList() {
|
||||||
|
return childId_;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>repeated uint64 child_id = 4;</code>
|
||||||
|
*/
|
||||||
|
public int getChildIdCount() {
|
||||||
|
return childId_.size();
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>repeated uint64 child_id = 4;</code>
|
||||||
|
*/
|
||||||
|
public long getChildId(int index) {
|
||||||
|
return childId_.get(index);
|
||||||
|
}
|
||||||
|
|
||||||
private void initFields() {
|
private void initFields() {
|
||||||
type_ = org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry.Type.PROCEDURE_WAL_EOF;
|
type_ = org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry.Type.PROCEDURE_WAL_EOF;
|
||||||
procedure_ = java.util.Collections.emptyList();
|
procedure_ = java.util.Collections.emptyList();
|
||||||
procId_ = 0L;
|
procId_ = 0L;
|
||||||
|
childId_ = java.util.Collections.emptyList();
|
||||||
}
|
}
|
||||||
private byte memoizedIsInitialized = -1;
|
private byte memoizedIsInitialized = -1;
|
||||||
public final boolean isInitialized() {
|
public final boolean isInitialized() {
|
||||||
|
@ -6636,6 +6698,9 @@ public final class ProcedureProtos {
|
||||||
if (((bitField0_ & 0x00000002) == 0x00000002)) {
|
if (((bitField0_ & 0x00000002) == 0x00000002)) {
|
||||||
output.writeUInt64(3, procId_);
|
output.writeUInt64(3, procId_);
|
||||||
}
|
}
|
||||||
|
for (int i = 0; i < childId_.size(); i++) {
|
||||||
|
output.writeUInt64(4, childId_.get(i));
|
||||||
|
}
|
||||||
getUnknownFields().writeTo(output);
|
getUnknownFields().writeTo(output);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6657,6 +6722,15 @@ public final class ProcedureProtos {
|
||||||
size += com.google.protobuf.CodedOutputStream
|
size += com.google.protobuf.CodedOutputStream
|
||||||
.computeUInt64Size(3, procId_);
|
.computeUInt64Size(3, procId_);
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
int dataSize = 0;
|
||||||
|
for (int i = 0; i < childId_.size(); i++) {
|
||||||
|
dataSize += com.google.protobuf.CodedOutputStream
|
||||||
|
.computeUInt64SizeNoTag(childId_.get(i));
|
||||||
|
}
|
||||||
|
size += dataSize;
|
||||||
|
size += 1 * getChildIdList().size();
|
||||||
|
}
|
||||||
size += getUnknownFields().getSerializedSize();
|
size += getUnknownFields().getSerializedSize();
|
||||||
memoizedSerializedSize = size;
|
memoizedSerializedSize = size;
|
||||||
return size;
|
return size;
|
||||||
|
@ -6692,6 +6766,8 @@ public final class ProcedureProtos {
|
||||||
result = result && (getProcId()
|
result = result && (getProcId()
|
||||||
== other.getProcId());
|
== other.getProcId());
|
||||||
}
|
}
|
||||||
|
result = result && getChildIdList()
|
||||||
|
.equals(other.getChildIdList());
|
||||||
result = result &&
|
result = result &&
|
||||||
getUnknownFields().equals(other.getUnknownFields());
|
getUnknownFields().equals(other.getUnknownFields());
|
||||||
return result;
|
return result;
|
||||||
|
@ -6717,6 +6793,10 @@ public final class ProcedureProtos {
|
||||||
hash = (37 * hash) + PROC_ID_FIELD_NUMBER;
|
hash = (37 * hash) + PROC_ID_FIELD_NUMBER;
|
||||||
hash = (53 * hash) + hashLong(getProcId());
|
hash = (53 * hash) + hashLong(getProcId());
|
||||||
}
|
}
|
||||||
|
if (getChildIdCount() > 0) {
|
||||||
|
hash = (37 * hash) + CHILD_ID_FIELD_NUMBER;
|
||||||
|
hash = (53 * hash) + getChildIdList().hashCode();
|
||||||
|
}
|
||||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||||
memoizedHashCode = hash;
|
memoizedHashCode = hash;
|
||||||
return hash;
|
return hash;
|
||||||
|
@ -6837,6 +6917,8 @@ public final class ProcedureProtos {
|
||||||
}
|
}
|
||||||
procId_ = 0L;
|
procId_ = 0L;
|
||||||
bitField0_ = (bitField0_ & ~0x00000004);
|
bitField0_ = (bitField0_ & ~0x00000004);
|
||||||
|
childId_ = java.util.Collections.emptyList();
|
||||||
|
bitField0_ = (bitField0_ & ~0x00000008);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6882,6 +6964,11 @@ public final class ProcedureProtos {
|
||||||
to_bitField0_ |= 0x00000002;
|
to_bitField0_ |= 0x00000002;
|
||||||
}
|
}
|
||||||
result.procId_ = procId_;
|
result.procId_ = procId_;
|
||||||
|
if (((bitField0_ & 0x00000008) == 0x00000008)) {
|
||||||
|
childId_ = java.util.Collections.unmodifiableList(childId_);
|
||||||
|
bitField0_ = (bitField0_ & ~0x00000008);
|
||||||
|
}
|
||||||
|
result.childId_ = childId_;
|
||||||
result.bitField0_ = to_bitField0_;
|
result.bitField0_ = to_bitField0_;
|
||||||
onBuilt();
|
onBuilt();
|
||||||
return result;
|
return result;
|
||||||
|
@ -6930,6 +7017,16 @@ public final class ProcedureProtos {
|
||||||
if (other.hasProcId()) {
|
if (other.hasProcId()) {
|
||||||
setProcId(other.getProcId());
|
setProcId(other.getProcId());
|
||||||
}
|
}
|
||||||
|
if (!other.childId_.isEmpty()) {
|
||||||
|
if (childId_.isEmpty()) {
|
||||||
|
childId_ = other.childId_;
|
||||||
|
bitField0_ = (bitField0_ & ~0x00000008);
|
||||||
|
} else {
|
||||||
|
ensureChildIdIsMutable();
|
||||||
|
childId_.addAll(other.childId_);
|
||||||
|
}
|
||||||
|
onChanged();
|
||||||
|
}
|
||||||
this.mergeUnknownFields(other.getUnknownFields());
|
this.mergeUnknownFields(other.getUnknownFields());
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
@ -7276,6 +7373,72 @@ public final class ProcedureProtos {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// repeated uint64 child_id = 4;
|
||||||
|
private java.util.List<java.lang.Long> childId_ = java.util.Collections.emptyList();
|
||||||
|
private void ensureChildIdIsMutable() {
|
||||||
|
if (!((bitField0_ & 0x00000008) == 0x00000008)) {
|
||||||
|
childId_ = new java.util.ArrayList<java.lang.Long>(childId_);
|
||||||
|
bitField0_ |= 0x00000008;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>repeated uint64 child_id = 4;</code>
|
||||||
|
*/
|
||||||
|
public java.util.List<java.lang.Long>
|
||||||
|
getChildIdList() {
|
||||||
|
return java.util.Collections.unmodifiableList(childId_);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>repeated uint64 child_id = 4;</code>
|
||||||
|
*/
|
||||||
|
public int getChildIdCount() {
|
||||||
|
return childId_.size();
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>repeated uint64 child_id = 4;</code>
|
||||||
|
*/
|
||||||
|
public long getChildId(int index) {
|
||||||
|
return childId_.get(index);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>repeated uint64 child_id = 4;</code>
|
||||||
|
*/
|
||||||
|
public Builder setChildId(
|
||||||
|
int index, long value) {
|
||||||
|
ensureChildIdIsMutable();
|
||||||
|
childId_.set(index, value);
|
||||||
|
onChanged();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>repeated uint64 child_id = 4;</code>
|
||||||
|
*/
|
||||||
|
public Builder addChildId(long value) {
|
||||||
|
ensureChildIdIsMutable();
|
||||||
|
childId_.add(value);
|
||||||
|
onChanged();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>repeated uint64 child_id = 4;</code>
|
||||||
|
*/
|
||||||
|
public Builder addAllChildId(
|
||||||
|
java.lang.Iterable<? extends java.lang.Long> values) {
|
||||||
|
ensureChildIdIsMutable();
|
||||||
|
super.addAll(values, childId_);
|
||||||
|
onChanged();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>repeated uint64 child_id = 4;</code>
|
||||||
|
*/
|
||||||
|
public Builder clearChildId() {
|
||||||
|
childId_ = java.util.Collections.emptyList();
|
||||||
|
bitField0_ = (bitField0_ & ~0x00000008);
|
||||||
|
onChanged();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
// @@protoc_insertion_point(builder_scope:hbase.pb.ProcedureWALEntry)
|
// @@protoc_insertion_point(builder_scope:hbase.pb.ProcedureWALEntry)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7355,19 +7518,19 @@ public final class ProcedureProtos {
|
||||||
"eTracker\0229\n\004node\030\001 \003(\0132+.hbase.pb.Proced" +
|
"eTracker\0229\n\004node\030\001 \003(\0132+.hbase.pb.Proced" +
|
||||||
"ureStoreTracker.TrackerNode\032A\n\013TrackerNo" +
|
"ureStoreTracker.TrackerNode\032A\n\013TrackerNo" +
|
||||||
"de\022\020\n\010start_id\030\001 \002(\004\022\017\n\007updated\030\002 \003(\004\022\017\n" +
|
"de\022\020\n\010start_id\030\001 \002(\004\022\017\n\007updated\030\002 \003(\004\022\017\n" +
|
||||||
"\007deleted\030\003 \003(\004\"\235\002\n\021ProcedureWALEntry\022.\n\004",
|
"\007deleted\030\003 \003(\004\"\257\002\n\021ProcedureWALEntry\022.\n\004",
|
||||||
"type\030\001 \002(\0162 .hbase.pb.ProcedureWALEntry." +
|
"type\030\001 \002(\0162 .hbase.pb.ProcedureWALEntry." +
|
||||||
"Type\022&\n\tprocedure\030\002 \003(\0132\023.hbase.pb.Proce" +
|
"Type\022&\n\tprocedure\030\002 \003(\0132\023.hbase.pb.Proce" +
|
||||||
"dure\022\017\n\007proc_id\030\003 \001(\004\"\236\001\n\004Type\022\025\n\021PROCED" +
|
"dure\022\017\n\007proc_id\030\003 \001(\004\022\020\n\010child_id\030\004 \003(\004\"" +
|
||||||
"URE_WAL_EOF\020\001\022\026\n\022PROCEDURE_WAL_INIT\020\002\022\030\n" +
|
"\236\001\n\004Type\022\025\n\021PROCEDURE_WAL_EOF\020\001\022\026\n\022PROCE" +
|
||||||
"\024PROCEDURE_WAL_INSERT\020\003\022\030\n\024PROCEDURE_WAL" +
|
"DURE_WAL_INIT\020\002\022\030\n\024PROCEDURE_WAL_INSERT\020" +
|
||||||
"_UPDATE\020\004\022\030\n\024PROCEDURE_WAL_DELETE\020\005\022\031\n\025P" +
|
"\003\022\030\n\024PROCEDURE_WAL_UPDATE\020\004\022\030\n\024PROCEDURE" +
|
||||||
"ROCEDURE_WAL_COMPACT\020\006*p\n\016ProcedureState" +
|
"_WAL_DELETE\020\005\022\031\n\025PROCEDURE_WAL_COMPACT\020\006" +
|
||||||
"\022\020\n\014INITIALIZING\020\001\022\014\n\010RUNNABLE\020\002\022\013\n\007WAIT" +
|
"*p\n\016ProcedureState\022\020\n\014INITIALIZING\020\001\022\014\n\010" +
|
||||||
"ING\020\003\022\023\n\017WAITING_TIMEOUT\020\004\022\016\n\nROLLEDBACK" +
|
"RUNNABLE\020\002\022\013\n\007WAITING\020\003\022\023\n\017WAITING_TIMEO" +
|
||||||
"\020\005\022\014\n\010FINISHED\020\006BE\n*org.apache.hadoop.hb",
|
"UT\020\004\022\016\n\nROLLEDBACK\020\005\022\014\n\010FINISHED\020\006BE\n*or",
|
||||||
"ase.protobuf.generatedB\017ProcedureProtosH" +
|
"g.apache.hadoop.hbase.protobuf.generated" +
|
||||||
"\001\210\001\001\240\001\001"
|
"B\017ProcedureProtosH\001\210\001\001\240\001\001"
|
||||||
};
|
};
|
||||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||||
|
@ -7421,7 +7584,7 @@ public final class ProcedureProtos {
|
||||||
internal_static_hbase_pb_ProcedureWALEntry_fieldAccessorTable = new
|
internal_static_hbase_pb_ProcedureWALEntry_fieldAccessorTable = new
|
||||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||||
internal_static_hbase_pb_ProcedureWALEntry_descriptor,
|
internal_static_hbase_pb_ProcedureWALEntry_descriptor,
|
||||||
new java.lang.String[] { "Type", "Procedure", "ProcId", });
|
new java.lang.String[] { "Type", "Procedure", "ProcId", "ChildId", });
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -116,4 +116,5 @@ message ProcedureWALEntry {
|
||||||
required Type type = 1;
|
required Type type = 1;
|
||||||
repeated Procedure procedure = 2;
|
repeated Procedure procedure = 2;
|
||||||
optional uint64 proc_id = 3;
|
optional uint64 proc_id = 3;
|
||||||
|
repeated uint64 child_id = 4;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue