HBASE-21336 Simplify the implementation of WALProcedureMap
This commit is contained in:
parent
4bf3c5a702
commit
7adf590106
|
@ -505,8 +505,10 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
|
private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// 1. Build the rollback stack
|
// 1. Build the rollback stack
|
||||||
int runnablesCount = 0;
|
int runnableCount = 0;
|
||||||
int failedCount = 0;
|
int failedCount = 0;
|
||||||
|
int waitingCount = 0;
|
||||||
|
int waitingTimeoutCount = 0;
|
||||||
while (procIter.hasNext()) {
|
while (procIter.hasNext()) {
|
||||||
boolean finished = procIter.isNextFinished();
|
boolean finished = procIter.isNextFinished();
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@ -526,11 +528,21 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
// add the procedure to the map
|
// add the procedure to the map
|
||||||
proc.beforeReplay(getEnvironment());
|
proc.beforeReplay(getEnvironment());
|
||||||
procedures.put(proc.getProcId(), proc);
|
procedures.put(proc.getProcId(), proc);
|
||||||
|
switch (proc.getState()) {
|
||||||
if (proc.getState() == ProcedureState.RUNNABLE) {
|
case RUNNABLE:
|
||||||
runnablesCount++;
|
runnableCount++;
|
||||||
} else if (proc.getState() == ProcedureState.FAILED) {
|
break;
|
||||||
|
case FAILED:
|
||||||
failedCount++;
|
failedCount++;
|
||||||
|
break;
|
||||||
|
case WAITING:
|
||||||
|
waitingCount++;
|
||||||
|
break;
|
||||||
|
case WAITING_TIMEOUT:
|
||||||
|
waitingTimeoutCount++;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -551,9 +563,10 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
// have been polled out already, so when loading we can not add the procedure to scheduler first
|
// have been polled out already, so when loading we can not add the procedure to scheduler first
|
||||||
// and then call acquireLock, since the procedure is still in the queue, and since we will
|
// and then call acquireLock, since the procedure is still in the queue, and since we will
|
||||||
// remove the queue from runQueue, then no one can poll it out, then there is a dead lock
|
// remove the queue from runQueue, then no one can poll it out, then there is a dead lock
|
||||||
List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnablesCount);
|
List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
|
||||||
List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
|
List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
|
||||||
Set<Procedure<TEnvironment>> waitingSet = null;
|
List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
|
||||||
|
List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
|
||||||
procIter.reset();
|
procIter.reset();
|
||||||
while (procIter.hasNext()) {
|
while (procIter.hasNext()) {
|
||||||
if (procIter.isNextFinished()) {
|
if (procIter.isNextFinished()) {
|
||||||
|
@ -591,26 +604,10 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
runnableList.add(proc);
|
runnableList.add(proc);
|
||||||
break;
|
break;
|
||||||
case WAITING:
|
case WAITING:
|
||||||
if (!proc.hasChildren()) {
|
waitingList.add(proc);
|
||||||
// Normally, WAITING procedures should be waken by its children.
|
|
||||||
// But, there is a case that, all the children are successful and before
|
|
||||||
// they can wake up their parent procedure, the master was killed.
|
|
||||||
// So, during recovering the procedures from ProcedureWal, its children
|
|
||||||
// are not loaded because of their SUCCESS state.
|
|
||||||
// So we need to continue to run this WAITING procedure. But before
|
|
||||||
// executing, we need to set its state to RUNNABLE, otherwise, a exception
|
|
||||||
// will throw:
|
|
||||||
// Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
|
|
||||||
// "NOT RUNNABLE! " + procedure.toString());
|
|
||||||
proc.setState(ProcedureState.RUNNABLE);
|
|
||||||
runnableList.add(proc);
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case WAITING_TIMEOUT:
|
case WAITING_TIMEOUT:
|
||||||
if (waitingSet == null) {
|
waitingTimeoutList.add(proc);
|
||||||
waitingSet = new HashSet<>();
|
|
||||||
}
|
|
||||||
waitingSet.add(proc);
|
|
||||||
break;
|
break;
|
||||||
case FAILED:
|
case FAILED:
|
||||||
failedList.add(proc);
|
failedList.add(proc);
|
||||||
|
@ -625,39 +622,32 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. Validate the stacks
|
// 4. Check the waiting procedures to see if some of them can be added to runnable.
|
||||||
int corruptedCount = 0;
|
waitingList.forEach(proc -> {
|
||||||
Iterator<Map.Entry<Long, RootProcedureState<TEnvironment>>> itStack =
|
if (!proc.hasChildren()) {
|
||||||
rollbackStack.entrySet().iterator();
|
// Normally, WAITING procedures should be waken by its children.
|
||||||
while (itStack.hasNext()) {
|
// But, there is a case that, all the children are successful and before
|
||||||
Map.Entry<Long, RootProcedureState<TEnvironment>> entry = itStack.next();
|
// they can wake up their parent procedure, the master was killed.
|
||||||
RootProcedureState<TEnvironment> procStack = entry.getValue();
|
// So, during recovering the procedures from ProcedureWal, its children
|
||||||
if (procStack.isValid()) continue;
|
// are not loaded because of their SUCCESS state.
|
||||||
|
// So we need to continue to run this WAITING procedure. But before
|
||||||
for (Procedure<TEnvironment> proc : procStack.getSubproceduresStack()) {
|
// executing, we need to set its state to RUNNABLE, otherwise, a exception
|
||||||
LOG.error("Corrupted " + proc);
|
// will throw:
|
||||||
procedures.remove(proc.getProcId());
|
// Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
|
||||||
runnableList.remove(proc);
|
// "NOT RUNNABLE! " + procedure.toString());
|
||||||
if (waitingSet != null) waitingSet.remove(proc);
|
proc.setState(ProcedureState.RUNNABLE);
|
||||||
corruptedCount++;
|
runnableList.add(proc);
|
||||||
}
|
|
||||||
itStack.remove();
|
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
if (abortOnCorruption && corruptedCount > 0) {
|
// 5. Push the procedures to the timeout executor
|
||||||
throw new IOException("found " + corruptedCount + " procedures on replay");
|
waitingTimeoutList.forEach(proc -> {
|
||||||
}
|
|
||||||
|
|
||||||
// 4. Push the procedures to the timeout executor
|
|
||||||
if (waitingSet != null && !waitingSet.isEmpty()) {
|
|
||||||
for (Procedure<TEnvironment> proc: waitingSet) {
|
|
||||||
proc.afterReplay(getEnvironment());
|
proc.afterReplay(getEnvironment());
|
||||||
timeoutExecutor.add(proc);
|
timeoutExecutor.add(proc);
|
||||||
}
|
});
|
||||||
}
|
// 6. restore locks
|
||||||
// 5. restore locks
|
|
||||||
restoreLocks();
|
restoreLocks();
|
||||||
// 6. Push the procedure to the scheduler
|
// 7. Push the procedure to the scheduler
|
||||||
failedList.forEach(scheduler::addBack);
|
failedList.forEach(scheduler::addBack);
|
||||||
runnableList.forEach(p -> {
|
runnableList.forEach(p -> {
|
||||||
p.afterReplay(getEnvironment());
|
p.afterReplay(getEnvironment());
|
||||||
|
|
|
@ -85,12 +85,18 @@ public interface ProcedureStore {
|
||||||
boolean hasNext();
|
boolean hasNext();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Calling this method does not need to converting the protobuf message to the Procedure class,
|
||||||
|
* so if it returns true we can call {@link #skipNext()} to skip the procedure without
|
||||||
|
* deserializing. This could increase the performance.
|
||||||
* @return true if the iterator next element is a completed procedure.
|
* @return true if the iterator next element is a completed procedure.
|
||||||
*/
|
*/
|
||||||
boolean isNextFinished();
|
boolean isNextFinished();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Skip the next procedure
|
* Skip the next procedure
|
||||||
|
* <p/>
|
||||||
|
* This method is used to skip the deserializing of the procedure to increase performance, as
|
||||||
|
* when calling next we need to convert the protobuf message to the Procedure class.
|
||||||
*/
|
*/
|
||||||
void skipNext();
|
void skipNext();
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.procedure2.store.wal;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -31,70 +30,25 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class that loads the procedures stored in a WAL
|
* Helper class that loads the procedures stored in a WAL.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ProcedureWALFormatReader {
|
public class ProcedureWALFormatReader {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class);
|
||||||
|
|
||||||
// ==============================================================================================
|
/**
|
||||||
// We read the WALs in reverse order from the newest to the oldest.
|
* We will use the localProcedureMap to track the active procedures for the current proc wal file,
|
||||||
// We have different entry types:
|
* and when we finished reading one proc wal file, we will merge he localProcedureMap to the
|
||||||
// - INIT: Procedure submitted by the user (also known as 'root procedure')
|
* procedureMap, which tracks the global active procedures.
|
||||||
// - INSERT: Children added to the procedure <parentId>:[<childId>, ...]
|
* <p/>
|
||||||
// - UPDATE: The specified procedure was updated
|
* See the comments of {@link WALProcedureMap} for more details.
|
||||||
// - DELETE: The procedure was removed (finished/rolledback and result TTL expired)
|
* <p/>
|
||||||
//
|
* After reading all the proc wal files, we will use the procedures in the procedureMap to build a
|
||||||
// In the WAL we can find multiple times the same procedure as UPDATE or INSERT.
|
* {@link WALProcedureTree}, and then give the result to the upper layer. See the comments of
|
||||||
// We read the WAL from top to bottom, so every time we find an entry of the
|
* {@link WALProcedureTree} and the code in {@link #finish()} for more details.
|
||||||
// same procedure, that will be the "latest" update (Caveat: with multiple threads writing
|
*/
|
||||||
// the store, this assumption does not hold).
|
private final WALProcedureMap localProcedureMap = new WALProcedureMap();
|
||||||
//
|
private final WALProcedureMap procedureMap = new WALProcedureMap();
|
||||||
// We keep two in-memory maps:
|
|
||||||
// - localProcedureMap: is the map containing the entries in the WAL we are processing
|
|
||||||
// - procedureMap: is the map containing all the procedures we found up to the WAL in process.
|
|
||||||
// localProcedureMap is merged with the procedureMap once we reach the WAL EOF.
|
|
||||||
//
|
|
||||||
// Since we are reading the WALs in reverse order (newest to oldest),
|
|
||||||
// if we find an entry related to a procedure we already have in 'procedureMap' we can discard it.
|
|
||||||
//
|
|
||||||
// The WAL is append-only so the last procedure in the WAL is the one that
|
|
||||||
// was in execution at the time we crashed/closed the server.
|
|
||||||
// Given that, the procedure replay order can be inferred by the WAL order.
|
|
||||||
//
|
|
||||||
// Example:
|
|
||||||
// WAL-2: [A, B, A, C, D]
|
|
||||||
// WAL-1: [F, G, A, F, B]
|
|
||||||
// Replay-Order: [D, C, A, B, F, G]
|
|
||||||
//
|
|
||||||
// The "localProcedureMap" keeps a "replayOrder" list. Every time we add the
|
|
||||||
// record to the map that record is moved to the head of the "replayOrder" list.
|
|
||||||
// Using the example above:
|
|
||||||
// WAL-2 localProcedureMap.replayOrder is [D, C, A, B]
|
|
||||||
// WAL-1 localProcedureMap.replayOrder is [F, G]
|
|
||||||
//
|
|
||||||
// Each time we reach the WAL-EOF, the "replayOrder" list is merged/appended in 'procedureMap'
|
|
||||||
// so using the example above we end up with: [D, C, A, B] + [F, G] as replay order.
|
|
||||||
//
|
|
||||||
// Fast Start: INIT/INSERT record and StackIDs
|
|
||||||
// ---------------------------------------------
|
|
||||||
// We have two special records, INIT and INSERT, that track the first time
|
|
||||||
// the procedure was added to the WAL. We can use this information to be able
|
|
||||||
// to start procedures before reaching the end of the WAL, or before reading all WALs.
|
|
||||||
// But in some cases, the WAL with that record can be already gone.
|
|
||||||
// As an alternative, we can use the stackIds on each procedure,
|
|
||||||
// to identify when a procedure is ready to start.
|
|
||||||
// If there are gaps in the sum of the stackIds we need to read more WALs.
|
|
||||||
//
|
|
||||||
// Example (all procs child of A):
|
|
||||||
// WAL-2: [A, B] A stackIds = [0, 4], B stackIds = [1, 5]
|
|
||||||
// WAL-1: [A, B, C, D]
|
|
||||||
//
|
|
||||||
// In the case above we need to read one more WAL to be able to consider
|
|
||||||
// the root procedure A and all children as ready.
|
|
||||||
// ==============================================================================================
|
|
||||||
private final WALProcedureMap localProcedureMap = new WALProcedureMap(1024);
|
|
||||||
private final WALProcedureMap procedureMap = new WALProcedureMap(1024);
|
|
||||||
|
|
||||||
private final ProcedureWALFormat.Loader loader;
|
private final ProcedureWALFormat.Loader loader;
|
||||||
|
|
||||||
|
@ -178,7 +132,7 @@ public class ProcedureWALFormatReader {
|
||||||
localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(),
|
localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(),
|
||||||
localProcedureMap.getMaxModifiedProcId());
|
localProcedureMap.getMaxModifiedProcId());
|
||||||
}
|
}
|
||||||
procedureMap.mergeTail(localProcedureMap);
|
procedureMap.merge(localProcedureMap);
|
||||||
}
|
}
|
||||||
if (localTracker.isPartial()) {
|
if (localTracker.isPartial()) {
|
||||||
localTracker.setPartialFlag(false);
|
localTracker.setPartialFlag(false);
|
||||||
|
@ -189,18 +143,11 @@ public class ProcedureWALFormatReader {
|
||||||
// notify the loader about the max proc ID
|
// notify the loader about the max proc ID
|
||||||
loader.setMaxProcId(maxProcId);
|
loader.setMaxProcId(maxProcId);
|
||||||
|
|
||||||
// fetch the procedure ready to run.
|
// build the procedure execution tree. When building we will verify that whether a procedure is
|
||||||
ProcedureIterator procIter = procedureMap.fetchReady();
|
// valid.
|
||||||
if (procIter != null) {
|
WALProcedureTree tree = WALProcedureTree.build(procedureMap.getProcedures());
|
||||||
loader.load(procIter);
|
loader.load(tree.getValidProcs());
|
||||||
}
|
loader.handleCorrupted(tree.getCorruptedProcs());
|
||||||
|
|
||||||
// remaining procedures have missing link or dependencies
|
|
||||||
// consider them as corrupted, manual fix is probably required.
|
|
||||||
procIter = procedureMap.fetchAll();
|
|
||||||
if (procIter != null) {
|
|
||||||
loader.handleCorrupted(procIter);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) {
|
private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) {
|
||||||
|
|
|
@ -17,193 +17,50 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.procedure2.store.wal;
|
package org.apache.hadoop.hbase.procedure2.store.wal;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.util.Collection;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import java.util.Collections;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
import java.util.HashMap;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
|
import java.util.Map;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* We keep an in-memory map of the procedures sorted by replay order. (see the details in the
|
* This class is used to track the active procedures when loading procedures from proc wal file.
|
||||||
* beginning of {@link ProcedureWALFormatReader}).
|
* <p/>
|
||||||
*
|
* We will read proc wal files from new to old, but when reading a proc wal file, we will still read
|
||||||
* <pre>
|
* from top to bottom, so there are two groups of methods for this class.
|
||||||
* procedureMap = | A | | E | | C | | | | | G | | |
|
* <p/>
|
||||||
* D B
|
* The first group is {@link #add(ProcedureProtos.Procedure)} and {@link #remove(long)}. It is used
|
||||||
* replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
|
* when reading a proc wal file. In these methods, for the same procedure, typically the one comes
|
||||||
*
|
* later should win, please see the comment for
|
||||||
* We also have a lazy grouping by "root procedure", and a list of
|
* {@link #isIncreasing(ProcedureProtos.Procedure, ProcedureProtos.Procedure)} to see the
|
||||||
* unlinked procedures. If after reading all the WALs we have unlinked
|
* exceptions.
|
||||||
* procedures it means that we had a missing WAL or a corruption.
|
* <p/>
|
||||||
* rootHead = A <-> D <-> G
|
* The second group is {@link #merge(WALProcedureMap)}. We will have a global
|
||||||
* B E
|
* {@link WALProcedureMap} to hold global the active procedures, and a local {@link WALProcedureMap}
|
||||||
* C
|
* to hold the active procedures for the current proc wal file. And when we finish reading a proc
|
||||||
* unlinkFromLinkList = None
|
* wal file, we will merge the local one into the global one, by calling the
|
||||||
* </pre>
|
* {@link #merge(WALProcedureMap)} method of the global one and pass the local one in. In this
|
||||||
|
* method, for the same procedure, the one comes earlier will win, as we read the proc wal files
|
||||||
|
* from new to old(the reverse order).
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
class WALProcedureMap {
|
class WALProcedureMap {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class);
|
private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class);
|
||||||
|
|
||||||
private static class Entry {
|
private final Map<Long, ProcedureProtos.Procedure> procMap = new HashMap<>();
|
||||||
// For bucketed linked lists in hash-table.
|
|
||||||
private Entry hashNext;
|
|
||||||
// child head
|
|
||||||
private Entry childHead;
|
|
||||||
// double-link for rootHead or childHead
|
|
||||||
private Entry linkNext;
|
|
||||||
private Entry linkPrev;
|
|
||||||
// replay double-linked-list
|
|
||||||
private Entry replayNext;
|
|
||||||
private Entry replayPrev;
|
|
||||||
// procedure-infos
|
|
||||||
private Procedure<?> procedure;
|
|
||||||
private ProcedureProtos.Procedure proto;
|
|
||||||
private boolean ready = false;
|
|
||||||
|
|
||||||
public Entry(Entry hashNext) {
|
|
||||||
this.hashNext = hashNext;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getProcId() {
|
|
||||||
return proto.getProcId();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getParentId() {
|
|
||||||
return proto.getParentId();
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean hasParent() {
|
|
||||||
return proto.hasParentId();
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isReady() {
|
|
||||||
return ready;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isFinished() {
|
|
||||||
if (!hasParent()) {
|
|
||||||
// we only consider 'root' procedures. because for the user 'finished'
|
|
||||||
// means when everything up to the 'root' is finished.
|
|
||||||
switch (proto.getState()) {
|
|
||||||
case ROLLEDBACK:
|
|
||||||
case SUCCESS:
|
|
||||||
return true;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Procedure<?> convert() throws IOException {
|
|
||||||
if (procedure == null) {
|
|
||||||
procedure = ProcedureUtil.convertToProcedure(proto);
|
|
||||||
}
|
|
||||||
return procedure;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
final StringBuilder sb = new StringBuilder();
|
|
||||||
sb.append("Entry(");
|
|
||||||
sb.append(getProcId());
|
|
||||||
sb.append(", parentId=");
|
|
||||||
sb.append(getParentId());
|
|
||||||
sb.append(", class=");
|
|
||||||
sb.append(proto.getClassName());
|
|
||||||
sb.append(")");
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class EntryIterator implements ProcedureIterator {
|
|
||||||
private final Entry replayHead;
|
|
||||||
private Entry current;
|
|
||||||
|
|
||||||
public EntryIterator(Entry replayHead) {
|
|
||||||
this.replayHead = replayHead;
|
|
||||||
this.current = replayHead;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void reset() {
|
|
||||||
this.current = replayHead;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean hasNext() {
|
|
||||||
return current != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isNextFinished() {
|
|
||||||
return current != null && current.isFinished();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void skipNext() {
|
|
||||||
current = current.replayNext;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Procedure<?> next() throws IOException {
|
|
||||||
try {
|
|
||||||
return current.convert();
|
|
||||||
} finally {
|
|
||||||
current = current.replayNext;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// procedure hash table
|
|
||||||
private Entry[] procedureMap;
|
|
||||||
|
|
||||||
// replay-order double-linked-list
|
|
||||||
private Entry replayOrderHead;
|
|
||||||
private Entry replayOrderTail;
|
|
||||||
|
|
||||||
// root linked-list
|
|
||||||
private Entry rootHead;
|
|
||||||
|
|
||||||
// pending unlinked children (root not present yet)
|
|
||||||
private Entry childUnlinkedHead;
|
|
||||||
|
|
||||||
// Track ProcId range
|
|
||||||
private long minModifiedProcId = Long.MAX_VALUE;
|
private long minModifiedProcId = Long.MAX_VALUE;
|
||||||
|
|
||||||
private long maxModifiedProcId = Long.MIN_VALUE;
|
private long maxModifiedProcId = Long.MIN_VALUE;
|
||||||
|
|
||||||
public WALProcedureMap(int size) {
|
private void trackProcId(long procId) {
|
||||||
procedureMap = new Entry[size];
|
minModifiedProcId = Math.min(minModifiedProcId, procId);
|
||||||
replayOrderHead = null;
|
maxModifiedProcId = Math.max(maxModifiedProcId, procId);
|
||||||
replayOrderTail = null;
|
|
||||||
rootHead = null;
|
|
||||||
childUnlinkedHead = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void add(ProcedureProtos.Procedure procProto) {
|
|
||||||
trackProcIds(procProto.getProcId());
|
|
||||||
Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId());
|
|
||||||
boolean newEntry = entry.proto == null;
|
|
||||||
// We have seen procedure WALs where the entries are out of order; see HBASE-18152.
|
|
||||||
// To compensate, only replace the Entry procedure if for sure this new procedure
|
|
||||||
// is indeed an entry that came later.
|
|
||||||
// TODO: Fix the writing of procedure info so it does not violate basic expectation, that WALs
|
|
||||||
// contain procedure changes goingfrom start to finish in sequence.
|
|
||||||
if (newEntry || isIncreasing(entry.proto, procProto)) {
|
|
||||||
entry.proto = procProto;
|
|
||||||
}
|
|
||||||
addToReplayList(entry);
|
|
||||||
if (newEntry) {
|
|
||||||
if (procProto.hasParentId()) {
|
|
||||||
childUnlinkedHead = addToLinkList(entry, childUnlinkedHead);
|
|
||||||
} else {
|
|
||||||
rootHead = addToLinkList(entry, rootHead);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -225,20 +82,44 @@ class WALProcedureMap {
|
||||||
return increasing;
|
return increasing;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean remove(long procId) {
|
public void add(ProcedureProtos.Procedure proc) {
|
||||||
trackProcIds(procId);
|
procMap.compute(proc.getProcId(), (procId, existingProc) -> {
|
||||||
Entry entry = removeFromMap(procId);
|
if (existingProc == null || isIncreasing(existingProc, proc)) {
|
||||||
if (entry != null) {
|
return proc;
|
||||||
unlinkFromReplayList(entry);
|
} else {
|
||||||
unlinkFromLinkList(entry);
|
return existingProc;
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
return false;
|
});
|
||||||
|
trackProcId(proc.getProcId());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void trackProcIds(long procId) {
|
public void remove(long procId) {
|
||||||
minModifiedProcId = Math.min(minModifiedProcId, procId);
|
procMap.remove(procId);
|
||||||
maxModifiedProcId = Math.max(maxModifiedProcId, procId);
|
}
|
||||||
|
|
||||||
|
public boolean isEmpty() {
|
||||||
|
return procMap.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean contains(long procId) {
|
||||||
|
return procMap.containsKey(procId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge the given {@link WALProcedureMap} into this one. The {@link WALProcedureMap} passed in
|
||||||
|
* will be cleared after merging.
|
||||||
|
*/
|
||||||
|
public void merge(WALProcedureMap other) {
|
||||||
|
other.procMap.forEach(procMap::putIfAbsent);
|
||||||
|
maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId);
|
||||||
|
minModifiedProcId = Math.max(minModifiedProcId, other.minModifiedProcId);
|
||||||
|
other.procMap.clear();
|
||||||
|
other.maxModifiedProcId = Long.MIN_VALUE;
|
||||||
|
other.minModifiedProcId = Long.MAX_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Collection<ProcedureProtos.Procedure> getProcedures() {
|
||||||
|
return Collections.unmodifiableCollection(procMap.values());
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getMinModifiedProcId() {
|
public long getMinModifiedProcId() {
|
||||||
|
@ -248,360 +129,4 @@ class WALProcedureMap {
|
||||||
public long getMaxModifiedProcId() {
|
public long getMaxModifiedProcId() {
|
||||||
return maxModifiedProcId;
|
return maxModifiedProcId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean contains(long procId) {
|
|
||||||
return getProcedure(procId) != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isEmpty() {
|
|
||||||
return replayOrderHead == null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void clear() {
|
|
||||||
for (int i = 0; i < procedureMap.length; ++i) {
|
|
||||||
procedureMap[i] = null;
|
|
||||||
}
|
|
||||||
replayOrderHead = null;
|
|
||||||
replayOrderTail = null;
|
|
||||||
rootHead = null;
|
|
||||||
childUnlinkedHead = null;
|
|
||||||
minModifiedProcId = Long.MAX_VALUE;
|
|
||||||
maxModifiedProcId = Long.MIN_VALUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Merges two WalProcedureMap, the target is the "global" map, the source is the "local" map. -
|
|
||||||
* The entries in the hashtables are guaranteed to be unique. On replay we don't load procedures
|
|
||||||
* that already exist in the "global" map (the one we are merging the "local" in to). - The
|
|
||||||
* replayOrderList of the "local" nao will be appended to the "global" map replay list. - The
|
|
||||||
* "local" map will be cleared at the end of the operation.
|
|
||||||
*/
|
|
||||||
public void mergeTail(WALProcedureMap other) {
|
|
||||||
for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) {
|
|
||||||
int slotIndex = getMapSlot(p.getProcId());
|
|
||||||
p.hashNext = procedureMap[slotIndex];
|
|
||||||
procedureMap[slotIndex] = p;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (replayOrderHead == null) {
|
|
||||||
replayOrderHead = other.replayOrderHead;
|
|
||||||
replayOrderTail = other.replayOrderTail;
|
|
||||||
rootHead = other.rootHead;
|
|
||||||
childUnlinkedHead = other.childUnlinkedHead;
|
|
||||||
} else {
|
|
||||||
// append replay list
|
|
||||||
assert replayOrderTail.replayNext == null;
|
|
||||||
assert other.replayOrderHead.replayPrev == null;
|
|
||||||
replayOrderTail.replayNext = other.replayOrderHead;
|
|
||||||
other.replayOrderHead.replayPrev = replayOrderTail;
|
|
||||||
replayOrderTail = other.replayOrderTail;
|
|
||||||
|
|
||||||
// merge rootHead
|
|
||||||
if (rootHead == null) {
|
|
||||||
rootHead = other.rootHead;
|
|
||||||
} else if (other.rootHead != null) {
|
|
||||||
Entry otherTail = findLinkListTail(other.rootHead);
|
|
||||||
otherTail.linkNext = rootHead;
|
|
||||||
rootHead.linkPrev = otherTail;
|
|
||||||
rootHead = other.rootHead;
|
|
||||||
}
|
|
||||||
|
|
||||||
// merge childUnlinkedHead
|
|
||||||
if (childUnlinkedHead == null) {
|
|
||||||
childUnlinkedHead = other.childUnlinkedHead;
|
|
||||||
} else if (other.childUnlinkedHead != null) {
|
|
||||||
Entry otherTail = findLinkListTail(other.childUnlinkedHead);
|
|
||||||
otherTail.linkNext = childUnlinkedHead;
|
|
||||||
childUnlinkedHead.linkPrev = otherTail;
|
|
||||||
childUnlinkedHead = other.childUnlinkedHead;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId);
|
|
||||||
minModifiedProcId = Math.max(minModifiedProcId, other.minModifiedProcId);
|
|
||||||
|
|
||||||
other.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns an EntryIterator with the list of procedures ready to be added to the executor. A
|
|
||||||
* Procedure is ready if its children and parent are ready.
|
|
||||||
*/
|
|
||||||
public ProcedureIterator fetchReady() {
|
|
||||||
buildGraph();
|
|
||||||
|
|
||||||
Entry readyHead = null;
|
|
||||||
Entry readyTail = null;
|
|
||||||
Entry p = replayOrderHead;
|
|
||||||
while (p != null) {
|
|
||||||
Entry next = p.replayNext;
|
|
||||||
if (p.isReady()) {
|
|
||||||
unlinkFromReplayList(p);
|
|
||||||
if (readyTail != null) {
|
|
||||||
readyTail.replayNext = p;
|
|
||||||
p.replayPrev = readyTail;
|
|
||||||
} else {
|
|
||||||
p.replayPrev = null;
|
|
||||||
readyHead = p;
|
|
||||||
}
|
|
||||||
readyTail = p;
|
|
||||||
p.replayNext = null;
|
|
||||||
}
|
|
||||||
p = next;
|
|
||||||
}
|
|
||||||
// we need the hash-table lookups for parents, so this must be done
|
|
||||||
// out of the loop where we check isReadyToRun()
|
|
||||||
for (p = readyHead; p != null; p = p.replayNext) {
|
|
||||||
removeFromMap(p.getProcId());
|
|
||||||
unlinkFromLinkList(p);
|
|
||||||
}
|
|
||||||
return readyHead != null ? new EntryIterator(readyHead) : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Drain this map and return all procedures in it.
|
|
||||||
*/
|
|
||||||
public ProcedureIterator fetchAll() {
|
|
||||||
Entry head = replayOrderHead;
|
|
||||||
for (Entry p = head; p != null; p = p.replayNext) {
|
|
||||||
removeFromMap(p.getProcId());
|
|
||||||
}
|
|
||||||
for (int i = 0; i < procedureMap.length; ++i) {
|
|
||||||
assert procedureMap[i] == null : "map not empty i=" + i;
|
|
||||||
}
|
|
||||||
replayOrderHead = null;
|
|
||||||
replayOrderTail = null;
|
|
||||||
childUnlinkedHead = null;
|
|
||||||
rootHead = null;
|
|
||||||
return head != null ? new EntryIterator(head) : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void buildGraph() {
|
|
||||||
Entry p = childUnlinkedHead;
|
|
||||||
while (p != null) {
|
|
||||||
Entry next = p.linkNext;
|
|
||||||
Entry rootProc = getRootProcedure(p);
|
|
||||||
if (rootProc != null) {
|
|
||||||
rootProc.childHead = addToLinkList(p, rootProc.childHead);
|
|
||||||
}
|
|
||||||
p = next;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (p = rootHead; p != null; p = p.linkNext) {
|
|
||||||
checkReadyToRun(p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Entry getRootProcedure(Entry entry) {
|
|
||||||
while (entry != null && entry.hasParent()) {
|
|
||||||
entry = getProcedure(entry.getParentId());
|
|
||||||
}
|
|
||||||
return entry;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* (see the comprehensive explanation in the beginning of {@link ProcedureWALFormatReader}). A
|
|
||||||
* Procedure is ready when parent and children are ready. "ready" means that we all the
|
|
||||||
* information that we need in-memory.
|
|
||||||
* <p/>
|
|
||||||
* Example-1:<br/>
|
|
||||||
* We have two WALs, we start reading from the newest (wal-2)
|
|
||||||
*
|
|
||||||
* <pre>
|
|
||||||
* wal-2 | C B |
|
|
||||||
* wal-1 | A B C |
|
|
||||||
* </pre>
|
|
||||||
*
|
|
||||||
* If C and B don't depend on A (A is not the parent), we can start them before reading wal-1. If
|
|
||||||
* B is the only one with parent A we can start C. We have to read one more WAL before being able
|
|
||||||
* to start B.
|
|
||||||
* <p/>
|
|
||||||
* How do we know with the only information in B that we are not ready.
|
|
||||||
* <ul>
|
|
||||||
* <li>easy case, the parent is missing from the global map</li>
|
|
||||||
* <li>more complex case we look at the Stack IDs.</li>
|
|
||||||
* </ul>
|
|
||||||
* The Stack-IDs are added to the procedure order as an incremental index tracking how many times
|
|
||||||
* that procedure was executed, which is equivalent to the number of times we wrote the procedure
|
|
||||||
* to the WAL. <br/>
|
|
||||||
* In the example above:
|
|
||||||
*
|
|
||||||
* <pre>
|
|
||||||
* wal-2: B has stackId = [1, 2]
|
|
||||||
* wal-1: B has stackId = [1]
|
|
||||||
* wal-1: A has stackId = [0]
|
|
||||||
* </pre>
|
|
||||||
*
|
|
||||||
* Since we know that the Stack-IDs are incremental for a Procedure, we notice that there is a gap
|
|
||||||
* in the stackIds of B, so something was executed before.
|
|
||||||
* <p/>
|
|
||||||
* To identify when a Procedure is ready we do the sum of the stackIds of the procedure and the
|
|
||||||
* parent. if the stackIdSum is equal to the sum of {1..maxStackId} then everything we need is
|
|
||||||
* available.
|
|
||||||
* <p/>
|
|
||||||
* Example-2
|
|
||||||
*
|
|
||||||
* <pre>
|
|
||||||
* wal-2 | A | A stackIds = [0, 2]
|
|
||||||
* wal-1 | A B | B stackIds = [1]
|
|
||||||
* </pre>
|
|
||||||
*
|
|
||||||
* There is a gap between A stackIds so something was executed in between.
|
|
||||||
*/
|
|
||||||
private boolean checkReadyToRun(Entry rootEntry) {
|
|
||||||
assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry;
|
|
||||||
|
|
||||||
if (rootEntry.isFinished()) {
|
|
||||||
// If the root procedure is finished, sub-procedures should be gone
|
|
||||||
if (rootEntry.childHead != null) {
|
|
||||||
LOG.error("unexpected active children for root-procedure: {}", rootEntry);
|
|
||||||
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
|
|
||||||
LOG.error("unexpected active children: {}", p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry;
|
|
||||||
rootEntry.ready = true;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
int stackIdSum = 0;
|
|
||||||
int maxStackId = 0;
|
|
||||||
for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) {
|
|
||||||
int stackId = 1 + rootEntry.proto.getStackId(i);
|
|
||||||
maxStackId = Math.max(maxStackId, stackId);
|
|
||||||
stackIdSum += stackId;
|
|
||||||
LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId,
|
|
||||||
rootEntry);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
|
|
||||||
for (int i = 0; i < p.proto.getStackIdCount(); ++i) {
|
|
||||||
int stackId = 1 + p.proto.getStackId(i);
|
|
||||||
maxStackId = Math.max(maxStackId, stackId);
|
|
||||||
stackIdSum += stackId;
|
|
||||||
LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId, p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// The cmpStackIdSum is this formula for finding the sum of a series of numbers:
|
|
||||||
// http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg
|
|
||||||
final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2);
|
|
||||||
if (cmpStackIdSum == stackIdSum) {
|
|
||||||
rootEntry.ready = true;
|
|
||||||
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
|
|
||||||
p.ready = true;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void unlinkFromReplayList(Entry entry) {
|
|
||||||
if (replayOrderHead == entry) {
|
|
||||||
replayOrderHead = entry.replayNext;
|
|
||||||
}
|
|
||||||
if (replayOrderTail == entry) {
|
|
||||||
replayOrderTail = entry.replayPrev;
|
|
||||||
}
|
|
||||||
if (entry.replayPrev != null) {
|
|
||||||
entry.replayPrev.replayNext = entry.replayNext;
|
|
||||||
}
|
|
||||||
if (entry.replayNext != null) {
|
|
||||||
entry.replayNext.replayPrev = entry.replayPrev;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void addToReplayList(final Entry entry) {
|
|
||||||
unlinkFromReplayList(entry);
|
|
||||||
entry.replayNext = replayOrderHead;
|
|
||||||
entry.replayPrev = null;
|
|
||||||
if (replayOrderHead != null) {
|
|
||||||
replayOrderHead.replayPrev = entry;
|
|
||||||
} else {
|
|
||||||
replayOrderTail = entry;
|
|
||||||
}
|
|
||||||
replayOrderHead = entry;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void unlinkFromLinkList(Entry entry) {
|
|
||||||
if (entry == rootHead) {
|
|
||||||
rootHead = entry.linkNext;
|
|
||||||
} else if (entry == childUnlinkedHead) {
|
|
||||||
childUnlinkedHead = entry.linkNext;
|
|
||||||
}
|
|
||||||
if (entry.linkPrev != null) {
|
|
||||||
entry.linkPrev.linkNext = entry.linkNext;
|
|
||||||
}
|
|
||||||
if (entry.linkNext != null) {
|
|
||||||
entry.linkNext.linkPrev = entry.linkPrev;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Entry addToLinkList(Entry entry, Entry linkHead) {
|
|
||||||
unlinkFromLinkList(entry);
|
|
||||||
entry.linkNext = linkHead;
|
|
||||||
entry.linkPrev = null;
|
|
||||||
if (linkHead != null) {
|
|
||||||
linkHead.linkPrev = entry;
|
|
||||||
}
|
|
||||||
return entry;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Entry findLinkListTail(Entry linkHead) {
|
|
||||||
Entry tail = linkHead;
|
|
||||||
while (tail.linkNext != null) {
|
|
||||||
tail = tail.linkNext;
|
|
||||||
}
|
|
||||||
return tail;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Entry addToMap(long procId, boolean hasParent) {
|
|
||||||
int slotIndex = getMapSlot(procId);
|
|
||||||
Entry entry = getProcedure(slotIndex, procId);
|
|
||||||
if (entry != null) {
|
|
||||||
return entry;
|
|
||||||
}
|
|
||||||
|
|
||||||
entry = new Entry(procedureMap[slotIndex]);
|
|
||||||
procedureMap[slotIndex] = entry;
|
|
||||||
return entry;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Entry removeFromMap(final long procId) {
|
|
||||||
int slotIndex = getMapSlot(procId);
|
|
||||||
Entry prev = null;
|
|
||||||
Entry entry = procedureMap[slotIndex];
|
|
||||||
while (entry != null) {
|
|
||||||
if (procId == entry.getProcId()) {
|
|
||||||
if (prev != null) {
|
|
||||||
prev.hashNext = entry.hashNext;
|
|
||||||
} else {
|
|
||||||
procedureMap[slotIndex] = entry.hashNext;
|
|
||||||
}
|
|
||||||
entry.hashNext = null;
|
|
||||||
return entry;
|
|
||||||
}
|
|
||||||
prev = entry;
|
|
||||||
entry = entry.hashNext;
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Entry getProcedure(long procId) {
|
|
||||||
return getProcedure(getMapSlot(procId), procId);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Entry getProcedure(int slotIndex, long procId) {
|
|
||||||
Entry entry = procedureMap[slotIndex];
|
|
||||||
while (entry != null) {
|
|
||||||
if (procId == entry.getProcId()) {
|
|
||||||
return entry;
|
|
||||||
}
|
|
||||||
entry = entry.hashNext;
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private int getMapSlot(long procId) {
|
|
||||||
return (int) (Procedure.getProcIdHashCode(procId) % procedureMap.length);
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -0,0 +1,299 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.procedure2.store.wal;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.NoSuchElementException;
|
||||||
|
import org.apache.commons.lang3.mutable.MutableInt;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to build the tree for procedures.
|
||||||
|
* <p/>
|
||||||
|
* We will group the procedures with the root procedure, and then validate each group. For each
|
||||||
|
* group of procedures(with the same root procedure), we will collect all the stack ids, if the max
|
||||||
|
* stack id is n, then all the stack ids should be from 0 to n, non-repetition and non-omission. If
|
||||||
|
* not, we will consider all the procedures in this group as corrupted. Please see the code in
|
||||||
|
* {@link #checkReady(Entry, Map)} method.
|
||||||
|
* <p/>
|
||||||
|
* For the procedures not in any group, i.e, can not find the root procedure for these procedures,
|
||||||
|
* we will also consider them as corrupted. Please see the code in {@link #checkOrphan(Map)} method.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public final class WALProcedureTree {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(WALProcedureTree.class);
|
||||||
|
|
||||||
|
private static final class Entry {
|
||||||
|
|
||||||
|
private final ProcedureProtos.Procedure proc;
|
||||||
|
|
||||||
|
private final List<Entry> subProcs = new ArrayList<>();
|
||||||
|
|
||||||
|
public Entry(ProcedureProtos.Procedure proc) {
|
||||||
|
this.proc = proc;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("Procedure(pid=");
|
||||||
|
sb.append(proc.getProcId());
|
||||||
|
sb.append(", ppid=");
|
||||||
|
sb.append(proc.hasParentId() ? proc.getParentId() : Procedure.NO_PROC_ID);
|
||||||
|
sb.append(", class=");
|
||||||
|
sb.append(proc.getClassName());
|
||||||
|
sb.append(")");
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// when loading we will iterator the procedures twice, so use this class to cache the deserialized
|
||||||
|
// result to prevent deserializing multiple times.
|
||||||
|
private static final class ProtoAndProc {
|
||||||
|
private final ProcedureProtos.Procedure proto;
|
||||||
|
|
||||||
|
private Procedure<?> proc;
|
||||||
|
|
||||||
|
public ProtoAndProc(ProcedureProtos.Procedure proto) {
|
||||||
|
this.proto = proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Procedure<?> getProc() throws IOException {
|
||||||
|
if (proc == null) {
|
||||||
|
proc = ProcedureUtil.convertToProcedure(proto);
|
||||||
|
}
|
||||||
|
return proc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final List<ProtoAndProc> validProcs = new ArrayList<>();
|
||||||
|
|
||||||
|
private final List<ProtoAndProc> corruptedProcs = new ArrayList<>();
|
||||||
|
|
||||||
|
private static boolean isFinished(ProcedureProtos.Procedure proc) {
|
||||||
|
if (!proc.hasParentId()) {
|
||||||
|
switch (proc.getState()) {
|
||||||
|
case ROLLEDBACK:
|
||||||
|
case SUCCESS:
|
||||||
|
return true;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private WALProcedureTree(Map<Long, Entry> procMap) {
|
||||||
|
List<Entry> rootEntries = buildTree(procMap);
|
||||||
|
for (Entry rootEntry : rootEntries) {
|
||||||
|
checkReady(rootEntry, procMap);
|
||||||
|
}
|
||||||
|
checkOrphan(procMap);
|
||||||
|
Comparator<ProtoAndProc> cmp =
|
||||||
|
(p1, p2) -> Long.compare(p1.proto.getProcId(), p2.proto.getProcId());
|
||||||
|
Collections.sort(validProcs, cmp);
|
||||||
|
Collections.sort(corruptedProcs, cmp);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Entry> buildTree(Map<Long, Entry> procMap) {
|
||||||
|
List<Entry> rootEntries = new ArrayList<>();
|
||||||
|
procMap.values().forEach(entry -> {
|
||||||
|
if (!entry.proc.hasParentId()) {
|
||||||
|
rootEntries.add(entry);
|
||||||
|
} else {
|
||||||
|
Entry parentEntry = procMap.get(entry.proc.getParentId());
|
||||||
|
// For a valid procedure this should not be null. We will log the error later if it is null,
|
||||||
|
// as it will not be referenced by any root procedures.
|
||||||
|
if (parentEntry != null) {
|
||||||
|
parentEntry.subProcs.add(entry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return rootEntries;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void collectStackId(Entry entry, Map<Integer, List<Entry>> stackId2Proc,
|
||||||
|
MutableInt maxStackId) {
|
||||||
|
for (int i = 0, n = entry.proc.getStackIdCount(); i < n; i++) {
|
||||||
|
int stackId = entry.proc.getStackId(i);
|
||||||
|
if (stackId > maxStackId.intValue()) {
|
||||||
|
maxStackId.setValue(stackId);
|
||||||
|
}
|
||||||
|
stackId2Proc.computeIfAbsent(stackId, k -> new ArrayList<>()).add(entry);
|
||||||
|
}
|
||||||
|
entry.subProcs.forEach(e -> collectStackId(e, stackId2Proc, maxStackId));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addAllToCorruptedAndRemoveFromProcMap(Entry entry,
|
||||||
|
Map<Long, Entry> remainingProcMap) {
|
||||||
|
corruptedProcs.add(new ProtoAndProc(entry.proc));
|
||||||
|
remainingProcMap.remove(entry.proc.getProcId());
|
||||||
|
for (Entry e : entry.subProcs) {
|
||||||
|
addAllToCorruptedAndRemoveFromProcMap(e, remainingProcMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addAllToValidAndRemoveFromProcMap(Entry entry, Map<Long, Entry> remainingProcMap) {
|
||||||
|
validProcs.add(new ProtoAndProc(entry.proc));
|
||||||
|
remainingProcMap.remove(entry.proc.getProcId());
|
||||||
|
for (Entry e : entry.subProcs) {
|
||||||
|
addAllToValidAndRemoveFromProcMap(e, remainingProcMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// In this method first we will check whether the given root procedure and all its sub procedures
|
||||||
|
// are valid, through the procedure stack. And we will also remove all these procedures from the
|
||||||
|
// remainingProcMap, so at last, if there are still procedures in the map, we know that there are
|
||||||
|
// orphan procedures.
|
||||||
|
private void checkReady(Entry rootEntry, Map<Long, Entry> remainingProcMap) {
|
||||||
|
if (isFinished(rootEntry.proc)) {
|
||||||
|
if (!rootEntry.subProcs.isEmpty()) {
|
||||||
|
LOG.error("unexpected active children for root-procedure: {}", rootEntry);
|
||||||
|
rootEntry.subProcs.forEach(e -> LOG.error("unexpected active children: {}", e));
|
||||||
|
addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap);
|
||||||
|
} else {
|
||||||
|
addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Map<Integer, List<Entry>> stackId2Proc = new HashMap<>();
|
||||||
|
MutableInt maxStackId = new MutableInt(Integer.MIN_VALUE);
|
||||||
|
collectStackId(rootEntry, stackId2Proc, maxStackId);
|
||||||
|
// the stack ids should start from 0 and increase by one every time
|
||||||
|
boolean valid = true;
|
||||||
|
for (int i = 0; i <= maxStackId.intValue(); i++) {
|
||||||
|
List<Entry> entries = stackId2Proc.get(i);
|
||||||
|
if (entries == null) {
|
||||||
|
LOG.error("Missing stack id {}, max stack id is {}, root procedure is {}", i, maxStackId,
|
||||||
|
rootEntry);
|
||||||
|
valid = false;
|
||||||
|
} else if (entries.size() > 1) {
|
||||||
|
LOG.error("Multiple procedures {} have the same stack id {}, max stack id is {}," +
|
||||||
|
" root procedure is {}", entries, i, maxStackId, rootEntry);
|
||||||
|
valid = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (valid) {
|
||||||
|
addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap);
|
||||||
|
} else {
|
||||||
|
addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkOrphan(Map<Long, Entry> procMap) {
|
||||||
|
procMap.values().forEach(entry -> {
|
||||||
|
LOG.error("Orphan procedure: {}", entry);
|
||||||
|
corruptedProcs.add(new ProtoAndProc(entry.proc));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class Iter implements ProcedureIterator {
|
||||||
|
|
||||||
|
private final List<ProtoAndProc> procs;
|
||||||
|
|
||||||
|
private Iterator<ProtoAndProc> iter;
|
||||||
|
|
||||||
|
private ProtoAndProc current;
|
||||||
|
|
||||||
|
public Iter(List<ProtoAndProc> procs) {
|
||||||
|
this.procs = procs;
|
||||||
|
reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() {
|
||||||
|
iter = procs.iterator();
|
||||||
|
if (iter.hasNext()) {
|
||||||
|
current = iter.next();
|
||||||
|
} else {
|
||||||
|
current = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return current != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkNext() {
|
||||||
|
if (!hasNext()) {
|
||||||
|
throw new NoSuchElementException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isNextFinished() {
|
||||||
|
checkNext();
|
||||||
|
return isFinished(current.proto);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void moveToNext() {
|
||||||
|
if (iter.hasNext()) {
|
||||||
|
current = iter.next();
|
||||||
|
} else {
|
||||||
|
current = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void skipNext() {
|
||||||
|
checkNext();
|
||||||
|
moveToNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Procedure<?> next() throws IOException {
|
||||||
|
checkNext();
|
||||||
|
Procedure<?> proc = current.getProc();
|
||||||
|
moveToNext();
|
||||||
|
return proc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public ProcedureIterator getValidProcs() {
|
||||||
|
return new Iter(validProcs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ProcedureIterator getCorruptedProcs() {
|
||||||
|
return new Iter(corruptedProcs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static WALProcedureTree build(Collection<ProcedureProtos.Procedure> procedures) {
|
||||||
|
Map<Long, Entry> procMap = new HashMap<>();
|
||||||
|
for (ProcedureProtos.Procedure proc : procedures) {
|
||||||
|
procMap.put(proc.getProcId(), new Entry(proc));
|
||||||
|
}
|
||||||
|
return new WALProcedureTree(procMap);
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,9 +18,7 @@
|
||||||
package org.apache.hadoop.hbase.procedure2.store.wal;
|
package org.apache.hadoop.hbase.procedure2.store.wal;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.procedure2.store.wal;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -44,7 +43,6 @@ import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
|
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
|
import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
@ -72,7 +70,6 @@ public class TestWALProcedureStore {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);
|
||||||
|
|
||||||
private static final int PROCEDURE_STORE_SLOTS = 1;
|
private static final int PROCEDURE_STORE_SLOTS = 1;
|
||||||
private static final Procedure NULL_PROC = null;
|
|
||||||
|
|
||||||
private WALProcedureStore procStore;
|
private WALProcedureStore procStore;
|
||||||
|
|
||||||
|
@ -153,7 +150,7 @@ public class TestWALProcedureStore {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWalCleanerSequentialClean() throws Exception {
|
public void testWalCleanerSequentialClean() throws Exception {
|
||||||
final Procedure[] procs = new Procedure[5];
|
final Procedure<?>[] procs = new Procedure[5];
|
||||||
ArrayList<ProcedureWALFile> logs = null;
|
ArrayList<ProcedureWALFile> logs = null;
|
||||||
|
|
||||||
// Insert procedures and roll wal after every insert.
|
// Insert procedures and roll wal after every insert.
|
||||||
|
@ -182,7 +179,7 @@ public class TestWALProcedureStore {
|
||||||
// they are in the starting of the list.
|
// they are in the starting of the list.
|
||||||
@Test
|
@Test
|
||||||
public void testWalCleanerNoHoles() throws Exception {
|
public void testWalCleanerNoHoles() throws Exception {
|
||||||
final Procedure[] procs = new Procedure[5];
|
final Procedure<?>[] procs = new Procedure[5];
|
||||||
ArrayList<ProcedureWALFile> logs = null;
|
ArrayList<ProcedureWALFile> logs = null;
|
||||||
// Insert procedures and roll wal after every insert.
|
// Insert procedures and roll wal after every insert.
|
||||||
for (int i = 0; i < procs.length; i++) {
|
for (int i = 0; i < procs.length; i++) {
|
||||||
|
@ -242,7 +239,7 @@ public class TestWALProcedureStore {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWalCleanerWithEmptyRolls() throws Exception {
|
public void testWalCleanerWithEmptyRolls() throws Exception {
|
||||||
final Procedure[] procs = new Procedure[3];
|
final Procedure<?>[] procs = new Procedure[3];
|
||||||
for (int i = 0; i < procs.length; ++i) {
|
for (int i = 0; i < procs.length; ++i) {
|
||||||
procs[i] = new TestSequentialProcedure();
|
procs[i] = new TestSequentialProcedure();
|
||||||
procStore.insert(procs[i], null);
|
procStore.insert(procs[i], null);
|
||||||
|
@ -284,12 +281,12 @@ public class TestWALProcedureStore {
|
||||||
Set<Long> procIds = new HashSet<>();
|
Set<Long> procIds = new HashSet<>();
|
||||||
|
|
||||||
// Insert something in the log
|
// Insert something in the log
|
||||||
Procedure proc1 = new TestSequentialProcedure();
|
Procedure<?> proc1 = new TestSequentialProcedure();
|
||||||
procIds.add(proc1.getProcId());
|
procIds.add(proc1.getProcId());
|
||||||
procStore.insert(proc1, null);
|
procStore.insert(proc1, null);
|
||||||
|
|
||||||
Procedure proc2 = new TestSequentialProcedure();
|
Procedure<?> proc2 = new TestSequentialProcedure();
|
||||||
Procedure[] child2 = new Procedure[2];
|
Procedure<?>[] child2 = new Procedure[2];
|
||||||
child2[0] = new TestSequentialProcedure();
|
child2[0] = new TestSequentialProcedure();
|
||||||
child2[1] = new TestSequentialProcedure();
|
child2[1] = new TestSequentialProcedure();
|
||||||
|
|
||||||
|
@ -323,11 +320,11 @@ public class TestWALProcedureStore {
|
||||||
@Test
|
@Test
|
||||||
public void testNoTrailerDoubleRestart() throws Exception {
|
public void testNoTrailerDoubleRestart() throws Exception {
|
||||||
// log-0001: proc 0, 1 and 2 are inserted
|
// log-0001: proc 0, 1 and 2 are inserted
|
||||||
Procedure proc0 = new TestSequentialProcedure();
|
Procedure<?> proc0 = new TestSequentialProcedure();
|
||||||
procStore.insert(proc0, null);
|
procStore.insert(proc0, null);
|
||||||
Procedure proc1 = new TestSequentialProcedure();
|
Procedure<?> proc1 = new TestSequentialProcedure();
|
||||||
procStore.insert(proc1, null);
|
procStore.insert(proc1, null);
|
||||||
Procedure proc2 = new TestSequentialProcedure();
|
Procedure<?> proc2 = new TestSequentialProcedure();
|
||||||
procStore.insert(proc2, null);
|
procStore.insert(proc2, null);
|
||||||
procStore.rollWriterForTesting();
|
procStore.rollWriterForTesting();
|
||||||
|
|
||||||
|
@ -420,7 +417,7 @@ public class TestWALProcedureStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void assertUpdated(final ProcedureStoreTracker tracker,
|
private static void assertUpdated(final ProcedureStoreTracker tracker,
|
||||||
final Procedure[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) {
|
final Procedure<?>[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) {
|
||||||
for (int index : updatedProcs) {
|
for (int index : updatedProcs) {
|
||||||
long procId = procs[index].getProcId();
|
long procId = procs[index].getProcId();
|
||||||
assertTrue("Procedure id : " + procId, tracker.isModified(procId));
|
assertTrue("Procedure id : " + procId, tracker.isModified(procId));
|
||||||
|
@ -432,7 +429,7 @@ public class TestWALProcedureStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void assertDeleted(final ProcedureStoreTracker tracker,
|
private static void assertDeleted(final ProcedureStoreTracker tracker,
|
||||||
final Procedure[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) {
|
final Procedure<?>[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) {
|
||||||
for (int index : deletedProcs) {
|
for (int index : deletedProcs) {
|
||||||
long procId = procs[index].getProcId();
|
long procId = procs[index].getProcId();
|
||||||
assertEquals("Procedure id : " + procId,
|
assertEquals("Procedure id : " + procId,
|
||||||
|
@ -447,7 +444,7 @@ public class TestWALProcedureStore {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCorruptedTrailersRebuild() throws Exception {
|
public void testCorruptedTrailersRebuild() throws Exception {
|
||||||
final Procedure[] procs = new Procedure[6];
|
final Procedure<?>[] procs = new Procedure[6];
|
||||||
for (int i = 0; i < procs.length; ++i) {
|
for (int i = 0; i < procs.length; ++i) {
|
||||||
procs[i] = new TestSequentialProcedure();
|
procs[i] = new TestSequentialProcedure();
|
||||||
}
|
}
|
||||||
|
@ -575,127 +572,20 @@ public class TestWALProcedureStore {
|
||||||
storeRestart(loader);
|
storeRestart(loader);
|
||||||
assertEquals(0, loader.getLoadedCount());
|
assertEquals(0, loader.getLoadedCount());
|
||||||
assertEquals(rootProcs.length, loader.getCorruptedCount());
|
assertEquals(rootProcs.length, loader.getCorruptedCount());
|
||||||
for (Procedure proc: loader.getCorrupted()) {
|
for (Procedure<?> proc : loader.getCorrupted()) {
|
||||||
assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length);
|
assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length);
|
||||||
assertTrue(proc.toString(),
|
assertTrue(proc.toString(),
|
||||||
proc.getProcId() > rootProcs.length &&
|
proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2));
|
||||||
proc.getProcId() <= (rootProcs.length * 2));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWalReplayOrder_AB_A() throws Exception {
|
|
||||||
/*
|
|
||||||
* | A B | -> | A |
|
|
||||||
*/
|
|
||||||
TestProcedure a = new TestProcedure(1, 0);
|
|
||||||
TestProcedure b = new TestProcedure(2, 1);
|
|
||||||
|
|
||||||
procStore.insert(a, null);
|
|
||||||
a.addStackId(0);
|
|
||||||
procStore.update(a);
|
|
||||||
|
|
||||||
procStore.insert(a, new Procedure[] { b });
|
|
||||||
b.addStackId(1);
|
|
||||||
procStore.update(b);
|
|
||||||
|
|
||||||
procStore.rollWriterForTesting();
|
|
||||||
|
|
||||||
a.addStackId(2);
|
|
||||||
procStore.update(a);
|
|
||||||
|
|
||||||
storeRestart(new ProcedureStore.ProcedureLoader() {
|
|
||||||
@Override
|
|
||||||
public void setMaxProcId(long maxProcId) {
|
|
||||||
assertEquals(2, maxProcId);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void load(ProcedureIterator procIter) throws IOException {
|
|
||||||
assertTrue(procIter.hasNext());
|
|
||||||
assertEquals(1, procIter.next().getProcId());
|
|
||||||
assertTrue(procIter.hasNext());
|
|
||||||
assertEquals(2, procIter.next().getProcId());
|
|
||||||
assertFalse(procIter.hasNext());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
|
|
||||||
assertFalse(procIter.hasNext());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWalReplayOrder_ABC_BAD() throws Exception {
|
|
||||||
/*
|
|
||||||
* | A B C | -> | B A D |
|
|
||||||
*/
|
|
||||||
TestProcedure a = new TestProcedure(1, 0);
|
|
||||||
TestProcedure b = new TestProcedure(2, 1);
|
|
||||||
TestProcedure c = new TestProcedure(3, 2);
|
|
||||||
TestProcedure d = new TestProcedure(4, 0);
|
|
||||||
|
|
||||||
procStore.insert(a, null);
|
|
||||||
a.addStackId(0);
|
|
||||||
procStore.update(a);
|
|
||||||
|
|
||||||
procStore.insert(a, new Procedure[] { b });
|
|
||||||
b.addStackId(1);
|
|
||||||
procStore.update(b);
|
|
||||||
|
|
||||||
procStore.insert(b, new Procedure[] { c });
|
|
||||||
b.addStackId(2);
|
|
||||||
procStore.update(b);
|
|
||||||
|
|
||||||
procStore.rollWriterForTesting();
|
|
||||||
|
|
||||||
b.addStackId(3);
|
|
||||||
procStore.update(b);
|
|
||||||
|
|
||||||
a.addStackId(4);
|
|
||||||
procStore.update(a);
|
|
||||||
|
|
||||||
procStore.insert(d, null);
|
|
||||||
d.addStackId(0);
|
|
||||||
procStore.update(d);
|
|
||||||
|
|
||||||
storeRestart(new ProcedureStore.ProcedureLoader() {
|
|
||||||
@Override
|
|
||||||
public void setMaxProcId(long maxProcId) {
|
|
||||||
assertEquals(4, maxProcId);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void load(ProcedureIterator procIter) throws IOException {
|
|
||||||
assertTrue(procIter.hasNext());
|
|
||||||
assertEquals(4, procIter.next().getProcId());
|
|
||||||
// TODO: This will be multiple call once we do fast-start
|
|
||||||
//assertFalse(procIter.hasNext());
|
|
||||||
|
|
||||||
assertTrue(procIter.hasNext());
|
|
||||||
assertEquals(1, procIter.next().getProcId());
|
|
||||||
assertTrue(procIter.hasNext());
|
|
||||||
assertEquals(2, procIter.next().getProcId());
|
|
||||||
assertTrue(procIter.hasNext());
|
|
||||||
assertEquals(3, procIter.next().getProcId());
|
|
||||||
assertFalse(procIter.hasNext());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
|
|
||||||
assertFalse(procIter.hasNext());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRollAndRemove() throws IOException {
|
public void testRollAndRemove() throws IOException {
|
||||||
// Insert something in the log
|
// Insert something in the log
|
||||||
Procedure proc1 = new TestSequentialProcedure();
|
Procedure<?> proc1 = new TestSequentialProcedure();
|
||||||
procStore.insert(proc1, null);
|
procStore.insert(proc1, null);
|
||||||
|
|
||||||
Procedure proc2 = new TestSequentialProcedure();
|
Procedure<?> proc2 = new TestSequentialProcedure();
|
||||||
procStore.insert(proc2, null);
|
procStore.insert(proc2, null);
|
||||||
|
|
||||||
// roll the log, now we have 2
|
// roll the log, now we have 2
|
||||||
|
@ -942,17 +832,6 @@ public class TestWALProcedureStore {
|
||||||
assertEquals(0, loader.getCorruptedCount());
|
assertEquals(0, loader.getCorruptedCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertEmptyLogDir() {
|
|
||||||
try {
|
|
||||||
FileStatus[] status = fs.listStatus(logDir);
|
|
||||||
assertTrue("expected empty state-log dir", status == null || status.length == 0);
|
|
||||||
} catch (FileNotFoundException e) {
|
|
||||||
fail("expected the state-log dir to be present: " + logDir);
|
|
||||||
} catch (IOException e) {
|
|
||||||
fail("got en exception on state-log dir list: " + e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class TestSequentialProcedure extends SequentialProcedure<Void> {
|
public static class TestSequentialProcedure extends SequentialProcedure<Void> {
|
||||||
private static long seqid = 0;
|
private static long seqid = 0;
|
||||||
|
|
||||||
|
@ -961,13 +840,18 @@ public class TestWALProcedureStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Procedure[] execute(Void env) { return null; }
|
protected Procedure<Void>[] execute(Void env) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void rollback(Void env) { }
|
protected void rollback(Void env) {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean abort(Void env) { return false; }
|
protected boolean abort(Void env) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serializeStateData(ProcedureStateSerializer serializer)
|
protected void serializeStateData(ProcedureStateSerializer serializer)
|
||||||
|
|
|
@ -0,0 +1,173 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.procedure2.store.wal;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||||
|
|
||||||
|
@Category({ MasterTests.class, SmallTests.class })
|
||||||
|
public class TestWALProcedureTree {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestWALProcedureTree.class);
|
||||||
|
|
||||||
|
public static final class TestProcedure extends Procedure<Void> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setProcId(long procId) {
|
||||||
|
super.setProcId(procId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setParentProcId(long parentProcId) {
|
||||||
|
super.setParentProcId(parentProcId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void addStackIndex(int index) {
|
||||||
|
super.addStackIndex(index);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Procedure<Void>[] execute(Void env)
|
||||||
|
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void rollback(Void env) throws IOException, InterruptedException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean abort(Void env) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private TestProcedure createProc(long procId, long parentProcId) {
|
||||||
|
TestProcedure proc = new TestProcedure();
|
||||||
|
proc.setProcId(procId);
|
||||||
|
if (parentProcId != Procedure.NO_PROC_ID) {
|
||||||
|
proc.setParentProcId(parentProcId);
|
||||||
|
}
|
||||||
|
return proc;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<ProcedureProtos.Procedure> toProtos(TestProcedure... procs) {
|
||||||
|
return Arrays.stream(procs).map(p -> {
|
||||||
|
try {
|
||||||
|
return ProcedureUtil.convertToProtoProcedure(p);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new UncheckedIOException(e);
|
||||||
|
}
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<TestProcedure> getProcs(ProcedureIterator iter) throws IOException {
|
||||||
|
List<TestProcedure> procs = new ArrayList<>();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
procs.add((TestProcedure) iter.next());
|
||||||
|
}
|
||||||
|
return procs;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMissingStackId() throws IOException {
|
||||||
|
TestProcedure proc0 = createProc(1, Procedure.NO_PROC_ID);
|
||||||
|
proc0.addStackIndex(0);
|
||||||
|
TestProcedure proc1 = createProc(2, 1);
|
||||||
|
proc1.addStackIndex(1);
|
||||||
|
TestProcedure proc2 = createProc(3, 2);
|
||||||
|
proc2.addStackIndex(3);
|
||||||
|
WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2));
|
||||||
|
List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
|
||||||
|
assertEquals(0, validProcs.size());
|
||||||
|
List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
|
||||||
|
assertEquals(3, corruptedProcs.size());
|
||||||
|
assertEquals(1, corruptedProcs.get(0).getProcId());
|
||||||
|
assertEquals(2, corruptedProcs.get(1).getProcId());
|
||||||
|
assertEquals(3, corruptedProcs.get(2).getProcId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDuplicatedStackId() throws IOException {
|
||||||
|
TestProcedure proc0 = createProc(1, Procedure.NO_PROC_ID);
|
||||||
|
proc0.addStackIndex(0);
|
||||||
|
TestProcedure proc1 = createProc(2, 1);
|
||||||
|
proc1.addStackIndex(1);
|
||||||
|
TestProcedure proc2 = createProc(3, 2);
|
||||||
|
proc2.addStackIndex(1);
|
||||||
|
WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2));
|
||||||
|
List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
|
||||||
|
assertEquals(0, validProcs.size());
|
||||||
|
List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
|
||||||
|
assertEquals(3, corruptedProcs.size());
|
||||||
|
assertEquals(1, corruptedProcs.get(0).getProcId());
|
||||||
|
assertEquals(2, corruptedProcs.get(1).getProcId());
|
||||||
|
assertEquals(3, corruptedProcs.get(2).getProcId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOrphan() throws IOException {
|
||||||
|
TestProcedure proc0 = createProc(1, Procedure.NO_PROC_ID);
|
||||||
|
proc0.addStackIndex(0);
|
||||||
|
TestProcedure proc1 = createProc(2, 1);
|
||||||
|
proc1.addStackIndex(1);
|
||||||
|
TestProcedure proc2 = createProc(3, Procedure.NO_PROC_ID);
|
||||||
|
proc2.addStackIndex(0);
|
||||||
|
TestProcedure proc3 = createProc(5, 4);
|
||||||
|
proc3.addStackIndex(1);
|
||||||
|
WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2, proc3));
|
||||||
|
List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
|
||||||
|
assertEquals(3, validProcs.size());
|
||||||
|
List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
|
||||||
|
assertEquals(1, corruptedProcs.size());
|
||||||
|
assertEquals(5, corruptedProcs.get(0).getProcId());
|
||||||
|
assertEquals(4, corruptedProcs.get(0).getParentProcId());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
* avoiding port contention if another local HBase instance is already running).
|
* avoiding port contention if another local HBase instance is already running).
|
||||||
* <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
|
* <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
|
||||||
* setting it to true.
|
* setting it to true.
|
||||||
|
* Trigger pre commit.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
|
|
Loading…
Reference in New Issue