diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index fe2904b3dd8..d64da109562 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -33,14 +33,27 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; * Keeps track of live procedures. * * It can be used by the ProcedureStore to identify which procedures are already - * deleted/completed to avoid the deserialization step on restart. + * deleted/completed to avoid the deserialization step on restart */ @InterfaceAudience.Private @InterfaceStability.Evolving public class ProcedureStoreTracker { + // Key is procedure id corresponding to first bit of the bitmap. private final TreeMap map = new TreeMap(); + /** + * If true, do not remove bits corresponding to deleted procedures. Note that this can result + * in huge bitmaps overtime. + * Currently, it's set to true only when building tracker state from logs during recovery. During + * recovery, if we are sure that a procedure has been deleted, reading its old update entries + * can be skipped. + */ private boolean keepDeletes = false; + /** + * If true, it means tracker has incomplete information about the active/deleted procedures. + * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to + * understand it's real use. + */ private boolean partial = false; private long minUpdatedProcId = Long.MAX_VALUE; @@ -48,20 +61,39 @@ public class ProcedureStoreTracker { public enum DeleteState { YES, NO, MAYBE } + /** + * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met). + * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the + * range of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K + * is BITS_PER_WORD. + */ public static class BitSetNode { private final static long WORD_MASK = 0xffffffffffffffffL; private final static int ADDRESS_BITS_PER_WORD = 6; private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD; + /** + * Mimics {@link ProcedureStoreTracker#partial}. + */ private final boolean partial; + /** + * Set of procedures which have been updated since last {@link #resetUpdates()}. + * Useful to track procedures which have been updated since last WAL write. + */ private long[] updated; + /** + * Keeps track of procedure ids which belong to this bitmap's range and have been deleted. + */ private long[] deleted; + /** + * Offset of bitmap i.e. procedure id corresponding to first bit. + */ private long start; public void dump() { System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), - getMinProcId(), getMaxProcId()); + getActiveMinProcId(), getActiveMaxProcId()); System.out.println("Update:"); for (int i = 0; i < updated.length; ++i) { for (int j = 0; j < BITS_PER_WORD; ++j) { @@ -150,6 +182,9 @@ public class ProcedureStoreTracker { return true; } + /** + * @return true, if there are no active procedures in this BitSetNode, else false. + */ public boolean isEmpty() { // TODO: cache the value for (int i = 0; i < deleted.length; ++i) { @@ -166,6 +201,9 @@ public class ProcedureStoreTracker { } } + /** + * Clears the {@link #deleted} bitmaps. + */ public void undeleteAll() { for (int i = 0; i < updated.length; ++i) { deleted[i] = 0; @@ -182,6 +220,10 @@ public class ProcedureStoreTracker { } } + // ======================================================================== + // Convert to/from Protocol Buffer. + // ======================================================================== + public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() { ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder = ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder(); @@ -213,7 +255,8 @@ public class ProcedureStoreTracker { } public boolean canMerge(final BitSetNode rightNode) { - assert start < rightNode.getEnd(); + // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD. + assert start < rightNode.start; return (rightNode.getEnd() - start) < MAX_NODE_SIZE; } @@ -283,7 +326,7 @@ public class ProcedureStoreTracker { // ======================================================================== // Min/Max Helpers // ======================================================================== - public long getMinProcId() { + public long getActiveMinProcId() { long minProcId = start; for (int i = 0; i < deleted.length; ++i) { if (deleted[i] == 0) { @@ -303,7 +346,7 @@ public class ProcedureStoreTracker { return minProcId; } - public long getMaxProcId() { + public long getActiveMaxProcId() { long maxProcId = getEnd(); for (int i = deleted.length - 1; i >= 0; --i) { if (deleted[i] == 0) { @@ -346,10 +389,16 @@ public class ProcedureStoreTracker { // ======================================================================== // Helpers // ======================================================================== + /** + * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. + */ private static long alignUp(final long x) { return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD; } + /** + * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. + */ private static long alignDown(final long x) { return x & -BITS_PER_WORD; } @@ -421,6 +470,13 @@ public class ProcedureStoreTracker { resetUpdates(); } + /** + * If {@link #partial} is false, returns state from the bitmap. If no state is found for + * {@code procId}, returns YES. + * If partial is true, tracker doesn't have complete view of system state, so it returns MAYBE + * if there is no update for the procedure or if it doesn't have a state in bitmap. Otherwise, + * returns state from the bitmap. + */ public DeleteState isDeleted(long procId) { Map.Entry entry = map.floorEntry(procId); if (entry != null && entry.getValue().contains(procId)) { @@ -431,14 +487,16 @@ public class ProcedureStoreTracker { return partial ? DeleteState.MAYBE : DeleteState.YES; } - public long getMinProcId() { + public long getActiveMinProcId() { // TODO: Cache? Map.Entry entry = map.firstEntry(); - return entry == null ? 0 : entry.getValue().getMinProcId(); + return entry == null ? 0 : entry.getValue().getActiveMinProcId(); } public void setKeepDeletes(boolean keepDeletes) { this.keepDeletes = keepDeletes; + // If not to keep deletes, remove the BitSetNodes which are empty (i.e. contains ids of deleted + // procedures). if (!keepDeletes) { Iterator> it = map.entrySet().iterator(); while (it.hasNext()) { @@ -459,6 +517,9 @@ public class ProcedureStoreTracker { this.partial = isPartial; } + /** + * @return true, if no procedure is active, else false. + */ public boolean isEmpty() { for (Map.Entry entry : map.entrySet()) { if (entry.getValue().isEmpty() == false) { @@ -468,6 +529,9 @@ public class ProcedureStoreTracker { return true; } + /** + * @return true if any procedure was updated since last call to {@link #resetUpdates()}. + */ public boolean isUpdated() { for (Map.Entry entry : map.entrySet()) { if (entry.getValue().isUpdated() == false) { @@ -482,6 +546,10 @@ public class ProcedureStoreTracker { return map.floorEntry(minId) != null || map.floorEntry(maxId) != null; } + /** + * Clears the list of updated procedure ids. This doesn't affect global list of active + * procedure ids. + */ public void resetUpdates() { for (Map.Entry entry : map.entrySet()) { entry.getValue().resetUpdates(); @@ -497,7 +565,7 @@ public class ProcedureStoreTracker { } private BitSetNode getOrCreateNode(final long procId) { - // can procId fit in the left node? + // If procId can fit in left node (directly or by growing it) BitSetNode leftNode = null; boolean leftCanGrow = false; Map.Entry leftEntry = map.floorEntry(procId); @@ -509,6 +577,7 @@ public class ProcedureStoreTracker { leftCanGrow = leftNode.canGrow(procId); } + // If procId can fit in right node (directly or by growing it) BitSetNode rightNode = null; boolean rightCanGrow = false; Map.Entry rightEntry = map.ceilingEntry(procId); @@ -521,12 +590,11 @@ public class ProcedureStoreTracker { return mergeNodes(leftNode, rightNode); } + // If left and right nodes can not merge, decide which one to grow. if (leftCanGrow && rightCanGrow) { if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) { - // grow the left node return growNode(leftNode, procId); } - // grow the right node return growNode(rightNode, procId); } } @@ -542,12 +610,16 @@ public class ProcedureStoreTracker { return growNode(rightNode, procId); } - // add new node + // add new node if there are no left/right nodes which can be used. BitSetNode node = new BitSetNode(procId, partial); map.put(node.getStart(), node); return node; } + /** + * Grows {@code node} to contain {@code procId} and updates the map. + * @return {@link BitSetNode} instance which contains {@code procId}. + */ private BitSetNode growNode(BitSetNode node, long procId) { map.remove(node.getStart()); node.grow(procId); @@ -555,6 +627,9 @@ public class ProcedureStoreTracker { return node; } + /** + * Merges {@code leftNode} & {@code rightNode} and updates the map. + */ private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) { assert leftNode.getStart() < rightNode.getStart(); leftNode.merge(rightNode); @@ -571,6 +646,11 @@ public class ProcedureStoreTracker { } } + /** + * Builds + * {@link org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker} + * protocol buffer from current state, serializes it and writes to the {@code stream}. + */ public void writeTo(final OutputStream stream) throws IOException { ProcedureProtos.ProcedureStoreTracker.Builder builder = ProcedureProtos.ProcedureStoreTracker.newBuilder(); @@ -580,6 +660,11 @@ public class ProcedureStoreTracker { builder.build().writeDelimitedTo(stream); } + /** + * Reads serialized + * {@link org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker} + * protocol buffer from the {@code stream}, and use it to build the state. + */ public void readFrom(final InputStream stream) throws IOException { reset(); final ProcedureProtos.ProcedureStoreTracker data = diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java index add7d03cd9d..0643eed8c52 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java @@ -158,7 +158,8 @@ public final class ProcedureWALFormat { public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size) throws IOException { - long trailerPos = size - 17; // Beginning of the Trailer Jump + // Beginning of the Trailer Jump. 17 = 1 byte version + 8 byte magic + 8 byte offset + long trailerPos = size - 17; if (trailerPos < startPos) { throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 560072f1dbc..e0a68563de8 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -800,7 +800,7 @@ public class WALProcedureStore extends ProcedureStoreBase { ProcedureWALHeader header = ProcedureWALHeader.newBuilder() .setVersion(ProcedureWALFormat.HEADER_VERSION) .setType(ProcedureWALFormat.LOG_TYPE_STREAM) - .setMinProcId(storeTracker.getMinProcId()) + .setMinProcId(storeTracker.getActiveMinProcId()) .setLogId(logId) .build(); @@ -876,6 +876,9 @@ public class WALProcedureStore extends ProcedureStoreBase { } } + /** + * Remove all logs with logId <= {@code lastLogId}. + */ private void removeAllLogs(long lastLogId) { if (logs.size() <= 1) return; @@ -927,11 +930,6 @@ public class WALProcedureStore extends ProcedureStoreBase { private static long getLogIdFromName(final String name) { int end = name.lastIndexOf(".log"); int start = name.lastIndexOf('-') + 1; - while (start < end) { - if (name.charAt(start) != '0') - break; - start++; - } return Long.parseLong(name.substring(start, end)); }