diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
new file mode 100644
index 00000000000..b76c026d01d
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store;
+import java.util.Arrays;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+/**
+ * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met).
+ * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the range
+ * of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K is
+ * {@link BitSetNode#BITS_PER_WORD}.
+ *
+ * We have two main bit sets to describe the state of procedures, the meanings are:
+ *
+ * <pre>
+ * ----------------------
+ * | modified | deleted | meaning
+ * | 0 | 0 | proc exists, but hasn't been updated since last resetModified().
+ * | 1 | 0 | proc was updated (but not deleted).
+ * | 1 | 1 | proc was deleted.
+ * | 0 | 1 | proc doesn't exist (maybe never created, maybe deleted in past).
+ * ----------------------
+ * </pre>
+ *
+ * The meaning of modified is that, we have modified the state of the procedure, no matter insert,
+ * update, or delete. And if it is an insert or update, we will set the deleted to 0, if not we will
+ * set the delete to 1.
+ *
+ * For a non-partial BitSetNode, the initial modified value is 0 and deleted value is 1. For the
+ * partial one, the initial modified value is 0 and the initial deleted value is also 0. In
+ * {@link #unsetPartialFlag()} we will reset the deleted to 1 if it is not modified.
+ */
+class BitSetNode {
+ /** A word with all 64 bits set; {@link #isEmpty()} compares every deleted word against it. */
+ private static final long WORD_MASK = 0xffffffffffffffffL;
+ /** Shift that turns a bit index into a word index, i.e. log2 of {@link #BITS_PER_WORD}. */
+ private static final int ADDRESS_BITS_PER_WORD = 6;
+ /** Number of procedure ids tracked by one {@code long} word (64). */
+ private static final int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
+ /** Exclusive bound checked by {@link #canGrow(long)} and {@link #canMerge(BitSetNode)}. */
+ private static final int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD;
+ /**
+ * Mimics {@link ProcedureStoreTracker#partial}. It will affect how we fill the new deleted bits
+ * when growing.
+ */
+ private boolean partial;
+ /**
+ * Set of procedures which have been modified since last {@link #resetModified()}. Useful to track
+ * procedures which have been modified since last WAL write.
+ */
+ private long[] modified;
+ /**
+ * Keeps track of procedure ids which belong to this bitmap's range and have been deleted. This
+ * represents global state since it's not reset on WAL rolls.
+ */
+ private long[] deleted;
+ /**
+ * Offset of bitmap i.e. procedure id corresponding to first bit.
+ */
+ private long start;
+  /**
+   * Debugging aid: prints the node range, the active min/max procedure ids and then both bitmaps
+   * to standard output, one word per line with the lowest bit first and the word index last.
+   */
+  public void dump() {
+    System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), getActiveMinProcId(),
+      getActiveMaxProcId());
+
+    System.out.println("Modified:");
+    for (int word = 0; word < modified.length; ++word) {
+      StringBuilder bits = new StringBuilder();
+      for (int bit = 0; bit < BITS_PER_WORD; ++bit) {
+        bits.append(((modified[word] >>> bit) & 1L) == 1L ? "1" : "0");
+      }
+      System.out.println(bits.append(" ").append(word));
+    }
+    System.out.println();
+
+    System.out.println("Delete:");
+    for (int word = 0; word < deleted.length; ++word) {
+      StringBuilder bits = new StringBuilder();
+      for (int bit = 0; bit < BITS_PER_WORD; ++bit) {
+        bits.append(((deleted[word] >>> bit) & 1L) == 1L ? "1" : "0");
+      }
+      System.out.println(bits.append(" ").append(word));
+    }
+    System.out.println();
+  }
+  /**
+   * Creates a single-word node whose range contains {@code procId} and records {@code procId} as
+   * modified and not deleted.
+   * @param procId the procedure id that determines the (aligned) start of the node
+   * @param partial whether the node is partial; a non-partial node starts with every bit marked
+   *          as deleted, a partial one starts with no bit marked as deleted
+   */
+  public BitSetNode(long procId, boolean partial) {
+    this.partial = partial;
+    this.start = alignDown(procId);
+    this.modified = new long[1];
+    this.deleted = new long[1];
+    if (!partial) {
+      this.deleted[0] = WORD_MASK;
+    }
+    updateState(procId, false);
+  }
+  /**
+   * Rebuilds a node from its protobuf form. The resulting node is never partial.
+   * @param data the serialized node; its updated and deleted lists are expected to have the same
+   *          number of entries
+   */
+  public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) {
+    final int wordCount = data.getUpdatedCount();
+    assert wordCount == data.getDeletedCount();
+    this.partial = false;
+    this.start = data.getStartId();
+    this.modified = new long[wordCount];
+    this.deleted = new long[wordCount];
+    for (int word = 0; word < wordCount; ++word) {
+      this.modified[word] = data.getUpdated(word);
+      this.deleted[word] = data.getDeleted(word);
+    }
+  }
+  /**
+   * Copy constructor; both bitmaps are deep-copied.
+   * @param other the node to copy
+   * @param resetDelete set to true when building the cleanup tracker. In that case every
+   *          procedure which is not modified in {@code other} is additionally marked as deleted,
+   *          while the deleted flag of the modified procedures is kept as is.
+   */
+  public BitSetNode(BitSetNode other, boolean resetDelete) {
+    this.start = other.start;
+    this.partial = other.partial;
+    this.modified = other.modified.clone();
+    // Always start from the existing deleted bits. OR-ing into a zeroed array would drop the
+    // deleted flag of procedures that are both modified and deleted.
+    this.deleted = other.deleted.clone();
+    if (resetDelete) {
+      // If a procedure is not modified in this tracker then we do not need to take care of it,
+      // so mark it as deleted: if modified is 0 set deleted to 1, otherwise keep it as is,
+      // i.e. deleted |= ~modified.
+      for (int i = 0; i < this.deleted.length; ++i) {
+        this.deleted[i] |= ~other.modified[i];
+      }
+    }
+  }
+ /**
+ * Marks {@code procId} as modified and not deleted.
+ * @param procId a procedure id inside this node's range
+ */
+ public void insertOrUpdate(final long procId) {
+ updateState(procId, false);
+ }
+ /**
+ * Marks {@code procId} as modified and deleted.
+ * @param procId a procedure id inside this node's range
+ */
+ public void delete(final long procId) {
+ updateState(procId, true);
+ }
+ /**
+ * @return the procedure id corresponding to the first bit of this node.
+ */
+ public long getStart() {
+ return start;
+ }
+  /**
+   * @return the procedure id corresponding to the last bit of this node (inclusive).
+   */
+  public long getEnd() {
+    final int bitCount = modified.length << ADDRESS_BITS_PER_WORD;
+    return start + bitCount - 1;
+  }
+  /**
+   * @param procId the procedure id to test
+   * @return true if {@code procId} lies within [{@link #getStart()}, {@link #getEnd()}].
+   */
+  public boolean contains(final long procId) {
+    if (procId < start) {
+      return false;
+    }
+    return procId <= getEnd();
+  }
+  /**
+   * @param procId the procedure id to look up
+   * @return {@link DeleteState#MAYBE} if the id maps to a word past the end of the bitmap,
+   *         otherwise {@link DeleteState#YES} or {@link DeleteState#NO} according to the deleted
+   *         bit of {@code procId}.
+   */
+  public DeleteState isDeleted(final long procId) {
+    final int bitIndex = getBitmapIndex(procId);
+    final int word = bitIndex >> ADDRESS_BITS_PER_WORD;
+    if (word >= deleted.length) {
+      return DeleteState.MAYBE;
+    }
+    // A long shift only uses the low six bits of the distance, i.e. the position inside the word.
+    final long mask = 1L << bitIndex;
+    if ((deleted[word] & mask) == 0) {
+      return DeleteState.NO;
+    }
+    return DeleteState.YES;
+  }
+  /**
+   * @param procId the procedure id to look up
+   * @return true if the modified bit of {@code procId} is set; false if it is clear or if the id
+   *         maps to a word past the end of the bitmap.
+   */
+  public boolean isModified(long procId) {
+    final int bitIndex = getBitmapIndex(procId);
+    final int word = bitIndex >> ADDRESS_BITS_PER_WORD;
+    if (word < modified.length) {
+      final long mask = 1L << bitIndex;
+      return (modified[word] & mask) != 0;
+    }
+    return false;
+  }
+  /**
+   * @return true if every procedure id of this node is either modified or deleted.
+   */
+  public boolean isAllModified() {
+    // TODO: cache the value
+    for (int word = 0; word < modified.length; ++word) {
+      final long touched = modified[word] | deleted[word];
+      if (touched != WORD_MASK) {
+        return false;
+      }
+    }
+    return true;
+  }
+  /**
+   * @return true if every bit of the deleted bitmap is set, i.e. there are no active procedures
+   *         in this BitSetNode; false otherwise.
+   */
+  public boolean isEmpty() {
+    // TODO: cache the value
+    for (long word : deleted) {
+      if (word != WORD_MASK) {
+        return false;
+      }
+    }
+    return true;
+  }
+  /**
+   * Clears every bit of the modified bitmap; the deleted bitmap is left untouched.
+   */
+  public void resetModified() {
+    for (int word = 0; word < modified.length; ++word) {
+      modified[word] = 0L;
+    }
+  }
+  /**
+   * Clears the partial flag and marks every procedure which is not modified as deleted.
+   */
+  public void unsetPartialFlag() {
+    partial = false;
+    for (int word = 0; word < modified.length; ++word) {
+      // Set the deleted bit wherever the modified bit is clear, a whole word at a time.
+      deleted[word] |= ~modified[word];
+    }
+  }
+  /**
+   * Serializes this node as a
+   * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode
+   * protobuf: the start id followed by one updated/deleted word pair per bitmap word.
+   */
+  public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
+    final ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder node =
+      ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
+    node.setStartId(start);
+    int word = 0;
+    while (word < modified.length) {
+      node.addUpdated(modified[word]);
+      node.addDeleted(deleted[word]);
+      ++word;
+    }
+    return node.build();
+  }
+ // ========================================================================
+ // Grow/Merge Helpers
+ // ========================================================================
+  /**
+   * @param procId the procedure id the node would have to cover
+   * @return true if the distance between {@code procId} and the start of this node is smaller
+   *         than {@code MAX_NODE_SIZE}.
+   */
+  public boolean canGrow(final long procId) {
+    final long distance = Math.abs(procId - start);
+    return distance < MAX_NODE_SIZE;
+  }
+  /**
+   * @param rightNode a node which starts after this one
+   * @return true if the range from the start of this node to the end of {@code rightNode} is
+   *         smaller than {@code MAX_NODE_SIZE}.
+   */
+  public boolean canMerge(final BitSetNode rightNode) {
+    // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD.
+    assert start < rightNode.start;
+    final long span = rightNode.getEnd() - start;
+    return span < MAX_NODE_SIZE;
+  }
+  /**
+   * Extends both bitmaps so that the node covers {@code procId}. New modified words are zero;
+   * new deleted words are zero for a partial node and all ones otherwise.
+   * @param procId the procedure id which must be covered after growing
+   */
+  public void grow(final long procId) {
+    final int addedWords;
+    final int copyOffset;
+    if (procId >= start) {
+      // Add to tail: the existing words keep their position.
+      final long newEnd = alignUp(procId + 1);
+      addedWords = (int) (newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD;
+      copyOffset = 0;
+    } else {
+      // Add to head: the existing words move right by the number of words added.
+      final long newStart = alignDown(procId);
+      addedWords = (int) (start - newStart) >> ADDRESS_BITS_PER_WORD;
+      copyOffset = addedWords;
+      start = newStart;
+    }
+    final int oldSize = modified.length;
+
+    // A freshly allocated long[] is already zeroed, which is what new modified words need.
+    final long[] grownModified = new long[oldSize + addedWords];
+    System.arraycopy(modified, 0, grownModified, copyOffset, oldSize);
+    modified = grownModified;
+
+    final long[] grownDeleted = new long[deleted.length + addedWords];
+    if (!partial) {
+      Arrays.fill(grownDeleted, WORD_MASK);
+    }
+    System.arraycopy(deleted, 0, grownDeleted, copyOffset, oldSize);
+    deleted = grownDeleted;
+  }
+  /**
+   * Appends {@code rightNode} to this node. Words between the end of this node and the start of
+   * {@code rightNode} (the gap) get modified = 0 and deleted = 0 for a partial node or all ones
+   * otherwise; the words of both nodes are copied unchanged.
+   * @param rightNode a node which starts after the end of this node
+   */
+  public void merge(final BitSetNode rightNode) {
+    int delta = (int) (rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD;
+    int oldSize = modified.length;
+    // Number of words between the two nodes, and where the right node's words begin.
+    int gapSize = delta - rightNode.modified.length;
+    int rightOffset = oldSize + gapSize;
+
+    long[] newModified = new long[oldSize + delta];
+    System.arraycopy(modified, 0, newModified, 0, oldSize);
+    System.arraycopy(rightNode.modified, 0, newModified, rightOffset, rightNode.modified.length);
+
+    long[] newDeleted = new long[oldSize + delta];
+    System.arraycopy(deleted, 0, newDeleted, 0, oldSize);
+    System.arraycopy(rightNode.deleted, 0, newDeleted, rightOffset, rightNode.deleted.length);
+    // Only the gap [oldSize, rightOffset) needs initializing. New arrays are zeroed, so the
+    // modified gap and the partial deleted gap are already right. Filling from rightOffset
+    // would overwrite the words just copied from the right node.
+    if (!partial) {
+      Arrays.fill(newDeleted, oldSize, rightOffset, WORD_MASK);
+    }
+
+    modified = newModified;
+    deleted = newDeleted;
+  }
+  /**
+   * @return a short description of the form {@code BitSetNode(start-end)}.
+   */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("BitSetNode(");
+    text.append(getStart()).append("-").append(getEnd()).append(")");
+    return text.toString();
+  }
+ // ========================================================================
+ // Min/Max Helpers
+ // ========================================================================
+  /**
+   * @return the smallest procedure id of this node whose deleted bit is clear, or
+   *         {@code getEnd() + 1} if every procedure of the node is deleted.
+   */
+  public long getActiveMinProcId() {
+    long wordStart = start;
+    for (long word : deleted) {
+      if (word != WORD_MASK) {
+        // The lowest clear bit of the word is the first active procedure. Testing for a set
+        // bit here would return the first deleted procedure of a mixed word instead.
+        return wordStart + Long.numberOfTrailingZeros(~word);
+      }
+      wordStart += BITS_PER_WORD;
+    }
+    return wordStart;
+  }
+  /**
+   * @return the largest procedure id of this node whose deleted bit is clear, or
+   *         {@code getStart() - 1} if every procedure of the node is deleted.
+   */
+  public long getActiveMaxProcId() {
+    long wordEnd = getEnd();
+    for (int word = deleted.length - 1; word >= 0; --word) {
+      final long active = ~deleted[word];
+      if (active != 0) {
+        // The number of leading zeros equals the distance from the top bit of the word down to
+        // the highest clear deleted bit.
+        return wordEnd - Long.numberOfLeadingZeros(active);
+      }
+      wordEnd -= BITS_PER_WORD;
+    }
+    return wordEnd;
+  }
+ // ========================================================================
+ // Bitmap Helpers
+ // ========================================================================
+ /**
+ * @return the offset of {@code procId} from the start of this node, truncated to an int.
+ */
+ private int getBitmapIndex(final long procId) {
+ return (int) (procId - start);
+ }
+  /**
+   * Sets the modified bit of {@code procId} and sets or clears its deleted bit.
+   * @param procId a procedure id inside this node's range
+   * @param isDeleted true to set the deleted bit, false to clear it
+   */
+  void updateState(long procId, boolean isDeleted) {
+    final int bitIndex = getBitmapIndex(procId);
+    final int word = bitIndex >> ADDRESS_BITS_PER_WORD;
+    final long mask = 1L << bitIndex;
+    modified[word] |= mask;
+    deleted[word] = isDeleted ? (deleted[word] | mask) : (deleted[word] & ~mask);
+  }
+ // ========================================================================
+ // Helpers
+ // ========================================================================
+  /**
+   * @return the smallest multiple of BITS_PER_WORD which is greater than or equal to x, i.e. the
+   *         upper boundary of the bitmap range x belongs to.
+   */
+  private static long alignUp(final long x) {
+    final long alignMask = -BITS_PER_WORD;
+    return (x + (BITS_PER_WORD - 1)) & alignMask;
+  }
+  /**
+   * @return the largest multiple of BITS_PER_WORD which is less than or equal to x, i.e. the
+   *         lower boundary of the bitmap range x belongs to.
+   */
+  private static long alignDown(final long x) {
+    // Clearing the low bits is the same as AND-ing with -BITS_PER_WORD.
+    return x & ~(BITS_PER_WORD - 1L);
+  }
\ No newline at end of file
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
index 9c6176d4bb8..8fbc1473ed7 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.procedure2.store;
import java.io.IOException;
@@ -64,17 +63,17 @@ public class NoopProcedureStore extends ProcedureStoreBase {
- public void insert(Procedure proc, Procedure[] subprocs) {
+ public void insert(Procedure> proc, Procedure>[] subprocs) {
// no-op
- public void insert(Procedure[] proc) {
+ public void insert(Procedure>[] proc) {
// no-op
- public void update(Procedure proc) {
+ public void update(Procedure> proc) {
// no-op
@@ -84,7 +83,7 @@ public class NoopProcedureStore extends ProcedureStoreBase {
- public void delete(Procedure proc, long[] subprocs) {
+ public void delete(Procedure> proc, long[] subprocs) {
// no-op
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 72883405d7c..8063b125ba5 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -81,6 +81,7 @@ public interface ProcedureStore {
* @throws IOException if there was an error fetching/deserializing the procedure
* @return the next procedure in the iteration.
+ @SuppressWarnings("rawtypes")
Procedure next() throws IOException;
@@ -173,7 +174,7 @@ public interface ProcedureStore {
* @param proc the procedure to serialize and write to the store.
* @param subprocs the newly created child of the proc.
- void insert(Procedure proc, Procedure[] subprocs);
+ void insert(Procedure> proc, Procedure>[] subprocs);
* Serialize a set of new procedures.
@@ -182,14 +183,14 @@ public interface ProcedureStore {
* @param procs the procedures to serialize and write to the store.
- void insert(Procedure[] procs);
+ void insert(Procedure>[] procs);
* The specified procedure was executed,
* and the new state should be written to the store.
* @param proc the procedure to serialize and write to the store.
- void update(Procedure proc);
+ void update(Procedure> proc);
* The specified procId was removed from the executor,
@@ -205,7 +206,7 @@ public interface ProcedureStore {
* @param parentProc the parent procedure to serialize and write to the store.
* @param subProcIds the IDs of the sub-procedure to remove.
- void delete(Procedure parentProc, long[] subProcIds);
+ void delete(Procedure> parentProc, long[] subProcIds);
* The specified procIds were removed from the executor,
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 2dad5ac72ce..361419ab48f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -53,383 +53,14 @@ public class ProcedureStoreTracker {
* It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to
* understand it's real use.
- private boolean partial = false;
+ boolean partial = false;
- private long minUpdatedProcId = Long.MAX_VALUE;
- private long maxUpdatedProcId = Long.MIN_VALUE;
+ private long minModifiedProcId = Long.MAX_VALUE;
+ private long maxModifiedProcId = Long.MIN_VALUE;
public enum DeleteState { YES, NO, MAYBE }
- /**
- * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met).
- * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the
- * range of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K
- */
- public static class BitSetNode {
- private final static long WORD_MASK = 0xffffffffffffffffL;
- private final static int ADDRESS_BITS_PER_WORD = 6;
- private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
- private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD;
- /**
- * Mimics {@link ProcedureStoreTracker#partial}.
- */
- private final boolean partial;
- /* ----------------------
- * | updated | deleted | meaning
- * | 0 | 0 | proc exists, but hasn't been updated since last resetUpdates().
- * | 1 | 0 | proc was updated (but not deleted).
- * | 1 | 1 | proc was deleted.
- * | 0 | 1 | proc doesn't exist (maybe never created, maybe deleted in past).
- /* ----------------------
- */
- /**
- * Set of procedures which have been updated since last {@link #resetUpdates()}.
- * Useful to track procedures which have been updated since last WAL write.
- */
- private long[] updated;
- /**
- * Keeps track of procedure ids which belong to this bitmap's range and have been deleted.
- * This represents global state since it's not reset on WAL rolls.
- */
- private long[] deleted;
- /**
- * Offset of bitmap i.e. procedure id corresponding to first bit.
- */
- private long start;
- public void dump() {
- System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(),
- getActiveMinProcId(), getActiveMaxProcId());
- System.out.println("Update:");
- for (int i = 0; i < updated.length; ++i) {
- for (int j = 0; j < BITS_PER_WORD; ++j) {
- System.out.print((updated[i] & (1L << j)) != 0 ? "1" : "0");
- }
- System.out.println(" " + i);
- }
- System.out.println();
- System.out.println("Delete:");
- for (int i = 0; i < deleted.length; ++i) {
- for (int j = 0; j < BITS_PER_WORD; ++j) {
- System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0");
- }
- System.out.println(" " + i);
- }
- System.out.println();
- }
- public BitSetNode(final long procId, final boolean partial) {
- start = alignDown(procId);
- int count = 1;
- updated = new long[count];
- deleted = new long[count];
- for (int i = 0; i < count; ++i) {
- updated[i] = 0;
- deleted[i] = partial ? 0 : WORD_MASK;
- }
- this.partial = partial;
- updateState(procId, false);
- }
- protected BitSetNode(final long start, final long[] updated, final long[] deleted) {
- this.start = start;
- this.updated = updated;
- this.deleted = deleted;
- this.partial = false;
- }
- public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) {
- start = data.getStartId();
- int size = data.getUpdatedCount();
- updated = new long[size];
- deleted = new long[size];
- for (int i = 0; i < size; ++i) {
- updated[i] = data.getUpdated(i);
- deleted[i] = data.getDeleted(i);
- }
- partial = false;
- }
- public BitSetNode(final BitSetNode other, final boolean resetDelete) {
- this.start = other.start;
- this.partial = other.partial;
- this.updated = other.updated.clone();
- if (resetDelete) {
- this.deleted = new long[other.deleted.length];
- for (int i = 0; i < this.deleted.length; ++i) {
- this.deleted[i] = ~(other.updated[i]);
- }
- } else {
- this.deleted = other.deleted.clone();
- }
- }
- public void update(final long procId) {
- updateState(procId, false);
- }
- public void delete(final long procId) {
- updateState(procId, true);
- }
- public long getStart() {
- return start;
- }
- public long getEnd() {
- return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1;
- }
- public boolean contains(final long procId) {
- return start <= procId && procId <= getEnd();
- }
- public DeleteState isDeleted(final long procId) {
- int bitmapIndex = getBitmapIndex(procId);
- int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
- if (wordIndex >= deleted.length) {
- return DeleteState.MAYBE;
- }
- return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO;
- }
- private boolean isUpdated(final long procId) {
- int bitmapIndex = getBitmapIndex(procId);
- int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
- if (wordIndex >= updated.length) {
- return false;
- }
- return (updated[wordIndex] & (1L << bitmapIndex)) != 0;
- }
- public boolean isUpdated() {
- // TODO: cache the value
- for (int i = 0; i < updated.length; ++i) {
- if ((updated[i] | deleted[i]) != WORD_MASK) {
- return false;
- }
- }
- return true;
- }
- /**
- * @return true, if there are no active procedures in this BitSetNode, else false.
- */
- public boolean isEmpty() {
- // TODO: cache the value
- for (int i = 0; i < deleted.length; ++i) {
- if (deleted[i] != WORD_MASK) {
- return false;
- }
- }
- return true;
- }
- public void resetUpdates() {
- for (int i = 0; i < updated.length; ++i) {
- updated[i] = 0;
- }
- }
- /**
- * Clears the {@link #deleted} bitmaps.
- */
- public void undeleteAll() {
- for (int i = 0; i < updated.length; ++i) {
- deleted[i] = 0;
- }
- }
- public void unsetPartialFlag() {
- for (int i = 0; i < updated.length; ++i) {
- for (int j = 0; j < BITS_PER_WORD; ++j) {
- if ((updated[i] & (1L << j)) == 0) {
- deleted[i] |= (1L << j);
- }
- }
- }
- }
- /**
- * Convert to
- * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode
- * protobuf.
- */
- public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
- ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
- ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
- builder.setStartId(start);
- for (int i = 0; i < updated.length; ++i) {
- builder.addUpdated(updated[i]);
- builder.addDeleted(deleted[i]);
- }
- return builder.build();
- }
- // ========================================================================
- // Grow/Merge Helpers
- // ========================================================================
- public boolean canGrow(final long procId) {
- return Math.abs(procId - start) < MAX_NODE_SIZE;
- }
- public boolean canMerge(final BitSetNode rightNode) {
- // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD.
- assert start < rightNode.start;
- return (rightNode.getEnd() - start) < MAX_NODE_SIZE;
- }
- public void grow(final long procId) {
- int delta, offset;
- if (procId < start) {
- // add to head
- long newStart = alignDown(procId);
- delta = (int)(start - newStart) >> ADDRESS_BITS_PER_WORD;
- offset = delta;
- start = newStart;
- } else {
- // Add to tail
- long newEnd = alignUp(procId + 1);
- delta = (int)(newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD;
- offset = 0;
- }
- long[] newBitmap;
- int oldSize = updated.length;
- newBitmap = new long[oldSize + delta];
- for (int i = 0; i < newBitmap.length; ++i) {
- newBitmap[i] = 0;
- }
- System.arraycopy(updated, 0, newBitmap, offset, oldSize);
- updated = newBitmap;
- newBitmap = new long[deleted.length + delta];
- for (int i = 0; i < newBitmap.length; ++i) {
- newBitmap[i] = partial ? 0 : WORD_MASK;
- }
- System.arraycopy(deleted, 0, newBitmap, offset, oldSize);
- deleted = newBitmap;
- }
- public void merge(final BitSetNode rightNode) {
- int delta = (int)(rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD;
- long[] newBitmap;
- int oldSize = updated.length;
- int newSize = (delta - rightNode.updated.length);
- int offset = oldSize + newSize;
- newBitmap = new long[oldSize + delta];
- System.arraycopy(updated, 0, newBitmap, 0, oldSize);
- System.arraycopy(rightNode.updated, 0, newBitmap, offset, rightNode.updated.length);
- updated = newBitmap;
- newBitmap = new long[oldSize + delta];
- System.arraycopy(deleted, 0, newBitmap, 0, oldSize);
- System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length);
- deleted = newBitmap;
- for (int i = 0; i < newSize; ++i) {
- updated[offset + i] = 0;
- deleted[offset + i] = partial ? 0 : WORD_MASK;
- }
- }
- @Override
- public String toString() {
- return "BitSetNode(" + getStart() + "-" + getEnd() + ")";
- }
- // ========================================================================
- // Min/Max Helpers
- // ========================================================================
- public long getActiveMinProcId() {
- long minProcId = start;
- for (int i = 0; i < deleted.length; ++i) {
- if (deleted[i] == 0) {
- return(minProcId);
- }
- if (deleted[i] != WORD_MASK) {
- for (int j = 0; j < BITS_PER_WORD; ++j) {
- if ((deleted[i] & (1L << j)) != 0) {
- return minProcId + j;
- }
- }
- }
- minProcId += BITS_PER_WORD;
- }
- return minProcId;
- }
- public long getActiveMaxProcId() {
- long maxProcId = getEnd();
- for (int i = deleted.length - 1; i >= 0; --i) {
- if (deleted[i] == 0) {
- return maxProcId;
- }
- if (deleted[i] != WORD_MASK) {
- for (int j = BITS_PER_WORD - 1; j >= 0; --j) {
- if ((deleted[i] & (1L << j)) == 0) {
- return maxProcId - (BITS_PER_WORD - 1 - j);
- }
- }
- }
- maxProcId -= BITS_PER_WORD;
- }
- return maxProcId;
- }
- // ========================================================================
- // Bitmap Helpers
- // ========================================================================
- private int getBitmapIndex(final long procId) {
- return (int)(procId - start);
- }
- private void updateState(final long procId, final boolean isDeleted) {
- int bitmapIndex = getBitmapIndex(procId);
- int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
- long value = (1L << bitmapIndex);
- updated[wordIndex] |= value;
- if (isDeleted) {
- deleted[wordIndex] |= value;
- } else {
- deleted[wordIndex] &= ~value;
- }
- }
- // ========================================================================
- // Helpers
- // ========================================================================
- /**
- * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
- */
- private static long alignUp(final long x) {
- return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD;
- }
- /**
- * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
- */
- private static long alignDown(final long x) {
- return x & -BITS_PER_WORD;
- }
- }
- public void resetToProto(final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
+ public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) {
final BitSetNode node = new BitSetNode(protoNode);
@@ -440,14 +71,23 @@ public class ProcedureStoreTracker {
* Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
- public void resetTo(final ProcedureStoreTracker tracker) {
+ public void resetTo(ProcedureStoreTracker tracker) {
resetTo(tracker, false);
- public void resetTo(final ProcedureStoreTracker tracker, final boolean resetDelete) {
+ /**
+ * Resets internal state to same as given {@code tracker}, and change the deleted flag according
+ * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap.
+ *
+ * The {@code resetDelete} will be set to true when building cleanup tracker, please see the
+ * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the
+ * deleted flag if {@code resetDelete} is true.
+ */
+ public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
+ reset();
this.partial = tracker.partial;
- this.minUpdatedProcId = tracker.minUpdatedProcId;
- this.maxUpdatedProcId = tracker.maxUpdatedProcId;
+ this.minModifiedProcId = tracker.minModifiedProcId;
+ this.maxModifiedProcId = tracker.maxModifiedProcId;
this.keepDeletes = tracker.keepDeletes;
for (Map.Entry entry : tracker.map.entrySet()) {
map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete));
@@ -458,25 +98,24 @@ public class ProcedureStoreTracker {
insert(null, procId);
- public void insert(final long[] procIds) {
+ public void insert(long[] procIds) {
for (int i = 0; i < procIds.length; ++i) {
- public void insert(final long procId, final long[] subProcIds) {
- BitSetNode node = null;
- node = update(node, procId);
+ public void insert(long procId, long[] subProcIds) {
+ BitSetNode node = update(null, procId);
for (int i = 0; i < subProcIds.length; ++i) {
node = insert(node, subProcIds[i]);
- private BitSetNode insert(BitSetNode node, final long procId) {
+ private BitSetNode insert(BitSetNode node, long procId) {
if (node == null || !node.contains(procId)) {
node = getOrCreateNode(procId);
- node.update(procId);
+ node.insertOrUpdate(procId);
return node;
@@ -485,11 +124,11 @@ public class ProcedureStoreTracker {
update(null, procId);
- private BitSetNode update(BitSetNode node, final long procId) {
+ private BitSetNode update(BitSetNode node, long procId) {
node = lookupClosestNode(node, procId);
assert node != null : "expected node to update procId=" + procId;
assert node.contains(procId) : "expected procId=" + procId + " in the node";
- node.update(procId);
+ node.insertOrUpdate(procId);
return node;
@@ -506,7 +145,7 @@ public class ProcedureStoreTracker {
- private BitSetNode delete(BitSetNode node, final long procId) {
+ private BitSetNode delete(BitSetNode node, long procId) {
node = lookupClosestNode(node, procId);
assert node != null : "expected node to delete procId=" + procId;
assert node.contains(procId) : "expected procId=" + procId + " in the node";
@@ -520,35 +159,62 @@ public class ProcedureStoreTracker {
return node;
- @InterfaceAudience.Private
- public void setDeleted(final long procId, final boolean isDeleted) {
+ /**
+ * Will be called when restarting where we need to rebuild the ProcedureStoreTracker.
+ * Overwrites both tracked bounds with the given values.
+ * @param min the new minimum modified procedure id
+ * @param max the new maximum modified procedure id
+ */
+ public void setMinMaxModifiedProcIds(long min, long max) {
+ this.minModifiedProcId = min;
+ this.maxModifiedProcId = max;
+ }
+ /**
+ * This method is used when restarting, where we need to rebuild the ProcedureStoreTracker. The
+ * {@link #delete(long)} method above assumes that the {@link BitSetNode} exists, but on restart
+ * this is not true, as we read the wal files in reverse order, so a delete may come first.
+ */
+ public void setDeleted(long procId, boolean isDeleted) {
BitSetNode node = getOrCreateNode(procId);
assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
node.updateState(procId, isDeleted);
- public void setDeletedIfSet(final long... procId) {
+ /**
+ * Set the given bit for the procId to delete if it was modified before.
+ *
+ * This method is used to test whether a procedure wal file can be safely deleted, as if all the
+ * procedures in the given procedure wal file has been modified in the new procedure wal files,
+ * then we can delete it.
+ */
+ public void setDeletedIfModified(long... procId) {
BitSetNode node = null;
for (int i = 0; i < procId.length; ++i) {
node = lookupClosestNode(node, procId[i]);
- if (node != null && node.isUpdated(procId[i])) {
+ if (node != null && node.isModified(procId[i])) {
- public void setDeletedIfSet(final ProcedureStoreTracker tracker) {
+ /**
+ * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by
+ * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker},
+ * then we mark it as deleted.
+ * @see #setDeletedIfModified(long...)
+ */
+ public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
BitSetNode trackerNode = null;
- for (BitSetNode node: map.values()) {
+ for (BitSetNode node : map.values()) {
final long minProcId = node.getStart();
final long maxProcId = node.getEnd();
for (long procId = minProcId; procId <= maxProcId; ++procId) {
- if (!node.isUpdated(procId)) continue;
+ if (!node.isModified(procId)) {
+ continue;
+ }
trackerNode = tracker.lookupClosestNode(trackerNode, procId);
- if (trackerNode == null || !trackerNode.contains(procId) || trackerNode.isUpdated(procId)) {
- // the procedure was removed or updated
+ if (trackerNode == null || !trackerNode.contains(procId) ||
+ trackerNode.isModified(procId)) {
+ // the procedure was removed or modified
@@ -568,28 +234,29 @@ public class ProcedureStoreTracker {
private void trackProcIds(long procId) {
- minUpdatedProcId = Math.min(minUpdatedProcId, procId);
- maxUpdatedProcId = Math.max(maxUpdatedProcId, procId);
+ minModifiedProcId = Math.min(minModifiedProcId, procId);
+ maxModifiedProcId = Math.max(maxModifiedProcId, procId);
- public long getUpdatedMinProcId() {
- return minUpdatedProcId;
+ public long getModifiedMinProcId() {
+ return minModifiedProcId;
- public long getUpdatedMaxProcId() {
- return maxUpdatedProcId;
+ public long getModifiedMaxProcId() {
+ return maxModifiedProcId;
public void reset() {
this.keepDeletes = false;
this.partial = false;
- resetUpdates();
+ resetModified();
- public boolean isUpdated(long procId) {
+ public boolean isModified(long procId) {
final Map.Entry entry = map.floorEntry(procId);
- return entry != null && entry.getValue().contains(procId) && entry.getValue().isUpdated(procId);
+ return entry != null && entry.getValue().contains(procId) &&
+ entry.getValue().isModified(procId);
@@ -604,7 +271,7 @@ public class ProcedureStoreTracker {
if (entry != null && entry.getValue().contains(procId)) {
BitSetNode node = entry.getValue();
DeleteState state = node.isDeleted(procId);
- return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state;
+ return partial && !node.isModified(procId) ? DeleteState.MAYBE : state;
return partial ? DeleteState.MAYBE : DeleteState.YES;
@@ -656,11 +323,12 @@ public class ProcedureStoreTracker {
- * @return true if any procedure was updated since last call to {@link #resetUpdates()}.
+ * @return true if all procedures were modified or deleted since the last call to
+ * {@link #resetModified()}.
- public boolean isUpdated() {
+ public boolean isAllModified() {
for (Map.Entry entry : map.entrySet()) {
- if (!entry.getValue().isUpdated()) {
+ if (!entry.getValue().isAllModified()) {
return false;
@@ -671,21 +339,15 @@ public class ProcedureStoreTracker {
* Clears the list of updated procedure ids. This doesn't affect global list of active
* procedure ids.
- public void resetUpdates() {
+ public void resetModified() {
for (Map.Entry entry : map.entrySet()) {
- entry.getValue().resetUpdates();
+ entry.getValue().resetModified();
- minUpdatedProcId = Long.MAX_VALUE;
- maxUpdatedProcId = Long.MIN_VALUE;
+ minModifiedProcId = Long.MAX_VALUE;
+ maxModifiedProcId = Long.MIN_VALUE;
- public void undeleteAll() {
- for (Map.Entry entry : map.entrySet()) {
- entry.getValue().undeleteAll();
- }
- }
- private BitSetNode getOrCreateNode(final long procId) {
+ private BitSetNode getOrCreateNode(long procId) {
// If procId can fit in left node (directly or by growing it)
BitSetNode leftNode = null;
boolean leftCanGrow = false;
@@ -760,7 +422,7 @@ public class ProcedureStoreTracker {
public void dump() {
System.out.println("map " + map.size());
- System.out.println("isUpdated " + isUpdated());
+ System.out.println("isAllModified " + isAllModified());
System.out.println("isEmpty " + isEmpty());
for (Map.Entry entry : map.entrySet()) {
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
index dd34896ebb3..ba4480fca7e 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
@@ -15,19 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.procedure2.store.wal;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
* Thrown when a procedure WAL is corrupted
public class CorruptedWALProcedureStoreException extends HBaseIOException {
+ private static final long serialVersionUID = -3407300445435898074L;
/** default constructor */
public CorruptedWALProcedureStoreException() {
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index 6226350a472..16767446e17 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -15,20 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
@@ -37,7 +35,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* Describes a WAL File
public class ProcedureWALFile implements Comparable {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFile.class);
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index ac3a52941e9..c9986edc904 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -18,25 +18,22 @@
package org.apache.hadoop.hbase.procedure2.store.wal;
-import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
@@ -45,9 +42,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* Helper class that contains the WAL serialization utils.
public final class ProcedureWALFormat {
- private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormat.class);
static final byte LOG_TYPE_STREAM = 0;
static final byte LOG_TYPE_COMPACTED = 1;
@@ -60,6 +55,9 @@ public final class ProcedureWALFormat {
public static class InvalidWALDataException extends IOException {
+ private static final long serialVersionUID = 5471733223070202196L;
public InvalidWALDataException(String s) {
@@ -75,9 +73,9 @@ public final class ProcedureWALFormat {
private ProcedureWALFormat() {}
- public static void load(final Iterator logs,
- final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
- final ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
+ public static void load(Iterator logs, ProcedureStoreTracker tracker,
+ Loader loader) throws IOException {
+ ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
try {
// Ignore the last log which is current active log.
@@ -93,8 +91,10 @@ public final class ProcedureWALFormat {
// The tracker is now updated with all the procedures read from the logs
- tracker.setPartialFlag(false);
- tracker.resetUpdates();
+ if (tracker.isPartial()) {
+ tracker.setPartialFlag(false);
+ }
+ tracker.resetModified();
} finally {
@@ -205,7 +205,7 @@ public final class ProcedureWALFormat {
public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
- Procedure proc, Procedure[] subprocs) throws IOException {
+ Procedure> proc, Procedure>[] subprocs) throws IOException {
final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
@@ -217,17 +217,17 @@ public final class ProcedureWALFormat {
- public static void writeInsert(ByteSlot slot, Procedure proc)
+ public static void writeInsert(ByteSlot slot, Procedure> proc)
throws IOException {
writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null);
- public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs)
+ public static void writeInsert(ByteSlot slot, Procedure> proc, Procedure>[] subprocs)
throws IOException {
writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs);
- public static void writeUpdate(ByteSlot slot, Procedure proc)
+ public static void writeUpdate(ByteSlot slot, Procedure> proc)
throws IOException {
writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null);
@@ -240,7 +240,7 @@ public final class ProcedureWALFormat {
- public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs)
+ public static void writeDelete(ByteSlot slot, Procedure> proc, long[] subprocs)
throws IOException {
final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index 4ab70f18e10..1ac8e01f3ab 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -15,22 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.procedure2.store.wal;
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
@@ -38,7 +34,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* Helper class that loads the procedures stored in a WAL
public class ProcedureWALFormatReader {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class);
@@ -98,8 +93,8 @@ public class ProcedureWALFormatReader {
// In the case above we need to read one more WAL to be able to consider
// the root procedure A and all children as ready.
// ==============================================================================================
- private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024);
- private final WalProcedureMap procedureMap = new WalProcedureMap(1024);
+ private final WALProcedureMap localProcedureMap = new WALProcedureMap(1024);
+ private final WALProcedureMap procedureMap = new WALProcedureMap(1024);
private final ProcedureWALFormat.Loader loader;
@@ -111,33 +106,31 @@ public class ProcedureWALFormatReader {
* to rebuild the tracker.
private final ProcedureStoreTracker tracker;
- // TODO: private final boolean hasFastStartSupport;
- * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we
- * re-build the list of procedures updated in that WAL because we need it for log cleaning
- * purposes. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted.
- * (see {@link WALProcedureStore#removeInactiveLogs()}).
- * However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother
- * re-building it.
+ * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we re-build
+ * the list of procedures modified in that WAL because we need it for log cleaning purposes. If
+ * all procedures modified in a WAL are found to be obsolete, it can be safely deleted. (see
+ * {@link WALProcedureStore#removeInactiveLogs()}).
+ *
+ * Note that the deleted part of this tracker will not be globally valid, as we can only count
+ * the deletes in the current file. This is not a big problem because, in the end, the tracker
+ * above holds the global deleted state, and it is also used to build the cleanup tracker.
private ProcedureStoreTracker localTracker;
- // private long compactionLogId;
private long maxProcId = 0;
public ProcedureWALFormatReader(final ProcedureStoreTracker tracker,
ProcedureWALFormat.Loader loader) {
this.tracker = tracker;
this.loader = loader;
- // we support fast-start only if we have a clean shutdown.
- // this.hasFastStartSupport = !tracker.isEmpty();
- public void read(final ProcedureWALFile log) throws IOException {
- localTracker = log.getTracker().isPartial() ? log.getTracker() : null;
- if (localTracker != null) {
- LOG.info("Rebuilding tracker for " + log);
+ public void read(ProcedureWALFile log) throws IOException {
+ localTracker = log.getTracker();
+ if (localTracker.isPartial()) {
+ LOG.info("Rebuilding tracker for {}", log);
long count = 0;
@@ -147,7 +140,7 @@ public class ProcedureWALFormatReader {
while (hasMore) {
ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
if (entry == null) {
- LOG.warn("Nothing left to decode. Exiting with missing EOF, log=" + log);
+ LOG.warn("Nothing left to decode. Exiting with missing EOF, log={}", log);
@@ -178,21 +171,17 @@ public class ProcedureWALFormatReader {
loader.markCorruptedWAL(log, e);
- if (localTracker != null) {
- localTracker.setPartialFlag(false);
- }
if (!localProcedureMap.isEmpty()) {
- log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId());
+ log.setProcIds(localProcedureMap.getMinModifiedProcId(),
+ localProcedureMap.getMaxModifiedProcId());
+ if (localTracker.isPartial()) {
+ localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(),
+ localProcedureMap.getMaxModifiedProcId());
+ }
- //if (hasFastStartSupport) {
- // TODO: Some procedure may be already runnables (see readInitEntry())
- // (we can also check the "update map" in the log trackers)
- // --------------------------------------------------
- //EntryIterator iter = procedureMap.fetchReady();
- //if (iter != null) loader.load(iter);
- // --------------------------------------------------
- //}
+ }
+ if (localTracker.isPartial()) {
+ localTracker.setPartialFlag(false);
@@ -202,37 +191,46 @@ public class ProcedureWALFormatReader {
// fetch the procedure ready to run.
ProcedureIterator procIter = procedureMap.fetchReady();
- if (procIter != null) loader.load(procIter);
+ if (procIter != null) {
+ loader.load(procIter);
+ }
// remaining procedures have missing link or dependencies
// consider them as corrupted, manual fix is probably required.
procIter = procedureMap.fetchAll();
- if (procIter != null) loader.handleCorrupted(procIter);
+ if (procIter != null) {
+ loader.handleCorrupted(procIter);
+ }
- private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) {
+ private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) {
+ if (tracker.isPartial()) {
+ tracker.setDeleted(procId, true);
+ }
+ }
+ private void insertIfPartial(ProcedureStoreTracker tracker, ProcedureProtos.Procedure proc) {
+ if (tracker.isPartial()) {
+ tracker.insert(proc.getProcId());
+ }
+ }
+ private void loadProcedure(ProcedureWALEntry entry, ProcedureProtos.Procedure proc) {
maxProcId = Math.max(maxProcId, proc.getProcId());
if (isRequired(proc.getProcId())) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Read " + entry.getType() + " entry " + proc.getProcId());
- }
+ LOG.trace("Read {} entry {}", entry.getType(), proc.getProcId());
- if (tracker.isPartial()) {
- tracker.insert(proc.getProcId());
- }
- }
- if (localTracker != null) {
- localTracker.insert(proc.getProcId());
+ insertIfPartial(tracker, proc);
+ insertIfPartial(localTracker, proc);
- private void readInitEntry(final ProcedureWALEntry entry)
- throws IOException {
+ private void readInitEntry(ProcedureWALEntry entry) {
assert entry.getProcedureCount() == 1 : "Expected only one procedure";
loadProcedure(entry, entry.getProcedure(0));
- private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
+ private void readInsertEntry(ProcedureWALEntry entry) {
assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
loadProcedure(entry, entry.getProcedure(0));
for (int i = 1; i < entry.getProcedureCount(); ++i) {
@@ -240,12 +238,12 @@ public class ProcedureWALFormatReader {
- private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
+ private void readUpdateEntry(ProcedureWALEntry entry) {
assert entry.getProcedureCount() == 1 : "Expected only one procedure";
loadProcedure(entry, entry.getProcedure(0));
- private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
+ private void readDeleteEntry(ProcedureWALEntry entry) {
assert entry.hasProcId() : "expected ProcID";
if (entry.getChildIdCount() > 0) {
@@ -267,598 +265,19 @@ public class ProcedureWALFormatReader {
private void deleteEntry(final long procId) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("delete entry " + procId);
- }
+ LOG.trace("delete entry {}", procId);
maxProcId = Math.max(maxProcId, procId);
assert !procedureMap.contains(procId);
- if (tracker.isPartial()) {
- tracker.setDeleted(procId, true);
- }
- if (localTracker != null) {
- // In case there is only delete entry for this procedure in current log.
- localTracker.setDeleted(procId, true);
- }
+ setDeletedIfPartial(tracker, procId);
+ setDeletedIfPartial(localTracker, procId);
- private boolean isDeleted(final long procId) {
+ private boolean isDeleted(long procId) {
return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
- private boolean isRequired(final long procId) {
+ private boolean isRequired(long procId) {
return !isDeleted(procId) && !procedureMap.contains(procId);
- // ==========================================================================
- // We keep an in-memory map of the procedures sorted by replay order.
- // (see the details in the beginning of the file)
- // _______________________________________________
- // procedureMap = | A | | E | | C | | | | | G | | |
- // D B
- // replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
- //
- // We also have a lazy grouping by "root procedure", and a list of
- // unlinked procedures. If after reading all the WALs we have unlinked
- // procedures it means that we had a missing WAL or a corruption.
- // rootHead = A <-> D <-> G
- // B E
- // C
- // unlinkFromLinkList = None
- // ==========================================================================
- private static class Entry {
- // For bucketed linked lists in hash-table.
- protected Entry hashNext;
- // child head
- protected Entry childHead;
- // double-link for rootHead or childHead
- protected Entry linkNext;
- protected Entry linkPrev;
- // replay double-linked-list
- protected Entry replayNext;
- protected Entry replayPrev;
- // procedure-infos
- protected Procedure procedure;
- protected ProcedureProtos.Procedure proto;
- protected boolean ready = false;
- public Entry(Entry hashNext) {
- this.hashNext = hashNext;
- }
- public long getProcId() {
- return proto.getProcId();
- }
- public long getParentId() {
- return proto.getParentId();
- }
- public boolean hasParent() {
- return proto.hasParentId();
- }
- public boolean isReady() {
- return ready;
- }
- public boolean isFinished() {
- if (!hasParent()) {
- // we only consider 'root' procedures. because for the user 'finished'
- // means when everything up to the 'root' is finished.
- switch (proto.getState()) {
- case SUCCESS:
- return true;
- default:
- break;
- }
- }
- return false;
- }
- public Procedure convert() throws IOException {
- if (procedure == null) {
- procedure = ProcedureUtil.convertToProcedure(proto);
- }
- return procedure;
- }
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("Entry(");
- sb.append(getProcId());
- sb.append(", parentId=");
- sb.append(getParentId());
- sb.append(", class=");
- sb.append(proto.getClassName());
- sb.append(")");
- return sb.toString();
- }
- }
- private static class EntryIterator implements ProcedureIterator {
- private final Entry replayHead;
- private Entry current;
- public EntryIterator(Entry replayHead) {
- this.replayHead = replayHead;
- this.current = replayHead;
- }
- @Override
- public void reset() {
- this.current = replayHead;
- }
- @Override
- public boolean hasNext() {
- return current != null;
- }
- @Override
- public boolean isNextFinished() {
- return current != null && current.isFinished();
- }
- @Override
- public void skipNext() {
- current = current.replayNext;
- }
- @Override
- public Procedure next() throws IOException {
- try {
- return current.convert();
- } finally {
- current = current.replayNext;
- }
- }
- }
- private static class WalProcedureMap {
- // procedure hash table
- private Entry[] procedureMap;
- // replay-order double-linked-list
- private Entry replayOrderHead;
- private Entry replayOrderTail;
- // root linked-list
- private Entry rootHead;
- // pending unlinked children (root not present yet)
- private Entry childUnlinkedHead;
- // Track ProcId range
- private long minProcId = Long.MAX_VALUE;
- private long maxProcId = Long.MIN_VALUE;
- public WalProcedureMap(int size) {
- procedureMap = new Entry[size];
- replayOrderHead = null;
- replayOrderTail = null;
- rootHead = null;
- childUnlinkedHead = null;
- }
- public void add(ProcedureProtos.Procedure procProto) {
- trackProcIds(procProto.getProcId());
- Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId());
- boolean newEntry = entry.proto == null;
- // We have seen procedure WALs where the entries are out of order; see HBASE-18152.
- // To compensate, only replace the Entry procedure if for sure this new procedure
- // is indeed an entry that came later. TODO: Fix the writing of procedure info so
- // it does not violate basic expectation, that WALs contain procedure changes going
- // from start to finish in sequence.
- if (newEntry || isIncreasing(entry.proto, procProto)) {
- entry.proto = procProto;
- }
- addToReplayList(entry);
- if(newEntry) {
- if (procProto.hasParentId()) {
- childUnlinkedHead = addToLinkList(entry, childUnlinkedHead);
- } else {
- rootHead = addToLinkList(entry, rootHead);
- }
- }
- }
- /**
- * @return True if this new procedure is 'richer' than the current one else
- * false and we log this incidence where it appears that the WAL has older entries
- * appended after newer ones. See HBASE-18152.
- */
- private static boolean isIncreasing(ProcedureProtos.Procedure current,
- ProcedureProtos.Procedure candidate) {
- // Check that the procedures we see are 'increasing'. We used to compare
- // procedure id first and then update time but it can legitimately go backwards if the
- // procedure is failed or rolled back so that was unreliable. Was going to compare
- // state but lets see if comparing update time enough (unfortunately this issue only
- // seen under load...)
- boolean increasing = current.getLastUpdate() <= candidate.getLastUpdate();
- if (!increasing) {
- LOG.warn("NOT INCREASING! current=" + current + ", candidate=" + candidate);
- }
- return increasing;
- }
- public boolean remove(long procId) {
- trackProcIds(procId);
- Entry entry = removeFromMap(procId);
- if (entry != null) {
- unlinkFromReplayList(entry);
- unlinkFromLinkList(entry);
- return true;
- }
- return false;
- }
- private void trackProcIds(long procId) {
- minProcId = Math.min(minProcId, procId);
- maxProcId = Math.max(maxProcId, procId);
- }
- public long getMinProcId() {
- return minProcId;
- }
- public long getMaxProcId() {
- return maxProcId;
- }
- public boolean contains(long procId) {
- return getProcedure(procId) != null;
- }
- public boolean isEmpty() {
- return replayOrderHead == null;
- }
- public void clear() {
- for (int i = 0; i < procedureMap.length; ++i) {
- procedureMap[i] = null;
- }
- replayOrderHead = null;
- replayOrderTail = null;
- rootHead = null;
- childUnlinkedHead = null;
- minProcId = Long.MAX_VALUE;
- maxProcId = Long.MIN_VALUE;
- }
- /*
- * Merges two WalProcedureMap,
- * the target is the "global" map, the source is the "local" map.
- * - The entries in the hashtables are guaranteed to be unique.
- * On replay we don't load procedures that already exist in the "global"
- * map (the one we are merging the "local" in to).
- * - The replayOrderList of the "local" nao will be appended to the "global"
- * map replay list.
- * - The "local" map will be cleared at the end of the operation.
- */
- public void mergeTail(WalProcedureMap other) {
- for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) {
- int slotIndex = getMapSlot(p.getProcId());
- p.hashNext = procedureMap[slotIndex];
- procedureMap[slotIndex] = p;
- }
- if (replayOrderHead == null) {
- replayOrderHead = other.replayOrderHead;
- replayOrderTail = other.replayOrderTail;
- rootHead = other.rootHead;
- childUnlinkedHead = other.childUnlinkedHead;
- } else {
- // append replay list
- assert replayOrderTail.replayNext == null;
- assert other.replayOrderHead.replayPrev == null;
- replayOrderTail.replayNext = other.replayOrderHead;
- other.replayOrderHead.replayPrev = replayOrderTail;
- replayOrderTail = other.replayOrderTail;
- // merge rootHead
- if (rootHead == null) {
- rootHead = other.rootHead;
- } else if (other.rootHead != null) {
- Entry otherTail = findLinkListTail(other.rootHead);
- otherTail.linkNext = rootHead;
- rootHead.linkPrev = otherTail;
- rootHead = other.rootHead;
- }
- // merge childUnlinkedHead
- if (childUnlinkedHead == null) {
- childUnlinkedHead = other.childUnlinkedHead;
- } else if (other.childUnlinkedHead != null) {
- Entry otherTail = findLinkListTail(other.childUnlinkedHead);
- otherTail.linkNext = childUnlinkedHead;
- childUnlinkedHead.linkPrev = otherTail;
- childUnlinkedHead = other.childUnlinkedHead;
- }
- }
- maxProcId = Math.max(maxProcId, other.maxProcId);
- minProcId = Math.max(minProcId, other.minProcId);
- other.clear();
- }
- /*
- * Returns an EntryIterator with the list of procedures ready
- * to be added to the executor.
- * A Procedure is ready if its children and parent are ready.
- */
- public EntryIterator fetchReady() {
- buildGraph();
- Entry readyHead = null;
- Entry readyTail = null;
- Entry p = replayOrderHead;
- while (p != null) {
- Entry next = p.replayNext;
- if (p.isReady()) {
- unlinkFromReplayList(p);
- if (readyTail != null) {
- readyTail.replayNext = p;
- p.replayPrev = readyTail;
- } else {
- p.replayPrev = null;
- readyHead = p;
- }
- readyTail = p;
- p.replayNext = null;
- }
- p = next;
- }
- // we need the hash-table lookups for parents, so this must be done
- // out of the loop where we check isReadyToRun()
- for (p = readyHead; p != null; p = p.replayNext) {
- removeFromMap(p.getProcId());
- unlinkFromLinkList(p);
- }
- return readyHead != null ? new EntryIterator(readyHead) : null;
- }
- /*
- * Drain this map and return all procedures in it.
- */
- public EntryIterator fetchAll() {
- Entry head = replayOrderHead;
- for (Entry p = head; p != null; p = p.replayNext) {
- removeFromMap(p.getProcId());
- }
- for (int i = 0; i < procedureMap.length; ++i) {
- assert procedureMap[i] == null : "map not empty i=" + i;
- }
- replayOrderHead = null;
- replayOrderTail = null;
- childUnlinkedHead = null;
- rootHead = null;
- return head != null ? new EntryIterator(head) : null;
- }
- private void buildGraph() {
- Entry p = childUnlinkedHead;
- while (p != null) {
- Entry next = p.linkNext;
- Entry rootProc = getRootProcedure(p);
- if (rootProc != null) {
- rootProc.childHead = addToLinkList(p, rootProc.childHead);
- }
- p = next;
- }
- for (p = rootHead; p != null; p = p.linkNext) {
- checkReadyToRun(p);
- }
- }
- private Entry getRootProcedure(Entry entry) {
- while (entry != null && entry.hasParent()) {
- entry = getProcedure(entry.getParentId());
- }
- return entry;
- }
- /*
- * (see the comprehensive explanation in the beginning of the file)
- * A Procedure is ready when parent and children are ready.
- * "ready" means that we all the information that we need in-memory.
- *
- * Example-1:
- * We have two WALs, we start reading from the newest (wal-2)
- * wal-2 | C B |
- * wal-1 | A B C |
- *
- * If C and B don't depend on A (A is not the parent), we can start them
- * before reading wal-1. If B is the only one with parent A we can start C.
- * We have to read one more WAL before being able to start B.
- *
- * How do we know with the only information in B that we are not ready.
- * - easy case, the parent is missing from the global map
- * - more complex case we look at the Stack IDs.
- *
- * The Stack-IDs are added to the procedure order as an incremental index
- * tracking how many times that procedure was executed, which is equivalent
- * to the number of times we wrote the procedure to the WAL.
- * In the example above:
- * wal-2: B has stackId = [1, 2]
- * wal-1: B has stackId = [1]
- * wal-1: A has stackId = [0]
- *
- * Since we know that the Stack-IDs are incremental for a Procedure,
- * we notice that there is a gap in the stackIds of B, so something was
- * executed before.
- * To identify when a Procedure is ready we do the sum of the stackIds of
- * the procedure and the parent. if the stackIdSum is equal to the
- * sum of {1..maxStackId} then everything we need is available.
- *
- * Example-2
- * wal-2 | A | A stackIds = [0, 2]
- * wal-1 | A B | B stackIds = [1]
- *
- * There is a gap between A stackIds so something was executed in between.
- */
- private boolean checkReadyToRun(Entry rootEntry) {
- assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry;
- if (rootEntry.isFinished()) {
- // If the root procedure is finished, sub-procedures should be gone
- if (rootEntry.childHead != null) {
- LOG.error("unexpected active children for root-procedure: " + rootEntry);
- for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
- LOG.error("unexpected active children: " + p);
- }
- }
- assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry;
- rootEntry.ready = true;
- return true;
- }
- int stackIdSum = 0;
- int maxStackId = 0;
- for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) {
- int stackId = 1 + rootEntry.proto.getStackId(i);
- maxStackId = Math.max(maxStackId, stackId);
- stackIdSum += stackId;
- if (LOG.isTraceEnabled()) {
- LOG.trace("stackId=" + stackId + " stackIdSum=" + stackIdSum +
- " maxStackid=" + maxStackId + " " + rootEntry);
- }
- }
- for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
- for (int i = 0; i < p.proto.getStackIdCount(); ++i) {
- int stackId = 1 + p.proto.getStackId(i);
- maxStackId = Math.max(maxStackId, stackId);
- stackIdSum += stackId;
- if (LOG.isTraceEnabled()) {
- LOG.trace("stackId=" + stackId + " stackIdSum=" + stackIdSum +
- " maxStackid=" + maxStackId + " " + p);
- }
- }
- }
- // The cmpStackIdSum is this formula for finding the sum of a series of numbers:
- // http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg
- final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2);
- if (cmpStackIdSum == stackIdSum) {
- rootEntry.ready = true;
- for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
- p.ready = true;
- }
- return true;
- }
- return false;
- }
- private void unlinkFromReplayList(Entry entry) {
- if (replayOrderHead == entry) {
- replayOrderHead = entry.replayNext;
- }
- if (replayOrderTail == entry) {
- replayOrderTail = entry.replayPrev;
- }
- if (entry.replayPrev != null) {
- entry.replayPrev.replayNext = entry.replayNext;
- }
- if (entry.replayNext != null) {
- entry.replayNext.replayPrev = entry.replayPrev;
- }
- }
- private void addToReplayList(final Entry entry) {
- unlinkFromReplayList(entry);
- entry.replayNext = replayOrderHead;
- entry.replayPrev = null;
- if (replayOrderHead != null) {
- replayOrderHead.replayPrev = entry;
- } else {
- replayOrderTail = entry;
- }
- replayOrderHead = entry;
- }
- private void unlinkFromLinkList(Entry entry) {
- if (entry == rootHead) {
- rootHead = entry.linkNext;
- } else if (entry == childUnlinkedHead) {
- childUnlinkedHead = entry.linkNext;
- }
- if (entry.linkPrev != null) {
- entry.linkPrev.linkNext = entry.linkNext;
- }
- if (entry.linkNext != null) {
- entry.linkNext.linkPrev = entry.linkPrev;
- }
- }
- private Entry addToLinkList(Entry entry, Entry linkHead) {
- unlinkFromLinkList(entry);
- entry.linkNext = linkHead;
- entry.linkPrev = null;
- if (linkHead != null) {
- linkHead.linkPrev = entry;
- }
- return entry;
- }
- private Entry findLinkListTail(Entry linkHead) {
- Entry tail = linkHead;
- while (tail.linkNext != null) {
- tail = tail.linkNext;
- }
- return tail;
- }
- private Entry addToMap(final long procId, final boolean hasParent) {
- int slotIndex = getMapSlot(procId);
- Entry entry = getProcedure(slotIndex, procId);
- if (entry != null) return entry;
- entry = new Entry(procedureMap[slotIndex]);
- procedureMap[slotIndex] = entry;
- return entry;
- }
- private Entry removeFromMap(final long procId) {
- int slotIndex = getMapSlot(procId);
- Entry prev = null;
- Entry entry = procedureMap[slotIndex];
- while (entry != null) {
- if (procId == entry.getProcId()) {
- if (prev != null) {
- prev.hashNext = entry.hashNext;
- } else {
- procedureMap[slotIndex] = entry.hashNext;
- }
- entry.hashNext = null;
- return entry;
- }
- prev = entry;
- entry = entry.hashNext;
- }
- return null;
- }
- private Entry getProcedure(final long procId) {
- return getProcedure(getMapSlot(procId), procId);
- }
- private Entry getProcedure(final int slotIndex, final long procId) {
- Entry entry = procedureMap[slotIndex];
- while (entry != null) {
- if (procId == entry.getProcId()) {
- return entry;
- }
- entry = entry.hashNext;
- }
- return null;
- }
- private int getMapSlot(final long procId) {
- return (int)(Procedure.getProcIdHashCode(procId) % procedureMap.length);
- }
- }
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java
index a7712b14033..a11a46bc01c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -30,20 +29,21 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
* ProcedureWALPrettyPrinter prints the contents of a given ProcedureWAL file
@@ -160,7 +160,7 @@ public class ProcedureWALPrettyPrinter extends Configured implements Tool {
final List files = new ArrayList<>();
try {
- CommandLine cmd = new PosixParser().parse(options, args);
+ CommandLine cmd = new DefaultParser().parse(options, args);
if (cmd.hasOption("f")) {
files.add(new Path(cmd.getOptionValue("f")));
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
new file mode 100644
index 00000000000..18d7823cba6
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
@@ -0,0 +1,607 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.wal;
+import java.io.IOException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+/**
+ * We keep an in-memory map of the procedures sorted by replay order. (see the details in the
+ * beginning of {@link ProcedureWALFormatReader}).
+ *
+ *
+ * procedureMap = | A | | E | | C | | | | | G | | |
+ * D B
+ * replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
+ *
+ * We also have a lazy grouping by "root procedure", and a list of
+ * unlinked procedures. If after reading all the WALs we have unlinked
+ * procedures it means that we had a missing WAL or a corruption.
+ * rootHead = A <-> D <-> G
+ * B E
+ * C
+ * childUnlinkedHead = None
+ *
+ */
+class WALProcedureMap {
+ private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class);
+ /**
+  * One node of the procedure hash table. Besides the hash-bucket chain, each entry carries the
+  * pointers for the root/child link list and for the replay-order list, plus the protobuf message
+  * of the procedure and a lazily converted {@link Procedure} instance.
+  */
+ private static class Entry {
+   // For bucketed linked lists in hash-table.
+   private Entry hashNext;
+   // child head
+   private Entry childHead;
+   // double-link for rootHead or childHead
+   private Entry linkNext;
+   private Entry linkPrev;
+   // replay double-linked-list
+   private Entry replayNext;
+   private Entry replayPrev;
+   // procedure-infos
+   private Procedure<?> procedure;
+   private ProcedureProtos.Procedure proto;
+   private boolean ready = false;
+   /**
+    * @param hashNext the entry currently heading the hash bucket this entry is inserted into,
+    *          may be null
+    */
+   public Entry(Entry hashNext) {
+     this.hashNext = hashNext;
+   }
+   /** @return the procedure id stored in the protobuf message */
+   public long getProcId() {
+     return proto.getProcId();
+   }
+   /** @return the parent id stored in the protobuf message */
+   public long getParentId() {
+     return proto.getParentId();
+   }
+   /** @return true if the protobuf message has a parent id, i.e. this is not a root procedure */
+   public boolean hasParent() {
+     return proto.hasParentId();
+   }
+   /** @return true once the entry has been marked ready */
+   public boolean isReady() {
+     return ready;
+   }
+   /** @return true only for a root procedure whose state is SUCCESS */
+   public boolean isFinished() {
+     if (!hasParent()) {
+       // we only consider 'root' procedures. because for the user 'finished'
+       // means when everything up to the 'root' is finished.
+       switch (proto.getState()) {
+         case SUCCESS:
+           return true;
+         default:
+           break;
+       }
+     }
+     return false;
+   }
+   /**
+    * Converts the protobuf message to a {@link Procedure} on first use and caches the result.
+    * @return the converted procedure
+    * @throws IOException if the conversion fails
+    */
+   public Procedure<?> convert() throws IOException {
+     if (procedure == null) {
+       procedure = ProcedureUtil.convertToProcedure(proto);
+     }
+     return procedure;
+   }
+   @Override
+   public String toString() {
+     final StringBuilder sb = new StringBuilder();
+     sb.append("Entry(");
+     sb.append(getProcId());
+     sb.append(", parentId=");
+     sb.append(getParentId());
+     sb.append(", class=");
+     sb.append(proto.getClassName());
+     sb.append(")");
+     return sb.toString();
+   }
+ }
+ /**
+  * {@link ProcedureIterator} that walks a chain of entries through their {@code replayNext}
+  * pointers, starting from the head it was created with.
+  */
+ private static class EntryIterator implements ProcedureIterator {
+   private final Entry replayHead;
+   private Entry current;
+   /** @param replayHead first entry of the chain to iterate */
+   public EntryIterator(Entry replayHead) {
+     this.replayHead = replayHead;
+     this.current = replayHead;
+   }
+   /** Rewinds the iterator to the head of the chain. */
+   @Override
+   public void reset() {
+     this.current = replayHead;
+   }
+   @Override
+   public boolean hasNext() {
+     return current != null;
+   }
+   /** @return true if there is a next entry and it reports itself as finished */
+   @Override
+   public boolean isNextFinished() {
+     return current != null && current.isFinished();
+   }
+   /** Advances past the current entry without converting it. */
+   @Override
+   public void skipNext() {
+     current = current.replayNext;
+   }
+   /**
+    * Converts the current entry to a procedure and advances.
+    * @throws IOException if the conversion fails; the iterator has advanced regardless
+    */
+   @Override
+   public Procedure<?> next() throws IOException {
+     try {
+       return current.convert();
+     } finally {
+       // advance even when convert() throws, so a bad entry is not retried forever
+       current = current.replayNext;
+     }
+   }
+ }
+ // Hash table of entries; each slot heads a chain linked through Entry.hashNext.
+ private Entry[] procedureMap;
+ // Head and tail of the replay-order doubly-linked list (Entry.replayNext/replayPrev).
+ private Entry replayOrderHead;
+ private Entry replayOrderTail;
+ // Head of the list of root entries, i.e. entries whose proto has no parent id.
+ private Entry rootHead;
+ // Head of the list of child entries not yet attached to a root entry's childHead list.
+ private Entry childUnlinkedHead;
+ // Range of proc ids seen by add()/remove() or merged by mergeTail() since the last clear().
+ private long minModifiedProcId = Long.MAX_VALUE;
+ private long maxModifiedProcId = Long.MIN_VALUE;
+ /**
+  * Creates an empty map.
+  * @param size number of slots allocated for the procedure hash table
+  */
+ public WALProcedureMap(int size) {
+   this.procedureMap = new Entry[size];
+   this.rootHead = null;
+   this.childUnlinkedHead = null;
+   this.replayOrderHead = null;
+   this.replayOrderTail = null;
+ }
+ /**
+  * Records a procedure read from a WAL: tracks its id, creates or finds its entry, moves the
+  * entry to the head of the replay list and, for a newly created entry, puts it on the root or
+  * the unlinked-children list.
+  * @param procProto the protobuf message of the procedure
+  */
+ public void add(ProcedureProtos.Procedure procProto) {
+   final long procId = procProto.getProcId();
+   trackProcIds(procId);
+   final Entry entry = addToMap(procId, procProto.hasParentId());
+   final boolean firstSighting = (entry.proto == null);
+   // WAL entries have been observed out of order (see HBASE-18152). An existing entry therefore
+   // only takes the new proto when it does not look older than the one already held.
+   // TODO: Fix the writing of procedure info so that WALs always contain procedure changes in
+   // sequence, from start to finish.
+   if (firstSighting) {
+     entry.proto = procProto;
+   } else if (isIncreasing(entry.proto, procProto)) {
+     entry.proto = procProto;
+   }
+   addToReplayList(entry);
+   if (!firstSighting) {
+     return;
+   }
+   if (procProto.hasParentId()) {
+     childUnlinkedHead = addToLinkList(entry, childUnlinkedHead);
+   } else {
+     rootHead = addToLinkList(entry, rootHead);
+   }
+ }
+ /**
+ * @return True if this new procedure is 'richer' than the current one else false and we log this
+ * incidence where it appears that the WAL has older entries appended after newer ones.
+ * See HBASE-18152.
+ */
+ private static boolean isIncreasing(ProcedureProtos.Procedure current,
+     ProcedureProtos.Procedure candidate) {
+   // Only the last-update time is compared. Per the history of this check, an earlier
+   // comparison was dropped because it could legitimately go backwards when a procedure fails or
+   // is rolled back, and comparing state was considered but not adopted.
+   if (current.getLastUpdate() <= candidate.getLastUpdate()) {
+     return true;
+   }
+   LOG.warn("NOT INCREASING! current=" + current + ", candidate=" + candidate);
+   return false;
+ }
+ /**
+  * Drops the entry for {@code procId} from the hash table, the replay list and its link list.
+  * The id is recorded in the modified-id range whether or not an entry exists.
+  * @param procId id of the procedure to remove
+  * @return true if an entry was found and removed, false otherwise
+  */
+ public boolean remove(long procId) {
+   trackProcIds(procId);
+   final Entry removed = removeFromMap(procId);
+   if (removed == null) {
+     return false;
+   }
+   unlinkFromReplayList(removed);
+   unlinkFromLinkList(removed);
+   return true;
+ }
+ /** Widens the [min, max] modified-id range so that it includes {@code procId}. */
+ private void trackProcIds(long procId) {
+   if (procId < minModifiedProcId) {
+     minModifiedProcId = procId;
+   }
+   if (procId > maxModifiedProcId) {
+     maxModifiedProcId = procId;
+   }
+ }
+ /** @return the smallest tracked procedure id, or {@code Long.MAX_VALUE} if none was tracked */
+ public long getMinModifiedProcId() {
+   return this.minModifiedProcId;
+ }
+ /** @return the largest tracked procedure id, or {@code Long.MIN_VALUE} if none was tracked */
+ public long getMaxModifiedProcId() {
+   return this.maxModifiedProcId;
+ }
+ /** @return true if the hash table holds an entry for {@code procId} */
+ public boolean contains(long procId) {
+   final Entry found = getProcedure(procId);
+   return found != null;
+ }
+ /** @return true if the replay list has no entries */
+ public boolean isEmpty() {
+   final Entry first = replayOrderHead;
+   return first == null;
+ }
+ /** Empties the hash table, drops every list head and resets the tracked id range. */
+ public void clear() {
+   java.util.Arrays.fill(procedureMap, null);
+   minModifiedProcId = Long.MAX_VALUE;
+   maxModifiedProcId = Long.MIN_VALUE;
+   rootHead = null;
+   childUnlinkedHead = null;
+   replayOrderHead = null;
+   replayOrderTail = null;
+ }
+ /*
+ * Merges two WalProcedureMap, the target is the "global" map, the source is the "local" map. -
+ * The entries in the hashtables are guaranteed to be unique. On replay we don't load procedures
+ * that already exist in the "global" map (the one we are merging the "local" in to). - The
+ * replayOrderList of the "local" map will be appended to the "global" map replay list. - The
+ * "local" map will be cleared at the end of the operation.
+ */
+ public void mergeTail(WALProcedureMap other) {
+   // Re-hash every entry of the other map into this map's table. No duplicate check is done;
+   // the procedure ids are expected to be unique across the two maps.
+   for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) {
+     int slotIndex = getMapSlot(p.getProcId());
+     p.hashNext = procedureMap[slotIndex];
+     procedureMap[slotIndex] = p;
+   }
+   if (replayOrderHead == null) {
+     // this map is empty: adopt the lists of the other map as they are
+     replayOrderHead = other.replayOrderHead;
+     replayOrderTail = other.replayOrderTail;
+     rootHead = other.rootHead;
+     childUnlinkedHead = other.childUnlinkedHead;
+   } else if (other.replayOrderHead != null) {
+     // An empty other map has nothing to link, and touching its null head would throw.
+     // append replay list
+     assert replayOrderTail.replayNext == null;
+     assert other.replayOrderHead.replayPrev == null;
+     replayOrderTail.replayNext = other.replayOrderHead;
+     other.replayOrderHead.replayPrev = replayOrderTail;
+     replayOrderTail = other.replayOrderTail;
+     // the other map's roots and unlinked children go in front of ours
+     rootHead = prependLinkList(rootHead, other.rootHead);
+     childUnlinkedHead = prependLinkList(childUnlinkedHead, other.childUnlinkedHead);
+   }
+   // The merged range must cover both ranges: take the larger max and the smaller min.
+   maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId);
+   minModifiedProcId = Math.min(minModifiedProcId, other.minModifiedProcId);
+   other.clear();
+ }
+ /**
+  * Links the list starting at {@code otherHead} in front of the list starting at {@code head}.
+  * @return the head of the combined list; the non-null argument if the other one is null
+  */
+ private Entry prependLinkList(Entry head, Entry otherHead) {
+   if (head == null) {
+     return otherHead;
+   }
+   if (otherHead == null) {
+     return head;
+   }
+   Entry otherTail = findLinkListTail(otherHead);
+   otherTail.linkNext = head;
+   head.linkPrev = otherTail;
+   return otherHead;
+ }
+ /**
+ * Returns an EntryIterator with the list of procedures ready to be added to the executor. A
+ * Procedure is ready if its children and parent are ready.
+ */
+ public ProcedureIterator fetchReady() {
+   buildGraph();
+   Entry readyHead = null;
+   Entry readyTail = null;
+   Entry following;
+   for (Entry cur = replayOrderHead; cur != null; cur = following) {
+     // remember the successor now: moving cur to the ready chain overwrites its replayNext
+     following = cur.replayNext;
+     if (!cur.isReady()) {
+       continue;
+     }
+     unlinkFromReplayList(cur);
+     // append cur to the chain of ready entries, preserving replay order
+     cur.replayPrev = readyTail;
+     if (readyTail == null) {
+       readyHead = cur;
+     } else {
+       readyTail.replayNext = cur;
+     }
+     cur.replayNext = null;
+     readyTail = cur;
+   }
+   // The hash table and link lists are only cleaned up once the scan above is complete.
+   Entry done = readyHead;
+   while (done != null) {
+     removeFromMap(done.getProcId());
+     unlinkFromLinkList(done);
+     done = done.replayNext;
+   }
+   if (readyHead == null) {
+     return null;
+   }
+   return new EntryIterator(readyHead);
+ }
+ /**
+ * Drain this map and return all procedures in it.
+ */
+ public ProcedureIterator fetchAll() {
+   final Entry drainedHead = replayOrderHead;
+   // removing an entry from the hash table leaves its replay pointers intact
+   Entry cursor = drainedHead;
+   while (cursor != null) {
+     removeFromMap(cursor.getProcId());
+     cursor = cursor.replayNext;
+   }
+   int slot = 0;
+   while (slot < procedureMap.length) {
+     assert procedureMap[slot] == null : "map not empty i=" + slot;
+     slot++;
+   }
+   rootHead = null;
+   childUnlinkedHead = null;
+   replayOrderHead = null;
+   replayOrderTail = null;
+   if (drainedHead == null) {
+     return null;
+   }
+   return new EntryIterator(drainedHead);
+ }
+ /**
+  * Moves every unlinked child whose root entry is present in the map onto that root's child
+  * list, then runs the readiness check on every root entry.
+  */
+ private void buildGraph() {
+   Entry following;
+   for (Entry child = childUnlinkedHead; child != null; child = following) {
+     // addToLinkList() rewrites child.linkNext, so pick up the successor beforehand
+     following = child.linkNext;
+     final Entry owner = getRootProcedure(child);
+     if (owner == null) {
+       continue;
+     }
+     owner.childHead = addToLinkList(child, owner.childHead);
+   }
+   Entry rootCursor = rootHead;
+   while (rootCursor != null) {
+     checkReadyToRun(rootCursor);
+     rootCursor = rootCursor.linkNext;
+   }
+ }
+ /**
+  * Follows parent ids through the hash table, starting at {@code entry}.
+  * @return the first entry on the chain that has no parent, or null if a parent is not in the map
+  */
+ private Entry getRootProcedure(Entry entry) {
+   Entry cursor = entry;
+   while (true) {
+     if (cursor == null || !cursor.hasParent()) {
+       return cursor;
+     }
+     cursor = getProcedure(cursor.getParentId());
+   }
+ }
+ /**
+ * (see the comprehensive explanation in the beginning of {@link ProcedureWALFormatReader}). A
+ * Procedure is ready when parent and children are ready. "ready" means that we have all the
+ * information that we need in-memory.
+ *
+ * Example-1:
+ * We have two WALs, we start reading from the newest (wal-2)
+ *
+ *
+ * wal-2 | C B |
+ * wal-1 | A B C |
+ *
+ *
+ * If C and B don't depend on A (A is not the parent), we can start them before reading wal-1. If
+ * B is the only one with parent A we can start C. We have to read one more WAL before being able
+ * to start B.
+ *
+ * How do we know with the only information in B that we are not ready.
+ *
+ * - easy case, the parent is missing from the global map
+ * - more complex case we look at the Stack IDs.
+ *
+ * The Stack-IDs are added to the procedure order as an incremental index tracking how many times
+ * that procedure was executed, which is equivalent to the number of times we wrote the procedure
+ * to the WAL.
+ * In the example above:
+ *
+ *
+ * wal-2: B has stackId = [1, 2]
+ * wal-1: B has stackId = [1]
+ * wal-1: A has stackId = [0]
+ *
+ *
+ * Since we know that the Stack-IDs are incremental for a Procedure, we notice that there is a gap
+ * in the stackIds of B, so something was executed before.
+ *
+ * To identify when a Procedure is ready we do the sum of the stackIds of the procedure and the
+ * parent. if the stackIdSum is equal to the sum of {1..maxStackId} then everything we need is
+ * available.
+ *
+ * Example-2
+ *
+ *
+ * wal-2 | A | A stackIds = [0, 2]
+ * wal-1 | A B | B stackIds = [1]
+ *
+ *
+ * There is a gap between A stackIds so something was executed in between.
+ */
+ private boolean checkReadyToRun(Entry rootEntry) {
+   assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry;
+   if (rootEntry.isFinished()) {
+     // A finished root is not expected to have children left; report any that remain.
+     Entry stray = rootEntry.childHead;
+     if (stray != null) {
+       LOG.error("unexpected active children for root-procedure: {}", rootEntry);
+       while (stray != null) {
+         LOG.error("unexpected active children: {}", stray);
+         stray = stray.linkNext;
+       }
+     }
+     assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry;
+     rootEntry.ready = true;
+     return true;
+   }
+   // Accumulate the 1-based stack ids of the root first, then of every linked child.
+   int stackIdSum = 0;
+   int maxStackId = 0;
+   for (int rawId : rootEntry.proto.getStackIdList()) {
+     final int stackId = rawId + 1;
+     if (stackId > maxStackId) {
+       maxStackId = stackId;
+     }
+     stackIdSum += stackId;
+     LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId,
+       rootEntry);
+   }
+   Entry child = rootEntry.childHead;
+   while (child != null) {
+     for (int rawId : child.proto.getStackIdList()) {
+       final int stackId = rawId + 1;
+       if (stackId > maxStackId) {
+         maxStackId = stackId;
+       }
+       stackIdSum += stackId;
+       LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId,
+         child);
+     }
+     child = child.linkNext;
+   }
+   // Sum of 1..maxStackId; it equals stackIdSum when the collected ids have no gap.
+   final int expectedSum = maxStackId * (maxStackId + 1) / 2;
+   if (expectedSum != stackIdSum) {
+     return false;
+   }
+   rootEntry.ready = true;
+   for (Entry c = rootEntry.childHead; c != null; c = c.linkNext) {
+     c.ready = true;
+   }
+   return true;
+ }
+ /**
+  * Detaches {@code entry} from the replay list by fixing up its neighbours and, if needed, the
+  * list head and tail. The entry's own replay pointers are left untouched.
+  */
+ private void unlinkFromReplayList(Entry entry) {
+   final Entry before = entry.replayPrev;
+   final Entry after = entry.replayNext;
+   if (before != null) {
+     before.replayNext = after;
+   }
+   if (after != null) {
+     after.replayPrev = before;
+   }
+   if (entry == replayOrderHead) {
+     replayOrderHead = after;
+   }
+   if (entry == replayOrderTail) {
+     replayOrderTail = before;
+   }
+ }
+ /** Moves {@code entry} to the head of the replay list, detaching it from its old spot first. */
+ private void addToReplayList(final Entry entry) {
+   unlinkFromReplayList(entry);
+   final Entry formerHead = replayOrderHead;
+   entry.replayPrev = null;
+   entry.replayNext = formerHead;
+   if (formerHead == null) {
+     // the list was empty, so the entry is the tail as well
+     replayOrderTail = entry;
+   } else {
+     formerHead.replayPrev = entry;
+   }
+   replayOrderHead = entry;
+ }
+ /**
+  * Detaches {@code entry} from its link list by fixing up its neighbours and, when the entry is
+  * the current root head or unlinked-children head, that head. The entry's own link pointers are
+  * left untouched.
+  */
+ private void unlinkFromLinkList(Entry entry) {
+   final Entry before = entry.linkPrev;
+   final Entry after = entry.linkNext;
+   if (rootHead == entry) {
+     rootHead = after;
+   } else if (childUnlinkedHead == entry) {
+     childUnlinkedHead = after;
+   }
+   if (before != null) {
+     before.linkNext = after;
+   }
+   if (after != null) {
+     after.linkPrev = before;
+   }
+ }
+ /**
+  * Detaches {@code entry} from its current link list and puts it in front of {@code linkHead}.
+  * @param linkHead current head of the target list, may be null
+  * @return {@code entry}, which is the new head of the target list
+  */
+ private Entry addToLinkList(Entry entry, Entry linkHead) {
+   unlinkFromLinkList(entry);
+   entry.linkPrev = null;
+   entry.linkNext = linkHead;
+   if (linkHead == null) {
+     return entry;
+   }
+   linkHead.linkPrev = entry;
+   return entry;
+ }
+ /**
+  * Walks the {@code linkNext} pointers from {@code linkHead} to the end of the list.
+  * @param linkHead head of the list, must not be null
+  * @return the last entry of the list
+  */
+ private Entry findLinkListTail(Entry linkHead) {
+   Entry last = linkHead;
+   for (Entry following = last.linkNext; following != null; following = following.linkNext) {
+     last = following;
+   }
+   return last;
+ }
+ /**
+  * Returns the entry for {@code procId}, creating an empty one at the front of its hash bucket
+  * when none exists yet.
+  * @param hasParent not used by this method
+  */
+ private Entry addToMap(long procId, boolean hasParent) {
+   final int slotIndex = getMapSlot(procId);
+   final Entry existing = getProcedure(slotIndex, procId);
+   if (existing != null) {
+     return existing;
+   }
+   final Entry created = new Entry(procedureMap[slotIndex]);
+   procedureMap[slotIndex] = created;
+   return created;
+ }
+ /**
+  * Unchains the entry for {@code procId} from its hash bucket.
+  * @return the removed entry with its {@code hashNext} cleared, or null if there is no such entry
+  */
+ private Entry removeFromMap(final long procId) {
+   final int slotIndex = getMapSlot(procId);
+   Entry previous = null;
+   for (Entry node = procedureMap[slotIndex]; node != null;
+       previous = node, node = node.hashNext) {
+     if (node.getProcId() != procId) {
+       continue;
+     }
+     if (previous == null) {
+       // the match is the bucket head
+       procedureMap[slotIndex] = node.hashNext;
+     } else {
+       previous.hashNext = node.hashNext;
+     }
+     node.hashNext = null;
+     return node;
+   }
+   return null;
+ }
+ /** @return the entry for {@code procId}, or null if the hash table has none */
+ private Entry getProcedure(long procId) {
+   final int slotIndex = getMapSlot(procId);
+   return getProcedure(slotIndex, procId);
+ }
+ /**
+  * Scans the hash bucket at {@code slotIndex} for {@code procId}.
+  * @return the matching entry, or null if the bucket does not contain one
+  */
+ private Entry getProcedure(int slotIndex, long procId) {
+   for (Entry candidate = procedureMap[slotIndex]; candidate != null;
+       candidate = candidate.hashNext) {
+     if (candidate.getProcId() == procId) {
+       return candidate;
+     }
+   }
+   return null;
+ }
+ /** @return the hash table slot for {@code procId}: its proc-id hash modulo the table length */
+ private int getMapSlot(long procId) {
+   final long hash = Procedure.getProcIdHashCode(procId);
+   return (int) (hash % procedureMap.length);
+ }
\ No newline at end of file
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 951f05e8357..b3f5d10f83f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -15,14 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
@@ -35,7 +33,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -52,25 +49,60 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
* WAL implementation of the ProcedureStore.
+ *
+ * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()},
+ * then {@link #load(ProcedureLoader)}.
+ *
+ * In {@link #recoverLease()}, we will get the lease by closing all the existing wal files(by
+ * calling recoverFileLease), and creating a new wal writer. And we will also get the list of all
+ * the old wal files.
+ *
+ * FIXME: notice that the current recover lease implementation is problematic, it cannot deal with
+ * the races when two masters both want to acquire the lease...
+ *
+ * In {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the
+ * comments of this method for more details.
+ *
+ * The actual logging is a bit like our FileSystem based WAL implementation on the RS side. There
+ * is a {@link #slots} array, which works like a ring buffer: the insert, update and delete methods
+ * put things into the {@link #slots} and wait. A background sync thread (see the
+ * {@link #syncLoop()} method) gets data from the {@link #slots}, writes it to the FileSystem, and
+ * notifies the callers that it has finished.
+ *
+ * TODO: try using disruptor to increase performance and simplify the logic?
+ *
+ * The {@link #storeTracker} keeps track of the modified procedures in the newest wal file, which is
+ * also the one being written currently. And the deleted bits in it are for all the procedures, not
+ * only the ones in the newest wal file. And when rolling a log, we will first store it in the
+ * trailer of the current wal file, and then reset its modified bits, so that it can start to track
+ * the modified procedures for the new wal file.
+ *
+ * The {@link #holdingCleanupTracker} is used to test whether we are safe to delete the oldest wal
+ * file. When there are log rolling and there are more than 1 wal files, we will make use of it. It
+ * will first be initialized to the oldest file's tracker(which is stored in the trailer), using the
+ * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it
+ * with the tracker of every newer wal files, using the
+ * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If we find out
+ * that all the modified procedures for the oldest wal file are modified or deleted in newer wal
+ * files, then we can delete it.
* @see ProcedureWALPrettyPrinter for printing content of a single WAL.
* @see #main(String[]) to parse a directory of MasterWALProcs.
public class WALProcedureStore extends ProcedureStoreBase {
private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class);
public static final String LOG_PREFIX = "pv2-";
@@ -166,7 +198,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private int syncWaitMsec;
// Variables used for UI display
- private CircularFifoQueue syncMetricsQueue;
+ private CircularFifoQueue syncMetricsQueue;
public static class SyncMetrics {
private long timestamp;
@@ -228,11 +260,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
// Create archive dir up front. Rename won't work w/o it up on HDFS.
if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
if (this.fs.mkdirs(this.walArchiveDir)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created Procedure Store WAL archive dir " + this.walArchiveDir);
- }
+ LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir);
} else {
- LOG.warn("Failed create of " + this.walArchiveDir);
+ LOG.warn("Failed create of {}", this.walArchiveDir);
@@ -248,7 +278,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
runningProcCount = numSlots;
syncMaxSlot = numSlots;
slots = new ByteSlot[numSlots];
- slotsCache = new LinkedTransferQueue();
+ slotsCache = new LinkedTransferQueue<>();
while (slotsCache.size() < numSlots) {
slotsCache.offer(new ByteSlot());
@@ -267,7 +297,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
// WebUI
- syncMetricsQueue = new CircularFifoQueue(
+ syncMetricsQueue = new CircularFifoQueue<>(
// Init sync thread
@@ -394,9 +424,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
// We have the lease on the log
oldLogs = getLogFiles();
if (getMaxLogId(oldLogs) > flushLogId) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
- }
+ LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId);
@@ -410,7 +438,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
- public void load(final ProcedureLoader loader) throws IOException {
+ public void load(ProcedureLoader loader) throws IOException {
try {
if (logs.isEmpty()) {
@@ -425,7 +453,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
// Load the old logs
- final Iterator it = logs.descendingIterator();
+ Iterator it = logs.descendingIterator();
it.next(); // Skip the current log
ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
@@ -485,7 +513,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
- public void insert(final Procedure proc, final Procedure[] subprocs) {
+ public void insert(Procedure> proc, Procedure>[] subprocs) {
if (LOG.isTraceEnabled()) {
LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
@@ -519,7 +547,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
- public void insert(final Procedure[] procs) {
+ public void insert(Procedure>[] procs) {
if (LOG.isTraceEnabled()) {
LOG.trace("Insert " + Arrays.toString(procs));
@@ -548,7 +576,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
- public void update(final Procedure proc) {
+ public void update(Procedure> proc) {
if (LOG.isTraceEnabled()) {
LOG.trace("Update " + proc);
@@ -571,11 +599,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
- public void delete(final long procId) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Delete " + procId);
- }
+ public void delete(long procId) {
+ LOG.trace("Delete {}", procId);
ByteSlot slot = acquireSlot();
try {
// Serialize the delete
@@ -594,7 +619,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
- public void delete(final Procedure proc, final long[] subProcIds) {
+ public void delete(Procedure> proc, long[] subProcIds) {
assert proc != null : "expected a non-null procedure";
assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds";
if (LOG.isTraceEnabled()) {
@@ -630,7 +655,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
- private void delete(final long[] procIds) {
+ private void delete(long[] procIds) {
if (LOG.isTraceEnabled()) {
LOG.trace("Delete " + Arrays.toString(procIds));
@@ -736,20 +761,20 @@ public class WALProcedureStore extends ProcedureStoreBase {
} else {
storeTracker.insert(procId, subProcIds);
- holdingCleanupTracker.setDeletedIfSet(procId);
+ holdingCleanupTracker.setDeletedIfModified(procId);
case UPDATE:
- holdingCleanupTracker.setDeletedIfSet(procId);
+ holdingCleanupTracker.setDeletedIfModified(procId);
case DELETE:
if (subProcIds != null && subProcIds.length > 0) {
- holdingCleanupTracker.setDeletedIfSet(subProcIds);
+ holdingCleanupTracker.setDeletedIfModified(subProcIds);
} else {
- holdingCleanupTracker.setDeletedIfSet(procId);
+ holdingCleanupTracker.setDeletedIfModified(procId);
@@ -973,16 +998,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
private void periodicRoll() throws IOException {
if (storeTracker.isEmpty()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("no active procedures");
- }
+ LOG.trace("no active procedures");
removeAllLogs(flushLogId - 1);
} else {
- if (storeTracker.isUpdated()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("all the active procedures are in the latest log");
- }
+ if (storeTracker.isAllModified()) {
+ LOG.trace("all the active procedures are in the latest log");
removeAllLogs(flushLogId - 1);
@@ -997,18 +1018,20 @@ public class WALProcedureStore extends ProcedureStoreBase {
private boolean rollWriter() throws IOException {
- if (!isRunning()) return false;
+ if (!isRunning()) {
+ return false;
+ }
// Create new state-log
if (!rollWriter(flushLogId + 1)) {
- LOG.warn("someone else has already created log " + flushLogId);
+ LOG.warn("someone else has already created log {}", flushLogId);
return false;
// We have the lease on the log,
// but we should check if someone else has created new files
if (getMaxLogId(getLogFiles()) > flushLogId) {
- LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
+ LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId);
return false;
@@ -1064,7 +1087,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
- storeTracker.resetUpdates();
+ storeTracker.resetModified();
stream = newStream;
flushLogId = logId;
@@ -1092,12 +1115,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
try {
ProcedureWALFile log = logs.getLast();
- log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
+ log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
} catch (IOException e) {
- LOG.warn("Unable to write the trailer: " + e.getMessage());
+ LOG.warn("Unable to write the trailer", e);
try {
@@ -1134,9 +1157,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
// - the other WALs are scanned to remove procs already in other wals.
// TODO: exit early if holdingCleanupTracker.isEmpty()
holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
- holdingCleanupTracker.setDeletedIfSet(storeTracker);
+ holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker);
for (int i = 1, size = logs.size() - 1; i < size; ++i) {
- holdingCleanupTracker.setDeletedIfSet(logs.get(i).getTracker());
+ holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker());
@@ -1144,12 +1167,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
* Remove all logs with logId <= {@code lastLogId}.
private void removeAllLogs(long lastLogId) {
- if (logs.size() <= 1) return;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Remove all state logs with ID less than " + lastLogId);
+ if (logs.size() <= 1) {
+ return;
+ LOG.trace("Remove all state logs with ID less than {}", lastLogId);
boolean removed = false;
while (logs.size() > 1) {
ProcedureWALFile log = logs.getFirst();
@@ -1167,14 +1190,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
try {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Removing log=" + log);
- }
+ LOG.trace("Removing log={}", log);
- if (LOG.isDebugEnabled()) {
- LOG.info("Removed log=" + log + ", activeLogs=" + logs);
- }
+ LOG.debug("Removed log={}, activeLogs={}", log, logs);
assert logs.size() > 0 : "expected at least one log";
} catch (IOException e) {
LOG.error("Unable to remove log: " + log, e);
@@ -1238,50 +1257,53 @@ public class WALProcedureStore extends ProcedureStoreBase {
- private static long getMaxLogId(final FileStatus[] logFiles) {
- long maxLogId = 0;
- if (logFiles != null && logFiles.length > 0) {
- for (int i = 0; i < logFiles.length; ++i) {
- maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName()));
- }
- }
- return maxLogId;
- }
+ * The file set must be obtained by calling {@link #getLogFiles()}, which sorts the files by
+ * log id.
* @return Max-LogID of the specified log file set
- private long initOldLogs(final FileStatus[] logFiles) throws IOException {
- this.logs.clear();
- long maxLogId = 0;
- if (logFiles != null && logFiles.length > 0) {
- for (int i = 0; i < logFiles.length; ++i) {
- final Path logPath = logFiles[i].getPath();
- leaseRecovery.recoverFileLease(fs, logPath);
- if (!isRunning()) {
- throw new IOException("wal aborting");
- }
- maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
- ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
- if (log != null) {
- this.logs.add(log);
- }
- }
- Collections.sort(this.logs);
- initTrackerFromOldLogs();
+ private static long getMaxLogId(FileStatus[] logFiles) {
+ if (logFiles == null || logFiles.length == 0) {
+ return 0L;
+ return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName());
+ }
+ /**
+ * The file set must be obtained by calling {@link #getLogFiles()}, which sorts the files by
+ * log id.
+ * @return Max-LogID of the specified log file set
+ */
+ private long initOldLogs(FileStatus[] logFiles) throws IOException {
+ if (logFiles == null || logFiles.length == 0) {
+ return 0L;
+ }
+ long maxLogId = 0;
+ for (int i = 0; i < logFiles.length; ++i) {
+ final Path logPath = logFiles[i].getPath();
+ leaseRecovery.recoverFileLease(fs, logPath);
+ if (!isRunning()) {
+ throw new IOException("wal aborting");
+ }
+ maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
+ ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
+ if (log != null) {
+ this.logs.add(log);
+ }
+ }
+ initTrackerFromOldLogs();
return maxLogId;
- * If last log's tracker is not null, use it as {@link #storeTracker}.
- * Otherwise, set storeTracker as partial, and let {@link ProcedureWALFormatReader} rebuild
- * it using entries in the log.
+ * If last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set storeTracker
+ * as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries in the log.
private void initTrackerFromOldLogs() {
- if (logs.isEmpty() || !isRunning()) return;
+ if (logs.isEmpty() || !isRunning()) {
+ return;
+ }
ProcedureWALFile log = logs.getLast();
if (!log.getTracker().isPartial()) {
@@ -1295,20 +1317,18 @@ public class WALProcedureStore extends ProcedureStoreBase {
* Loads given log file and it's tracker.
private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
- throws IOException {
+ throws IOException {
final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
if (logFile.getLen() == 0) {
- LOG.warn("Remove uninitialized log: " + logFile);
+ LOG.warn("Remove uninitialized log: {}", logFile);
return null;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Opening Pv2 " + logFile);
- }
+ LOG.debug("Opening Pv2 {}", logFile);
try {
} catch (ProcedureWALFormat.InvalidWALDataException e) {
- LOG.warn("Remove uninitialized log: " + logFile, e);
+ LOG.warn("Remove uninitialized log: {}", logFile, e);
return null;
} catch (IOException e) {
@@ -1322,7 +1342,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
} catch (IOException e) {
- LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
+ LOG.warn("Unable to read tracker for {}", log, e);
@@ -1350,7 +1370,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
try {
- ProcedureExecutor pe = new ProcedureExecutor(conf, new Object()/*Pass anything*/, store);
+ ProcedureExecutor> pe = new ProcedureExecutor<>(conf, new Object()/*Pass anything*/, store);
pe.init(1, true);
} finally {
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
index ffc6ab8de0d..d6b58d0b33f 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
import java.util.Random;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.BitSetNode;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.ClassRule;
@@ -119,29 +118,29 @@ public class TestProcedureStoreTracker {
tracker.insert(procs[1], new long[] { procs[2], procs[3], procs[4] });
- assertTrue(tracker.isUpdated());
+ assertTrue(tracker.isAllModified());
- tracker.resetUpdates();
- assertFalse(tracker.isUpdated());
+ tracker.resetModified();
+ assertFalse(tracker.isAllModified());
for (int i = 0; i < 4; ++i) {
- assertFalse(tracker.isUpdated());
+ assertFalse(tracker.isAllModified());
- assertTrue(tracker.isUpdated());
+ assertTrue(tracker.isAllModified());
- assertTrue(tracker.isUpdated());
+ assertTrue(tracker.isAllModified());
for (int i = 0; i < 5; ++i) {
- assertTrue(tracker.isUpdated());
+ assertTrue(tracker.isAllModified());
@@ -235,7 +234,7 @@ public class TestProcedureStoreTracker {
for (long i : active) {
- tracker.resetUpdates();
+ tracker.resetModified();
for (long i : updated) {
@@ -252,11 +251,11 @@ public class TestProcedureStoreTracker {
BitSetNode buildBitSetNode(long[] active, long[] updated, long[] deleted) {
BitSetNode bitSetNode = new BitSetNode(0L, false);
for (long i : active) {
- bitSetNode.update(i);
+ bitSetNode.insertOrUpdate(i);
- bitSetNode.resetUpdates();
+ bitSetNode.resetModified();
for (long i : updated) {
- bitSetNode.update(i);
+ bitSetNode.insertOrUpdate(i);
for (long i : deleted) {
@@ -276,9 +275,9 @@ public class TestProcedureStoreTracker {
assertEquals(false, tracker.isEmpty());
for (int i = 0; i < procIds.length; ++i) {
- tracker.setDeletedIfSet(procIds[i] - 1);
- tracker.setDeletedIfSet(procIds[i]);
- tracker.setDeletedIfSet(procIds[i] + 1);
+ tracker.setDeletedIfModified(procIds[i] - 1);
+ tracker.setDeletedIfModified(procIds[i]);
+ tracker.setDeletedIfModified(procIds[i] + 1);
assertEquals(true, tracker.isEmpty());
@@ -289,7 +288,7 @@ public class TestProcedureStoreTracker {
assertEquals(false, tracker.isEmpty());
- tracker.setDeletedIfSet(procIds);
+ tracker.setDeletedIfModified(procIds);
assertEquals(true, tracker.isEmpty());
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index b1bd254b800..d682481e886 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -423,11 +423,11 @@ public class TestWALProcedureStore {
final Procedure[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) {
for (int index : updatedProcs) {
long procId = procs[index].getProcId();
- assertTrue("Procedure id : " + procId, tracker.isUpdated(procId));
+ assertTrue("Procedure id : " + procId, tracker.isModified(procId));
for (int index : nonUpdatedProcs) {
long procId = procs[index].getProcId();
- assertFalse("Procedure id : " + procId, tracker.isUpdated(procId));
+ assertFalse("Procedure id : " + procId, tracker.isModified(procId));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index e2e4aec00b6..528f0395211 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
* avoiding port contention if another local HBase instance is already running).
* To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
* setting it to true.
+ * For triggering test.