HBASE-16130 Add comments to ProcedureStoreTracker.

Change-Id: I09d7c2375fd18a96aea48eaa161799496f491b4f
This commit is contained in:
Apekshit 2016-04-19 16:53:28 -07:00 committed by Apekshit Sharma
parent 9b1ecb31f0
commit a3546a3752
3 changed files with 102 additions and 18 deletions

View File

@ -33,14 +33,27 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
* Keeps track of live procedures.
*
* It can be used by the ProcedureStore to identify which procedures are already
* deleted/completed to avoid the deserialization step on restart.
* deleted/completed to avoid the deserialization step on restart
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureStoreTracker {
// Key is procedure id corresponding to first bit of the bitmap.
private final TreeMap<Long, BitSetNode> map = new TreeMap<Long, BitSetNode>();
/**
* If true, do not remove bits corresponding to deleted procedures. Note that this can result
* in huge bitmaps overtime.
* Currently, it's set to true only when building tracker state from logs during recovery. During
* recovery, if we are sure that a procedure has been deleted, reading its old update entries
* can be skipped.
*/
private boolean keepDeletes = false;
/**
* If true, it means tracker has incomplete information about the active/deleted procedures.
* It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to
* understand it's real use.
*/
private boolean partial = false;
private long minUpdatedProcId = Long.MAX_VALUE;
@ -48,20 +61,39 @@ public class ProcedureStoreTracker {
public enum DeleteState { YES, NO, MAYBE }
/**
* A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met).
* Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the
* range of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K
* is BITS_PER_WORD.
*/
public static class BitSetNode {
private final static long WORD_MASK = 0xffffffffffffffffL;
private final static int ADDRESS_BITS_PER_WORD = 6;
private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD;
/**
* Mimics {@link ProcedureStoreTracker#partial}.
*/
private final boolean partial;
/**
* Set of procedures which have been updated since last {@link #resetUpdates()}.
* Useful to track procedures which have been updated since last WAL write.
*/
private long[] updated;
/**
* Keeps track of procedure ids which belong to this bitmap's range and have been deleted.
*/
private long[] deleted;
/**
* Offset of bitmap i.e. procedure id corresponding to first bit.
*/
private long start;
public void dump() {
System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(),
getMinProcId(), getMaxProcId());
getActiveMinProcId(), getActiveMaxProcId());
System.out.println("Update:");
for (int i = 0; i < updated.length; ++i) {
for (int j = 0; j < BITS_PER_WORD; ++j) {
@ -150,6 +182,9 @@ public class ProcedureStoreTracker {
return true;
}
/**
* @return true, if there are no active procedures in this BitSetNode, else false.
*/
public boolean isEmpty() {
// TODO: cache the value
for (int i = 0; i < deleted.length; ++i) {
@ -166,6 +201,9 @@ public class ProcedureStoreTracker {
}
}
/**
* Clears the {@link #deleted} bitmaps.
*/
public void undeleteAll() {
for (int i = 0; i < updated.length; ++i) {
deleted[i] = 0;
@ -182,6 +220,10 @@ public class ProcedureStoreTracker {
}
}
// ========================================================================
// Convert to/from Protocol Buffer.
// ========================================================================
public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
@ -213,7 +255,8 @@ public class ProcedureStoreTracker {
}
public boolean canMerge(final BitSetNode rightNode) {
assert start < rightNode.getEnd();
// Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD.
assert start < rightNode.start;
return (rightNode.getEnd() - start) < MAX_NODE_SIZE;
}
@ -283,7 +326,7 @@ public class ProcedureStoreTracker {
// ========================================================================
// Min/Max Helpers
// ========================================================================
public long getMinProcId() {
public long getActiveMinProcId() {
long minProcId = start;
for (int i = 0; i < deleted.length; ++i) {
if (deleted[i] == 0) {
@ -303,7 +346,7 @@ public class ProcedureStoreTracker {
return minProcId;
}
public long getMaxProcId() {
public long getActiveMaxProcId() {
long maxProcId = getEnd();
for (int i = deleted.length - 1; i >= 0; --i) {
if (deleted[i] == 0) {
@ -346,10 +389,16 @@ public class ProcedureStoreTracker {
// ========================================================================
// Helpers
// ========================================================================
/**
* @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
*/
private static long alignUp(final long x) {
return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD;
}
/**
* @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
*/
private static long alignDown(final long x) {
return x & -BITS_PER_WORD;
}
@ -421,6 +470,13 @@ public class ProcedureStoreTracker {
resetUpdates();
}
/**
* If {@link #partial} is false, returns state from the bitmap. If no state is found for
* {@code procId}, returns YES.
* If partial is true, tracker doesn't have complete view of system state, so it returns MAYBE
* if there is no update for the procedure or if it doesn't have a state in bitmap. Otherwise,
* returns state from the bitmap.
*/
public DeleteState isDeleted(long procId) {
Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
if (entry != null && entry.getValue().contains(procId)) {
@ -431,14 +487,16 @@ public class ProcedureStoreTracker {
return partial ? DeleteState.MAYBE : DeleteState.YES;
}
public long getMinProcId() {
public long getActiveMinProcId() {
// TODO: Cache?
Map.Entry<Long, BitSetNode> entry = map.firstEntry();
return entry == null ? 0 : entry.getValue().getMinProcId();
return entry == null ? 0 : entry.getValue().getActiveMinProcId();
}
public void setKeepDeletes(boolean keepDeletes) {
this.keepDeletes = keepDeletes;
// If not to keep deletes, remove the BitSetNodes which are empty (i.e. contains ids of deleted
// procedures).
if (!keepDeletes) {
Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
while (it.hasNext()) {
@ -459,6 +517,9 @@ public class ProcedureStoreTracker {
this.partial = isPartial;
}
/**
* @return true, if no procedure is active, else false.
*/
public boolean isEmpty() {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
if (entry.getValue().isEmpty() == false) {
@ -468,6 +529,9 @@ public class ProcedureStoreTracker {
return true;
}
/**
* @return true if any procedure was updated since last call to {@link #resetUpdates()}.
*/
public boolean isUpdated() {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
if (entry.getValue().isUpdated() == false) {
@ -482,6 +546,10 @@ public class ProcedureStoreTracker {
return map.floorEntry(minId) != null || map.floorEntry(maxId) != null;
}
/**
* Clears the list of updated procedure ids. This doesn't affect global list of active
* procedure ids.
*/
public void resetUpdates() {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
entry.getValue().resetUpdates();
@ -497,7 +565,7 @@ public class ProcedureStoreTracker {
}
private BitSetNode getOrCreateNode(final long procId) {
// can procId fit in the left node?
// If procId can fit in left node (directly or by growing it)
BitSetNode leftNode = null;
boolean leftCanGrow = false;
Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
@ -509,6 +577,7 @@ public class ProcedureStoreTracker {
leftCanGrow = leftNode.canGrow(procId);
}
// If procId can fit in right node (directly or by growing it)
BitSetNode rightNode = null;
boolean rightCanGrow = false;
Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
@ -521,12 +590,11 @@ public class ProcedureStoreTracker {
return mergeNodes(leftNode, rightNode);
}
// If left and right nodes can not merge, decide which one to grow.
if (leftCanGrow && rightCanGrow) {
if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
// grow the left node
return growNode(leftNode, procId);
}
// grow the right node
return growNode(rightNode, procId);
}
}
@ -542,12 +610,16 @@ public class ProcedureStoreTracker {
return growNode(rightNode, procId);
}
// add new node
// add new node if there are no left/right nodes which can be used.
BitSetNode node = new BitSetNode(procId, partial);
map.put(node.getStart(), node);
return node;
}
/**
* Grows {@code node} to contain {@code procId} and updates the map.
* @return {@link BitSetNode} instance which contains {@code procId}.
*/
private BitSetNode growNode(BitSetNode node, long procId) {
map.remove(node.getStart());
node.grow(procId);
@ -555,6 +627,9 @@ public class ProcedureStoreTracker {
return node;
}
/**
* Merges {@code leftNode} & {@code rightNode} and updates the map.
*/
private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
assert leftNode.getStart() < rightNode.getStart();
leftNode.merge(rightNode);
@ -571,6 +646,11 @@ public class ProcedureStoreTracker {
}
}
/**
* Builds
* {@link org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker}
* protocol buffer from current state, serializes it and writes to the {@code stream}.
*/
public void writeTo(final OutputStream stream) throws IOException {
ProcedureProtos.ProcedureStoreTracker.Builder builder =
ProcedureProtos.ProcedureStoreTracker.newBuilder();
@ -580,6 +660,11 @@ public class ProcedureStoreTracker {
builder.build().writeDelimitedTo(stream);
}
/**
* Reads serialized
* {@link org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker}
* protocol buffer from the {@code stream}, and use it to build the state.
*/
public void readFrom(final InputStream stream) throws IOException {
reset();
final ProcedureProtos.ProcedureStoreTracker data =

View File

@ -158,7 +158,8 @@ public final class ProcedureWALFormat {
public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size)
throws IOException {
long trailerPos = size - 17; // Beginning of the Trailer Jump
// Beginning of the Trailer Jump. 17 = 1 byte version + 8 byte magic + 8 byte offset
long trailerPos = size - 17;
if (trailerPos < startPos) {
throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos);

View File

@ -800,7 +800,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
.setVersion(ProcedureWALFormat.HEADER_VERSION)
.setType(ProcedureWALFormat.LOG_TYPE_STREAM)
.setMinProcId(storeTracker.getMinProcId())
.setMinProcId(storeTracker.getActiveMinProcId())
.setLogId(logId)
.build();
@ -876,6 +876,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
/**
* Remove all logs with logId <= {@code lastLogId}.
*/
private void removeAllLogs(long lastLogId) {
if (logs.size() <= 1) return;
@ -927,11 +930,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
private static long getLogIdFromName(final String name) {
int end = name.lastIndexOf(".log");
int start = name.lastIndexOf('-') + 1;
while (start < end) {
if (name.charAt(start) != '0')
break;
start++;
}
return Long.parseLong(name.substring(start, end));
}