From 118b0746849c886fc64b0a53014f3186e2db4d9d Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sat, 6 Oct 2018 17:27:05 +0800 Subject: [PATCH] HBASE-21250 Refactor WALProcedureStore and add more comments for better understanding the implementation --- .../hbase/procedure2/store/BitSetNode.java | 397 ++++++++++ .../procedure2/store/NoopProcedureStore.java | 9 +- .../procedure2/store/ProcedureStore.java | 9 +- .../store/ProcedureStoreTracker.java | 502 ++----------- .../CorruptedWALProcedureStoreException.java | 6 +- .../store/wal/ProcedureWALFile.java | 7 +- .../store/wal/ProcedureWALFormat.java | 38 +- .../store/wal/ProcedureWALFormatReader.java | 707 ++---------------- .../store/wal/ProcedureWALPrettyPrinter.java | 18 +- .../procedure2/store/wal/WALProcedureMap.java | 607 +++++++++++++++ .../store/wal/WALProcedureStore.java | 218 +++--- .../store/TestProcedureStoreTracker.java | 31 +- .../store/wal/TestWALProcedureStore.java | 4 +- .../hadoop/hbase/HBaseTestingUtility.java | 1 + 14 files changed, 1328 insertions(+), 1226 deletions(-) create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java new file mode 100644 index 00000000000..b76c026d01d --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java @@ -0,0 +1,397 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2.store; + +import java.util.Arrays; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met). + * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the range + * of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K is + * BITS_PER_WORD. + *

+ * We have two main bit sets to describe the state of procedures, the meanings are: + * + *

+ *  ----------------------
+ * | modified | deleted |  meaning
+ * |     0    |   0     |  proc exists, but hasn't been updated since last resetUpdates().
+ * |     1    |   0     |  proc was updated (but not deleted).
+ * |     1    |   1     |  proc was deleted.
+ * |     0    |   1     |  proc doesn't exist (maybe never created, maybe deleted in past).
+ * ----------------------
+ * 
+ * + * The meaning of modified is that, we have modified the state of the procedure, no matter insert, + * update, or delete. And if it is an insert or update, we will set the deleted to 0, if not we will + * set the delete to 1. + *

+ * For a non-partial BitSetNode, the initial modified value is 0 and deleted value is 1. For the + * partial one, the initial modified value is 0 and the initial deleted value is also 0. In + * {@link #unsetPartialFlag()} we will reset the deleted to 1 if it is not modified. + */ +@InterfaceAudience.Private +class BitSetNode { + private static final long WORD_MASK = 0xffffffffffffffffL; + private static final int ADDRESS_BITS_PER_WORD = 6; + private static final int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; + private static final int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD; + + /** + * Mimics {@link ProcedureStoreTracker#partial}. It will effect how we fill the new deleted bits + * when growing. + */ + private boolean partial; + + /** + * Set of procedures which have been modified since last {@link #resetModified()}. Useful to track + * procedures which have been modified since last WAL write. + */ + private long[] modified; + + /** + * Keeps track of procedure ids which belong to this bitmap's range and have been deleted. This + * represents global state since it's not reset on WAL rolls. + */ + private long[] deleted; + /** + * Offset of bitmap i.e. procedure id corresponding to first bit. + */ + private long start; + + public void dump() { + System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), getActiveMinProcId(), + getActiveMaxProcId()); + System.out.println("Modified:"); + for (int i = 0; i < modified.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + System.out.print((modified[i] & (1L << j)) != 0 ? "1" : "0"); + } + System.out.println(" " + i); + } + System.out.println(); + System.out.println("Delete:"); + for (int i = 0; i < deleted.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + System.out.print((deleted[i] & (1L << j)) != 0 ? 
"1" : "0"); + } + System.out.println(" " + i); + } + System.out.println(); + } + + public BitSetNode(long procId, boolean partial) { + start = alignDown(procId); + + int count = 1; + modified = new long[count]; + deleted = new long[count]; + if (!partial) { + Arrays.fill(deleted, WORD_MASK); + } + + this.partial = partial; + updateState(procId, false); + } + + public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) { + start = data.getStartId(); + int size = data.getUpdatedCount(); + assert size == data.getDeletedCount(); + modified = new long[size]; + deleted = new long[size]; + for (int i = 0; i < size; ++i) { + modified[i] = data.getUpdated(i); + deleted[i] = data.getDeleted(i); + } + partial = false; + } + + public BitSetNode(BitSetNode other, boolean resetDelete) { + this.start = other.start; + this.partial = other.partial; + this.modified = other.modified.clone(); + // The resetDelete will be set to true when building cleanup tracker. + // The intention here is that, if a procedure is not modified in this tracker, then we do not + // need to take care of it, so we will set deleted to true for these bits, i.e, if modified is + // 0, then we set deleted to 1, otherwise keep it as is. 
So here, the equation is + // deleted |= ~modified, i.e, + if (resetDelete) { + this.deleted = new long[other.deleted.length]; + for (int i = 0; i < this.deleted.length; ++i) { + this.deleted[i] |= ~(other.modified[i]); + } + } else { + this.deleted = other.deleted.clone(); + } + } + + public void insertOrUpdate(final long procId) { + updateState(procId, false); + } + + public void delete(final long procId) { + updateState(procId, true); + } + + public long getStart() { + return start; + } + + public long getEnd() { + return start + (modified.length << ADDRESS_BITS_PER_WORD) - 1; + } + + public boolean contains(final long procId) { + return start <= procId && procId <= getEnd(); + } + + public DeleteState isDeleted(final long procId) { + int bitmapIndex = getBitmapIndex(procId); + int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; + if (wordIndex >= deleted.length) { + return DeleteState.MAYBE; + } + return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO; + } + + public boolean isModified(long procId) { + int bitmapIndex = getBitmapIndex(procId); + int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; + if (wordIndex >= modified.length) { + return false; + } + return (modified[wordIndex] & (1L << bitmapIndex)) != 0; + } + + /** + * @return true, if all the procedures has been modified. + */ + public boolean isAllModified() { + // TODO: cache the value + for (int i = 0; i < modified.length; ++i) { + if ((modified[i] | deleted[i]) != WORD_MASK) { + return false; + } + } + return true; + } + + /** + * @return true, if there are no active procedures in this BitSetNode, else false. 
+ */ + public boolean isEmpty() { + // TODO: cache the value + for (int i = 0; i < deleted.length; ++i) { + if (deleted[i] != WORD_MASK) { + return false; + } + } + return true; + } + + public void resetModified() { + Arrays.fill(modified, 0); + } + + public void unsetPartialFlag() { + partial = false; + for (int i = 0; i < modified.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + if ((modified[i] & (1L << j)) == 0) { + deleted[i] |= (1L << j); + } + } + } + } + + /** + * Convert to + * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode + * protobuf. + */ + public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() { + ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder = + ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder(); + builder.setStartId(start); + for (int i = 0; i < modified.length; ++i) { + builder.addUpdated(modified[i]); + builder.addDeleted(deleted[i]); + } + return builder.build(); + } + + // ======================================================================== + // Grow/Merge Helpers + // ======================================================================== + public boolean canGrow(final long procId) { + return Math.abs(procId - start) < MAX_NODE_SIZE; + } + + public boolean canMerge(final BitSetNode rightNode) { + // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD. 
+ assert start < rightNode.start; + return (rightNode.getEnd() - start) < MAX_NODE_SIZE; + } + + public void grow(final long procId) { + int delta, offset; + + if (procId < start) { + // add to head + long newStart = alignDown(procId); + delta = (int) (start - newStart) >> ADDRESS_BITS_PER_WORD; + offset = delta; + start = newStart; + } else { + // Add to tail + long newEnd = alignUp(procId + 1); + delta = (int) (newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD; + offset = 0; + } + + long[] newBitmap; + int oldSize = modified.length; + + newBitmap = new long[oldSize + delta]; + for (int i = 0; i < newBitmap.length; ++i) { + newBitmap[i] = 0; + } + System.arraycopy(modified, 0, newBitmap, offset, oldSize); + modified = newBitmap; + + newBitmap = new long[deleted.length + delta]; + for (int i = 0; i < newBitmap.length; ++i) { + newBitmap[i] = partial ? 0 : WORD_MASK; + } + System.arraycopy(deleted, 0, newBitmap, offset, oldSize); + deleted = newBitmap; + } + + public void merge(final BitSetNode rightNode) { + int delta = (int) (rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD; + + long[] newBitmap; + int oldSize = modified.length; + int newSize = (delta - rightNode.modified.length); + int offset = oldSize + newSize; + + newBitmap = new long[oldSize + delta]; + System.arraycopy(modified, 0, newBitmap, 0, oldSize); + System.arraycopy(rightNode.modified, 0, newBitmap, offset, rightNode.modified.length); + modified = newBitmap; + + newBitmap = new long[oldSize + delta]; + System.arraycopy(deleted, 0, newBitmap, 0, oldSize); + System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length); + deleted = newBitmap; + + for (int i = 0; i < newSize; ++i) { + modified[offset + i] = 0; + deleted[offset + i] = partial ? 
0 : WORD_MASK; + } + } + + @Override + public String toString() { + return "BitSetNode(" + getStart() + "-" + getEnd() + ")"; + } + + // ======================================================================== + // Min/Max Helpers + // ======================================================================== + public long getActiveMinProcId() { + long minProcId = start; + for (int i = 0; i < deleted.length; ++i) { + if (deleted[i] == 0) { + return (minProcId); + } + + if (deleted[i] != WORD_MASK) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + if ((deleted[i] & (1L << j)) != 0) { + return minProcId + j; + } + } + } + + minProcId += BITS_PER_WORD; + } + return minProcId; + } + + public long getActiveMaxProcId() { + long maxProcId = getEnd(); + for (int i = deleted.length - 1; i >= 0; --i) { + if (deleted[i] == 0) { + return maxProcId; + } + + if (deleted[i] != WORD_MASK) { + for (int j = BITS_PER_WORD - 1; j >= 0; --j) { + if ((deleted[i] & (1L << j)) == 0) { + return maxProcId - (BITS_PER_WORD - 1 - j); + } + } + } + maxProcId -= BITS_PER_WORD; + } + return maxProcId; + } + + // ======================================================================== + // Bitmap Helpers + // ======================================================================== + private int getBitmapIndex(final long procId) { + return (int) (procId - start); + } + + void updateState(long procId, boolean isDeleted) { + int bitmapIndex = getBitmapIndex(procId); + int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; + long value = (1L << bitmapIndex); + + modified[wordIndex] |= value; + if (isDeleted) { + deleted[wordIndex] |= value; + } else { + deleted[wordIndex] &= ~value; + } + } + + // ======================================================================== + // Helpers + // ======================================================================== + /** + * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. 
+ */ + private static long alignUp(final long x) { + return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD; + } + + /** + * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. + */ + private static long alignDown(final long x) { + return x & -BITS_PER_WORD; + } +} \ No newline at end of file diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java index 9c6176d4bb8..8fbc1473ed7 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2.store; import java.io.IOException; @@ -64,17 +63,17 @@ public class NoopProcedureStore extends ProcedureStoreBase { } @Override - public void insert(Procedure proc, Procedure[] subprocs) { + public void insert(Procedure proc, Procedure[] subprocs) { // no-op } @Override - public void insert(Procedure[] proc) { + public void insert(Procedure[] proc) { // no-op } @Override - public void update(Procedure proc) { + public void update(Procedure proc) { // no-op } @@ -84,7 +83,7 @@ public class NoopProcedureStore extends ProcedureStoreBase { } @Override - public void delete(Procedure proc, long[] subprocs) { + public void delete(Procedure proc, long[] subprocs) { // no-op } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 72883405d7c..8063b125ba5 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -81,6 +81,7 @@ public interface ProcedureStore { * @throws IOException if there was an error fetching/deserializing the procedure * @return the next procedure in the iteration. */ + @SuppressWarnings("rawtypes") Procedure next() throws IOException; } @@ -173,7 +174,7 @@ public interface ProcedureStore { * @param proc the procedure to serialize and write to the store. * @param subprocs the newly created child of the proc. */ - void insert(Procedure proc, Procedure[] subprocs); + void insert(Procedure proc, Procedure[] subprocs); /** * Serialize a set of new procedures. @@ -182,14 +183,14 @@ public interface ProcedureStore { * * @param procs the procedures to serialize and write to the store. */ - void insert(Procedure[] procs); + void insert(Procedure[] procs); /** * The specified procedure was executed, * and the new state should be written to the store. * @param proc the procedure to serialize and write to the store. */ - void update(Procedure proc); + void update(Procedure proc); /** * The specified procId was removed from the executor, @@ -205,7 +206,7 @@ public interface ProcedureStore { * @param parentProc the parent procedure to serialize and write to the store. * @param subProcIds the IDs of the sub-procedure to remove. 
*/ - void delete(Procedure parentProc, long[] subProcIds); + void delete(Procedure parentProc, long[] subProcIds); /** * The specified procIds were removed from the executor, diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 2dad5ac72ce..361419ab48f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -53,383 +53,14 @@ public class ProcedureStoreTracker { * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to * understand it's real use. */ - private boolean partial = false; + boolean partial = false; - private long minUpdatedProcId = Long.MAX_VALUE; - private long maxUpdatedProcId = Long.MIN_VALUE; + private long minModifiedProcId = Long.MAX_VALUE; + private long maxModifiedProcId = Long.MIN_VALUE; public enum DeleteState { YES, NO, MAYBE } - /** - * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met). - * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the - * range of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K - * is BITS_PER_WORD. - */ - public static class BitSetNode { - private final static long WORD_MASK = 0xffffffffffffffffL; - private final static int ADDRESS_BITS_PER_WORD = 6; - private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; - private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD; - - /** - * Mimics {@link ProcedureStoreTracker#partial}. - */ - private final boolean partial; - - /* ---------------------- - * | updated | deleted | meaning - * | 0 | 0 | proc exists, but hasn't been updated since last resetUpdates(). 
- * | 1 | 0 | proc was updated (but not deleted). - * | 1 | 1 | proc was deleted. - * | 0 | 1 | proc doesn't exist (maybe never created, maybe deleted in past). - /* ---------------------- - */ - - /** - * Set of procedures which have been updated since last {@link #resetUpdates()}. - * Useful to track procedures which have been updated since last WAL write. - */ - private long[] updated; - /** - * Keeps track of procedure ids which belong to this bitmap's range and have been deleted. - * This represents global state since it's not reset on WAL rolls. - */ - private long[] deleted; - /** - * Offset of bitmap i.e. procedure id corresponding to first bit. - */ - private long start; - - public void dump() { - System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), - getActiveMinProcId(), getActiveMaxProcId()); - System.out.println("Update:"); - for (int i = 0; i < updated.length; ++i) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - System.out.print((updated[i] & (1L << j)) != 0 ? "1" : "0"); - } - System.out.println(" " + i); - } - System.out.println(); - System.out.println("Delete:"); - for (int i = 0; i < deleted.length; ++i) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0"); - } - System.out.println(" " + i); - } - System.out.println(); - } - - public BitSetNode(final long procId, final boolean partial) { - start = alignDown(procId); - - int count = 1; - updated = new long[count]; - deleted = new long[count]; - for (int i = 0; i < count; ++i) { - updated[i] = 0; - deleted[i] = partial ? 
0 : WORD_MASK; - } - - this.partial = partial; - updateState(procId, false); - } - - protected BitSetNode(final long start, final long[] updated, final long[] deleted) { - this.start = start; - this.updated = updated; - this.deleted = deleted; - this.partial = false; - } - - public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) { - start = data.getStartId(); - int size = data.getUpdatedCount(); - updated = new long[size]; - deleted = new long[size]; - for (int i = 0; i < size; ++i) { - updated[i] = data.getUpdated(i); - deleted[i] = data.getDeleted(i); - } - partial = false; - } - - public BitSetNode(final BitSetNode other, final boolean resetDelete) { - this.start = other.start; - this.partial = other.partial; - this.updated = other.updated.clone(); - if (resetDelete) { - this.deleted = new long[other.deleted.length]; - for (int i = 0; i < this.deleted.length; ++i) { - this.deleted[i] = ~(other.updated[i]); - } - } else { - this.deleted = other.deleted.clone(); - } - } - - public void update(final long procId) { - updateState(procId, false); - } - - public void delete(final long procId) { - updateState(procId, true); - } - - public long getStart() { - return start; - } - - public long getEnd() { - return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1; - } - - public boolean contains(final long procId) { - return start <= procId && procId <= getEnd(); - } - - public DeleteState isDeleted(final long procId) { - int bitmapIndex = getBitmapIndex(procId); - int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; - if (wordIndex >= deleted.length) { - return DeleteState.MAYBE; - } - return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? 
DeleteState.YES : DeleteState.NO; - } - - private boolean isUpdated(final long procId) { - int bitmapIndex = getBitmapIndex(procId); - int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; - if (wordIndex >= updated.length) { - return false; - } - return (updated[wordIndex] & (1L << bitmapIndex)) != 0; - } - - public boolean isUpdated() { - // TODO: cache the value - for (int i = 0; i < updated.length; ++i) { - if ((updated[i] | deleted[i]) != WORD_MASK) { - return false; - } - } - return true; - } - - /** - * @return true, if there are no active procedures in this BitSetNode, else false. - */ - public boolean isEmpty() { - // TODO: cache the value - for (int i = 0; i < deleted.length; ++i) { - if (deleted[i] != WORD_MASK) { - return false; - } - } - return true; - } - - public void resetUpdates() { - for (int i = 0; i < updated.length; ++i) { - updated[i] = 0; - } - } - - /** - * Clears the {@link #deleted} bitmaps. - */ - public void undeleteAll() { - for (int i = 0; i < updated.length; ++i) { - deleted[i] = 0; - } - } - - public void unsetPartialFlag() { - for (int i = 0; i < updated.length; ++i) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - if ((updated[i] & (1L << j)) == 0) { - deleted[i] |= (1L << j); - } - } - } - } - - /** - * Convert to - * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode - * protobuf. 
- */ - public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() { - ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder = - ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder(); - builder.setStartId(start); - for (int i = 0; i < updated.length; ++i) { - builder.addUpdated(updated[i]); - builder.addDeleted(deleted[i]); - } - return builder.build(); - } - - // ======================================================================== - // Grow/Merge Helpers - // ======================================================================== - public boolean canGrow(final long procId) { - return Math.abs(procId - start) < MAX_NODE_SIZE; - } - - public boolean canMerge(final BitSetNode rightNode) { - // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD. - assert start < rightNode.start; - return (rightNode.getEnd() - start) < MAX_NODE_SIZE; - } - - public void grow(final long procId) { - int delta, offset; - - if (procId < start) { - // add to head - long newStart = alignDown(procId); - delta = (int)(start - newStart) >> ADDRESS_BITS_PER_WORD; - offset = delta; - start = newStart; - } else { - // Add to tail - long newEnd = alignUp(procId + 1); - delta = (int)(newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD; - offset = 0; - } - - long[] newBitmap; - int oldSize = updated.length; - - newBitmap = new long[oldSize + delta]; - for (int i = 0; i < newBitmap.length; ++i) { - newBitmap[i] = 0; - } - System.arraycopy(updated, 0, newBitmap, offset, oldSize); - updated = newBitmap; - - newBitmap = new long[deleted.length + delta]; - for (int i = 0; i < newBitmap.length; ++i) { - newBitmap[i] = partial ? 
0 : WORD_MASK; - } - System.arraycopy(deleted, 0, newBitmap, offset, oldSize); - deleted = newBitmap; - } - - public void merge(final BitSetNode rightNode) { - int delta = (int)(rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD; - - long[] newBitmap; - int oldSize = updated.length; - int newSize = (delta - rightNode.updated.length); - int offset = oldSize + newSize; - - newBitmap = new long[oldSize + delta]; - System.arraycopy(updated, 0, newBitmap, 0, oldSize); - System.arraycopy(rightNode.updated, 0, newBitmap, offset, rightNode.updated.length); - updated = newBitmap; - - newBitmap = new long[oldSize + delta]; - System.arraycopy(deleted, 0, newBitmap, 0, oldSize); - System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length); - deleted = newBitmap; - - for (int i = 0; i < newSize; ++i) { - updated[offset + i] = 0; - deleted[offset + i] = partial ? 0 : WORD_MASK; - } - } - - @Override - public String toString() { - return "BitSetNode(" + getStart() + "-" + getEnd() + ")"; - } - - // ======================================================================== - // Min/Max Helpers - // ======================================================================== - public long getActiveMinProcId() { - long minProcId = start; - for (int i = 0; i < deleted.length; ++i) { - if (deleted[i] == 0) { - return(minProcId); - } - - if (deleted[i] != WORD_MASK) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - if ((deleted[i] & (1L << j)) != 0) { - return minProcId + j; - } - } - } - - minProcId += BITS_PER_WORD; - } - return minProcId; - } - - public long getActiveMaxProcId() { - long maxProcId = getEnd(); - for (int i = deleted.length - 1; i >= 0; --i) { - if (deleted[i] == 0) { - return maxProcId; - } - - if (deleted[i] != WORD_MASK) { - for (int j = BITS_PER_WORD - 1; j >= 0; --j) { - if ((deleted[i] & (1L << j)) == 0) { - return maxProcId - (BITS_PER_WORD - 1 - j); - } - } - } - maxProcId -= BITS_PER_WORD; - } - return maxProcId; - } - - // 
======================================================================== - // Bitmap Helpers - // ======================================================================== - private int getBitmapIndex(final long procId) { - return (int)(procId - start); - } - - private void updateState(final long procId, final boolean isDeleted) { - int bitmapIndex = getBitmapIndex(procId); - int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; - long value = (1L << bitmapIndex); - - updated[wordIndex] |= value; - if (isDeleted) { - deleted[wordIndex] |= value; - } else { - deleted[wordIndex] &= ~value; - } - } - - - // ======================================================================== - // Helpers - // ======================================================================== - /** - * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. - */ - private static long alignUp(final long x) { - return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD; - } - - /** - * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. - */ - private static long alignDown(final long x) { - return x & -BITS_PER_WORD; - } - } - - public void resetToProto(final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) { + public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) { reset(); for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) { final BitSetNode node = new BitSetNode(protoNode); @@ -440,14 +71,23 @@ public class ProcedureStoreTracker { /** * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap. 
*/ - public void resetTo(final ProcedureStoreTracker tracker) { + public void resetTo(ProcedureStoreTracker tracker) { resetTo(tracker, false); } - public void resetTo(final ProcedureStoreTracker tracker, final boolean resetDelete) { + /** + * Resets internal state to same as given {@code tracker}, and change the deleted flag according + * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap. + *

+ * The {@code resetDelete} will be set to true when building cleanup tracker, please see the + * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the + * deleted flag if {@code resetDelete} is true. + */ + public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) { + reset(); this.partial = tracker.partial; - this.minUpdatedProcId = tracker.minUpdatedProcId; - this.maxUpdatedProcId = tracker.maxUpdatedProcId; + this.minModifiedProcId = tracker.minModifiedProcId; + this.maxModifiedProcId = tracker.maxModifiedProcId; this.keepDeletes = tracker.keepDeletes; for (Map.Entry entry : tracker.map.entrySet()) { map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete)); @@ -458,25 +98,24 @@ public class ProcedureStoreTracker { insert(null, procId); } - public void insert(final long[] procIds) { + public void insert(long[] procIds) { for (int i = 0; i < procIds.length; ++i) { insert(procIds[i]); } } - public void insert(final long procId, final long[] subProcIds) { - BitSetNode node = null; - node = update(node, procId); + public void insert(long procId, long[] subProcIds) { + BitSetNode node = update(null, procId); for (int i = 0; i < subProcIds.length; ++i) { node = insert(node, subProcIds[i]); } } - private BitSetNode insert(BitSetNode node, final long procId) { + private BitSetNode insert(BitSetNode node, long procId) { if (node == null || !node.contains(procId)) { node = getOrCreateNode(procId); } - node.update(procId); + node.insertOrUpdate(procId); trackProcIds(procId); return node; } @@ -485,11 +124,11 @@ public class ProcedureStoreTracker { update(null, procId); } - private BitSetNode update(BitSetNode node, final long procId) { + private BitSetNode update(BitSetNode node, long procId) { node = lookupClosestNode(node, procId); assert node != null : "expected node to update procId=" + procId; assert node.contains(procId) : "expected procId=" + procId + " in the node"; - node.update(procId); + 
node.insertOrUpdate(procId); trackProcIds(procId); return node; } @@ -506,7 +145,7 @@ public class ProcedureStoreTracker { } } - private BitSetNode delete(BitSetNode node, final long procId) { + private BitSetNode delete(BitSetNode node, long procId) { node = lookupClosestNode(node, procId); assert node != null : "expected node to delete procId=" + procId; assert node.contains(procId) : "expected procId=" + procId + " in the node"; @@ -520,35 +159,62 @@ public class ProcedureStoreTracker { return node; } - @InterfaceAudience.Private - public void setDeleted(final long procId, final boolean isDeleted) { + /** + * Will be called when restarting where we need to rebuild the ProcedureStoreTracker. + */ + public void setMinMaxModifiedProcIds(long min, long max) { + this.minModifiedProcId = min; + this.maxModifiedProcId = max; + } + /** + * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The + * {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart + * this is not true, as we will read the wal files in reverse order so a delete may come first. + */ + public void setDeleted(long procId, boolean isDeleted) { BitSetNode node = getOrCreateNode(procId); assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node; node.updateState(procId, isDeleted); trackProcIds(procId); } - public void setDeletedIfSet(final long... procId) { + /** + * Set the given bit for the procId to delete if it was modified before. + *

+ * This method is used to test whether a procedure wal file can be safely deleted, as if all the + * procedures in the given procedure wal file has been modified in the new procedure wal files, + * then we can delete it. + */ + public void setDeletedIfModified(long... procId) { BitSetNode node = null; for (int i = 0; i < procId.length; ++i) { node = lookupClosestNode(node, procId[i]); - if (node != null && node.isUpdated(procId[i])) { + if (node != null && node.isModified(procId[i])) { node.delete(procId[i]); } } } - public void setDeletedIfSet(final ProcedureStoreTracker tracker) { + /** + * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by + * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker}, + * then we mark it as deleted. + * @see #setDeletedIfModified(long...) + */ + public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) { BitSetNode trackerNode = null; - for (BitSetNode node: map.values()) { + for (BitSetNode node : map.values()) { final long minProcId = node.getStart(); final long maxProcId = node.getEnd(); for (long procId = minProcId; procId <= maxProcId; ++procId) { - if (!node.isUpdated(procId)) continue; + if (!node.isModified(procId)) { + continue; + } trackerNode = tracker.lookupClosestNode(trackerNode, procId); - if (trackerNode == null || !trackerNode.contains(procId) || trackerNode.isUpdated(procId)) { - // the procedure was removed or updated + if (trackerNode == null || !trackerNode.contains(procId) || + trackerNode.isModified(procId)) { + // the procedure was removed or modified node.delete(procId); } } @@ -568,28 +234,29 @@ public class ProcedureStoreTracker { } private void trackProcIds(long procId) { - minUpdatedProcId = Math.min(minUpdatedProcId, procId); - maxUpdatedProcId = Math.max(maxUpdatedProcId, procId); + minModifiedProcId = Math.min(minModifiedProcId, procId); + maxModifiedProcId = Math.max(maxModifiedProcId, procId); } - public 
long getUpdatedMinProcId() { - return minUpdatedProcId; + public long getModifiedMinProcId() { + return minModifiedProcId; } - public long getUpdatedMaxProcId() { - return maxUpdatedProcId; + public long getModifiedMaxProcId() { + return maxModifiedProcId; } public void reset() { this.keepDeletes = false; this.partial = false; this.map.clear(); - resetUpdates(); + resetModified(); } - public boolean isUpdated(long procId) { + public boolean isModified(long procId) { final Map.Entry entry = map.floorEntry(procId); - return entry != null && entry.getValue().contains(procId) && entry.getValue().isUpdated(procId); + return entry != null && entry.getValue().contains(procId) && + entry.getValue().isModified(procId); } /** @@ -604,7 +271,7 @@ public class ProcedureStoreTracker { if (entry != null && entry.getValue().contains(procId)) { BitSetNode node = entry.getValue(); DeleteState state = node.isDeleted(procId); - return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state; + return partial && !node.isModified(procId) ? DeleteState.MAYBE : state; } return partial ? DeleteState.MAYBE : DeleteState.YES; } @@ -656,11 +323,12 @@ public class ProcedureStoreTracker { } /** - * @return true if any procedure was updated since last call to {@link #resetUpdates()}. + * @return true if all procedure was modified or deleted since last call to + * {@link #resetModified()}. */ - public boolean isUpdated() { + public boolean isAllModified() { for (Map.Entry entry : map.entrySet()) { - if (!entry.getValue().isUpdated()) { + if (!entry.getValue().isAllModified()) { return false; } } @@ -671,21 +339,15 @@ public class ProcedureStoreTracker { * Clears the list of updated procedure ids. This doesn't affect global list of active * procedure ids. 
*/ - public void resetUpdates() { + public void resetModified() { for (Map.Entry entry : map.entrySet()) { - entry.getValue().resetUpdates(); + entry.getValue().resetModified(); } - minUpdatedProcId = Long.MAX_VALUE; - maxUpdatedProcId = Long.MIN_VALUE; + minModifiedProcId = Long.MAX_VALUE; + maxModifiedProcId = Long.MIN_VALUE; } - public void undeleteAll() { - for (Map.Entry entry : map.entrySet()) { - entry.getValue().undeleteAll(); - } - } - - private BitSetNode getOrCreateNode(final long procId) { + private BitSetNode getOrCreateNode(long procId) { // If procId can fit in left node (directly or by growing it) BitSetNode leftNode = null; boolean leftCanGrow = false; @@ -760,7 +422,7 @@ public class ProcedureStoreTracker { public void dump() { System.out.println("map " + map.size()); - System.out.println("isUpdated " + isUpdated()); + System.out.println("isAllModified " + isAllModified()); System.out.println("isEmpty " + isEmpty()); for (Map.Entry entry : map.entrySet()) { entry.getValue().dump(); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java index dd34896ebb3..ba4480fca7e 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java @@ -15,19 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.procedure2.store.wal; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; /** * Thrown when a procedure WAL is corrupted */ @InterfaceAudience.Private -@InterfaceStability.Stable public class CorruptedWALProcedureStoreException extends HBaseIOException { + + private static final long serialVersionUID = -3407300445435898074L; + /** default constructor */ public CorruptedWALProcedureStoreException() { super(); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java index 6226350a472..16767446e17 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java @@ -15,20 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.procedure2.store.wal; import java.io.IOException; - import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; @@ -37,7 +35,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * Describes a WAL File */ @InterfaceAudience.Private -@InterfaceStability.Evolving public class ProcedureWALFile implements Comparable { private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFile.class); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java index ac3a52941e9..c9986edc904 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java @@ -18,25 +18,22 @@ package org.apache.hadoop.hbase.procedure2.store.wal; -import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Iterator; - import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; -import 
org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; @@ -45,9 +42,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * Helper class that contains the WAL serialization utils. 
*/ @InterfaceAudience.Private -@InterfaceStability.Evolving public final class ProcedureWALFormat { - private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormat.class); static final byte LOG_TYPE_STREAM = 0; static final byte LOG_TYPE_COMPACTED = 1; @@ -60,6 +55,9 @@ public final class ProcedureWALFormat { @InterfaceAudience.Private public static class InvalidWALDataException extends IOException { + + private static final long serialVersionUID = 5471733223070202196L; + public InvalidWALDataException(String s) { super(s); } @@ -75,9 +73,9 @@ public final class ProcedureWALFormat { private ProcedureWALFormat() {} - public static void load(final Iterator logs, - final ProcedureStoreTracker tracker, final Loader loader) throws IOException { - final ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader); + public static void load(Iterator logs, ProcedureStoreTracker tracker, + Loader loader) throws IOException { + ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader); tracker.setKeepDeletes(true); try { // Ignore the last log which is current active log. 
@@ -93,8 +91,10 @@ public final class ProcedureWALFormat { reader.finish(); // The tracker is now updated with all the procedures read from the logs - tracker.setPartialFlag(false); - tracker.resetUpdates(); + if (tracker.isPartial()) { + tracker.setPartialFlag(false); + } + tracker.resetModified(); } finally { tracker.setKeepDeletes(false); } @@ -205,7 +205,7 @@ public final class ProcedureWALFormat { } public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type, - Procedure proc, Procedure[] subprocs) throws IOException { + Procedure proc, Procedure[] subprocs) throws IOException { final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); builder.setType(type); builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); @@ -217,17 +217,17 @@ public final class ProcedureWALFormat { builder.build().writeDelimitedTo(slot); } - public static void writeInsert(ByteSlot slot, Procedure proc) + public static void writeInsert(ByteSlot slot, Procedure proc) throws IOException { writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null); } - public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs) + public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs) throws IOException { writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs); } - public static void writeUpdate(ByteSlot slot, Procedure proc) + public static void writeUpdate(ByteSlot slot, Procedure proc) throws IOException { writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null); } @@ -240,7 +240,7 @@ public final class ProcedureWALFormat { builder.build().writeDelimitedTo(slot); } - public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs) + public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs) throws IOException { final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); 
builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 4ab70f18e10..1ac8e01f3ab 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -15,22 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2.store.wal; +import java.io.IOException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; -import java.io.IOException; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureUtil; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; @@ -38,7 +34,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * Helper class that loads the procedures stored in a WAL */ 
@InterfaceAudience.Private -@InterfaceStability.Evolving public class ProcedureWALFormatReader { private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class); @@ -98,8 +93,8 @@ public class ProcedureWALFormatReader { // In the case above we need to read one more WAL to be able to consider // the root procedure A and all children as ready. // ============================================================================================== - private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024); - private final WalProcedureMap procedureMap = new WalProcedureMap(1024); + private final WALProcedureMap localProcedureMap = new WALProcedureMap(1024); + private final WALProcedureMap procedureMap = new WALProcedureMap(1024); private final ProcedureWALFormat.Loader loader; @@ -111,33 +106,31 @@ public class ProcedureWALFormatReader { * to rebuild the tracker. */ private final ProcedureStoreTracker tracker; - // TODO: private final boolean hasFastStartSupport; /** - * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we - * re-build the list of procedures updated in that WAL because we need it for log cleaning - * purposes. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted. - * (see {@link WALProcedureStore#removeInactiveLogs()}). - * However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother - * re-building it. + * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we re-build + * the list of procedures modified in that WAL because we need it for log cleaning purposes. If + * all procedures modified in a WAL are found to be obsolete, it can be safely deleted. (see + * {@link WALProcedureStore#removeInactiveLogs()}). + *

+ * Notice that, the deleted part for this tracker will not be global valid as we can only count + * the deletes in the current file, but it is not big problem as finally, the above tracker will + * have the global state of deleted, and it will also be used to build the cleanup tracker. */ private ProcedureStoreTracker localTracker; - // private long compactionLogId; private long maxProcId = 0; public ProcedureWALFormatReader(final ProcedureStoreTracker tracker, ProcedureWALFormat.Loader loader) { this.tracker = tracker; this.loader = loader; - // we support fast-start only if we have a clean shutdown. - // this.hasFastStartSupport = !tracker.isEmpty(); } - public void read(final ProcedureWALFile log) throws IOException { - localTracker = log.getTracker().isPartial() ? log.getTracker() : null; - if (localTracker != null) { - LOG.info("Rebuilding tracker for " + log); + public void read(ProcedureWALFile log) throws IOException { + localTracker = log.getTracker(); + if (localTracker.isPartial()) { + LOG.info("Rebuilding tracker for {}", log); } long count = 0; @@ -147,7 +140,7 @@ public class ProcedureWALFormatReader { while (hasMore) { ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream); if (entry == null) { - LOG.warn("Nothing left to decode. Exiting with missing EOF, log=" + log); + LOG.warn("Nothing left to decode. 
Exiting with missing EOF, log={}", log); break; } count++; @@ -178,21 +171,17 @@ public class ProcedureWALFormatReader { loader.markCorruptedWAL(log, e); } - if (localTracker != null) { - localTracker.setPartialFlag(false); - } if (!localProcedureMap.isEmpty()) { - log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId()); + log.setProcIds(localProcedureMap.getMinModifiedProcId(), + localProcedureMap.getMaxModifiedProcId()); + if (localTracker.isPartial()) { + localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(), + localProcedureMap.getMaxModifiedProcId()); + } procedureMap.mergeTail(localProcedureMap); - - //if (hasFastStartSupport) { - // TODO: Some procedure may be already runnables (see readInitEntry()) - // (we can also check the "update map" in the log trackers) - // -------------------------------------------------- - //EntryIterator iter = procedureMap.fetchReady(); - //if (iter != null) loader.load(iter); - // -------------------------------------------------- - //} + } + if (localTracker.isPartial()) { + localTracker.setPartialFlag(false); } } @@ -202,37 +191,46 @@ public class ProcedureWALFormatReader { // fetch the procedure ready to run. ProcedureIterator procIter = procedureMap.fetchReady(); - if (procIter != null) loader.load(procIter); + if (procIter != null) { + loader.load(procIter); + } // remaining procedures have missing link or dependencies // consider them as corrupted, manual fix is probably required. 
procIter = procedureMap.fetchAll(); - if (procIter != null) loader.handleCorrupted(procIter); + if (procIter != null) { + loader.handleCorrupted(procIter); + } } - private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) { + private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) { + if (tracker.isPartial()) { + tracker.setDeleted(procId, true); + } + } + + private void insertIfPartial(ProcedureStoreTracker tracker, ProcedureProtos.Procedure proc) { + if (tracker.isPartial()) { + tracker.insert(proc.getProcId()); + } + } + + private void loadProcedure(ProcedureWALEntry entry, ProcedureProtos.Procedure proc) { maxProcId = Math.max(maxProcId, proc.getProcId()); if (isRequired(proc.getProcId())) { - if (LOG.isTraceEnabled()) { - LOG.trace("Read " + entry.getType() + " entry " + proc.getProcId()); - } + LOG.trace("Read {} entry {}", entry.getType(), proc.getProcId()); localProcedureMap.add(proc); - if (tracker.isPartial()) { - tracker.insert(proc.getProcId()); - } - } - if (localTracker != null) { - localTracker.insert(proc.getProcId()); + insertIfPartial(tracker, proc); } + insertIfPartial(localTracker, proc); } - private void readInitEntry(final ProcedureWALEntry entry) - throws IOException { + private void readInitEntry(ProcedureWALEntry entry) { assert entry.getProcedureCount() == 1 : "Expected only one procedure"; loadProcedure(entry, entry.getProcedure(0)); } - private void readInsertEntry(final ProcedureWALEntry entry) throws IOException { + private void readInsertEntry(ProcedureWALEntry entry) { assert entry.getProcedureCount() >= 1 : "Expected one or more procedures"; loadProcedure(entry, entry.getProcedure(0)); for (int i = 1; i < entry.getProcedureCount(); ++i) { @@ -240,12 +238,12 @@ public class ProcedureWALFormatReader { } } - private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException { + private void readUpdateEntry(ProcedureWALEntry entry) { assert entry.getProcedureCount() 
== 1 : "Expected only one procedure"; loadProcedure(entry, entry.getProcedure(0)); } - private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException { + private void readDeleteEntry(ProcedureWALEntry entry) { assert entry.hasProcId() : "expected ProcID"; if (entry.getChildIdCount() > 0) { @@ -267,598 +265,19 @@ public class ProcedureWALFormatReader { } private void deleteEntry(final long procId) { - if (LOG.isTraceEnabled()) { - LOG.trace("delete entry " + procId); - } + LOG.trace("delete entry {}", procId); maxProcId = Math.max(maxProcId, procId); localProcedureMap.remove(procId); assert !procedureMap.contains(procId); - if (tracker.isPartial()) { - tracker.setDeleted(procId, true); - } - if (localTracker != null) { - // In case there is only delete entry for this procedure in current log. - localTracker.setDeleted(procId, true); - } + setDeletedIfPartial(tracker, procId); + setDeletedIfPartial(localTracker, procId); } - private boolean isDeleted(final long procId) { + private boolean isDeleted(long procId) { return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES; } - private boolean isRequired(final long procId) { + private boolean isRequired(long procId) { return !isDeleted(procId) && !procedureMap.contains(procId); } - - // ========================================================================== - // We keep an in-memory map of the procedures sorted by replay order. - // (see the details in the beginning of the file) - // _______________________________________________ - // procedureMap = | A | | E | | C | | | | | G | | | - // D B - // replayOrderHead = C <-> B <-> E <-> D <-> A <-> G - // - // We also have a lazy grouping by "root procedure", and a list of - // unlinked procedures. If after reading all the WALs we have unlinked - // procedures it means that we had a missing WAL or a corruption. 
- // rootHead = A <-> D <-> G - // B E - // C - // unlinkFromLinkList = None - // ========================================================================== - private static class Entry { - // For bucketed linked lists in hash-table. - protected Entry hashNext; - // child head - protected Entry childHead; - // double-link for rootHead or childHead - protected Entry linkNext; - protected Entry linkPrev; - // replay double-linked-list - protected Entry replayNext; - protected Entry replayPrev; - // procedure-infos - protected Procedure procedure; - protected ProcedureProtos.Procedure proto; - protected boolean ready = false; - - public Entry(Entry hashNext) { - this.hashNext = hashNext; - } - - public long getProcId() { - return proto.getProcId(); - } - - public long getParentId() { - return proto.getParentId(); - } - - public boolean hasParent() { - return proto.hasParentId(); - } - - public boolean isReady() { - return ready; - } - - public boolean isFinished() { - if (!hasParent()) { - // we only consider 'root' procedures. because for the user 'finished' - // means when everything up to the 'root' is finished. 
- switch (proto.getState()) { - case ROLLEDBACK: - case SUCCESS: - return true; - default: - break; - } - } - return false; - } - - public Procedure convert() throws IOException { - if (procedure == null) { - procedure = ProcedureUtil.convertToProcedure(proto); - } - return procedure; - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("Entry("); - sb.append(getProcId()); - sb.append(", parentId="); - sb.append(getParentId()); - sb.append(", class="); - sb.append(proto.getClassName()); - sb.append(")"); - return sb.toString(); - } - } - - private static class EntryIterator implements ProcedureIterator { - private final Entry replayHead; - private Entry current; - - public EntryIterator(Entry replayHead) { - this.replayHead = replayHead; - this.current = replayHead; - } - - @Override - public void reset() { - this.current = replayHead; - } - - @Override - public boolean hasNext() { - return current != null; - } - - @Override - public boolean isNextFinished() { - return current != null && current.isFinished(); - } - - @Override - public void skipNext() { - current = current.replayNext; - } - - @Override - public Procedure next() throws IOException { - try { - return current.convert(); - } finally { - current = current.replayNext; - } - } - } - - private static class WalProcedureMap { - // procedure hash table - private Entry[] procedureMap; - - // replay-order double-linked-list - private Entry replayOrderHead; - private Entry replayOrderTail; - - // root linked-list - private Entry rootHead; - - // pending unlinked children (root not present yet) - private Entry childUnlinkedHead; - - // Track ProcId range - private long minProcId = Long.MAX_VALUE; - private long maxProcId = Long.MIN_VALUE; - - public WalProcedureMap(int size) { - procedureMap = new Entry[size]; - replayOrderHead = null; - replayOrderTail = null; - rootHead = null; - childUnlinkedHead = null; - } - - public void add(ProcedureProtos.Procedure 
procProto) { - trackProcIds(procProto.getProcId()); - Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId()); - boolean newEntry = entry.proto == null; - // We have seen procedure WALs where the entries are out of order; see HBASE-18152. - // To compensate, only replace the Entry procedure if for sure this new procedure - // is indeed an entry that came later. TODO: Fix the writing of procedure info so - // it does not violate basic expectation, that WALs contain procedure changes going - // from start to finish in sequence. - if (newEntry || isIncreasing(entry.proto, procProto)) { - entry.proto = procProto; - } - addToReplayList(entry); - if(newEntry) { - if (procProto.hasParentId()) { - childUnlinkedHead = addToLinkList(entry, childUnlinkedHead); - } else { - rootHead = addToLinkList(entry, rootHead); - } - } - } - - /** - * @return True if this new procedure is 'richer' than the current one else - * false and we log this incidence where it appears that the WAL has older entries - * appended after newer ones. See HBASE-18152. - */ - private static boolean isIncreasing(ProcedureProtos.Procedure current, - ProcedureProtos.Procedure candidate) { - // Check that the procedures we see are 'increasing'. We used to compare - // procedure id first and then update time but it can legitimately go backwards if the - // procedure is failed or rolled back so that was unreliable. Was going to compare - // state but lets see if comparing update time enough (unfortunately this issue only - // seen under load...) - boolean increasing = current.getLastUpdate() <= candidate.getLastUpdate(); - if (!increasing) { - LOG.warn("NOT INCREASING! 
current=" + current + ", candidate=" + candidate); - } - return increasing; - } - - public boolean remove(long procId) { - trackProcIds(procId); - Entry entry = removeFromMap(procId); - if (entry != null) { - unlinkFromReplayList(entry); - unlinkFromLinkList(entry); - return true; - } - return false; - } - - private void trackProcIds(long procId) { - minProcId = Math.min(minProcId, procId); - maxProcId = Math.max(maxProcId, procId); - } - - public long getMinProcId() { - return minProcId; - } - - public long getMaxProcId() { - return maxProcId; - } - - public boolean contains(long procId) { - return getProcedure(procId) != null; - } - - public boolean isEmpty() { - return replayOrderHead == null; - } - - public void clear() { - for (int i = 0; i < procedureMap.length; ++i) { - procedureMap[i] = null; - } - replayOrderHead = null; - replayOrderTail = null; - rootHead = null; - childUnlinkedHead = null; - minProcId = Long.MAX_VALUE; - maxProcId = Long.MIN_VALUE; - } - - /* - * Merges two WalProcedureMap, - * the target is the "global" map, the source is the "local" map. - * - The entries in the hashtables are guaranteed to be unique. - * On replay we don't load procedures that already exist in the "global" - * map (the one we are merging the "local" in to). - * - The replayOrderList of the "local" nao will be appended to the "global" - * map replay list. - * - The "local" map will be cleared at the end of the operation. 
- */ - public void mergeTail(WalProcedureMap other) { - for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) { - int slotIndex = getMapSlot(p.getProcId()); - p.hashNext = procedureMap[slotIndex]; - procedureMap[slotIndex] = p; - } - - if (replayOrderHead == null) { - replayOrderHead = other.replayOrderHead; - replayOrderTail = other.replayOrderTail; - rootHead = other.rootHead; - childUnlinkedHead = other.childUnlinkedHead; - } else { - // append replay list - assert replayOrderTail.replayNext == null; - assert other.replayOrderHead.replayPrev == null; - replayOrderTail.replayNext = other.replayOrderHead; - other.replayOrderHead.replayPrev = replayOrderTail; - replayOrderTail = other.replayOrderTail; - - // merge rootHead - if (rootHead == null) { - rootHead = other.rootHead; - } else if (other.rootHead != null) { - Entry otherTail = findLinkListTail(other.rootHead); - otherTail.linkNext = rootHead; - rootHead.linkPrev = otherTail; - rootHead = other.rootHead; - } - - // merge childUnlinkedHead - if (childUnlinkedHead == null) { - childUnlinkedHead = other.childUnlinkedHead; - } else if (other.childUnlinkedHead != null) { - Entry otherTail = findLinkListTail(other.childUnlinkedHead); - otherTail.linkNext = childUnlinkedHead; - childUnlinkedHead.linkPrev = otherTail; - childUnlinkedHead = other.childUnlinkedHead; - } - } - maxProcId = Math.max(maxProcId, other.maxProcId); - minProcId = Math.max(minProcId, other.minProcId); - - other.clear(); - } - - /* - * Returns an EntryIterator with the list of procedures ready - * to be added to the executor. - * A Procedure is ready if its children and parent are ready. 
- */ - public EntryIterator fetchReady() { - buildGraph(); - - Entry readyHead = null; - Entry readyTail = null; - Entry p = replayOrderHead; - while (p != null) { - Entry next = p.replayNext; - if (p.isReady()) { - unlinkFromReplayList(p); - if (readyTail != null) { - readyTail.replayNext = p; - p.replayPrev = readyTail; - } else { - p.replayPrev = null; - readyHead = p; - } - readyTail = p; - p.replayNext = null; - } - p = next; - } - // we need the hash-table lookups for parents, so this must be done - // out of the loop where we check isReadyToRun() - for (p = readyHead; p != null; p = p.replayNext) { - removeFromMap(p.getProcId()); - unlinkFromLinkList(p); - } - return readyHead != null ? new EntryIterator(readyHead) : null; - } - - /* - * Drain this map and return all procedures in it. - */ - public EntryIterator fetchAll() { - Entry head = replayOrderHead; - for (Entry p = head; p != null; p = p.replayNext) { - removeFromMap(p.getProcId()); - } - for (int i = 0; i < procedureMap.length; ++i) { - assert procedureMap[i] == null : "map not empty i=" + i; - } - replayOrderHead = null; - replayOrderTail = null; - childUnlinkedHead = null; - rootHead = null; - return head != null ? new EntryIterator(head) : null; - } - - private void buildGraph() { - Entry p = childUnlinkedHead; - while (p != null) { - Entry next = p.linkNext; - Entry rootProc = getRootProcedure(p); - if (rootProc != null) { - rootProc.childHead = addToLinkList(p, rootProc.childHead); - } - p = next; - } - - for (p = rootHead; p != null; p = p.linkNext) { - checkReadyToRun(p); - } - } - - private Entry getRootProcedure(Entry entry) { - while (entry != null && entry.hasParent()) { - entry = getProcedure(entry.getParentId()); - } - return entry; - } - - /* - * (see the comprehensive explanation in the beginning of the file) - * A Procedure is ready when parent and children are ready. - * "ready" means that we all the information that we need in-memory. 
- * - * Example-1: - * We have two WALs, we start reading from the newest (wal-2) - * wal-2 | C B | - * wal-1 | A B C | - * - * If C and B don't depend on A (A is not the parent), we can start them - * before reading wal-1. If B is the only one with parent A we can start C. - * We have to read one more WAL before being able to start B. - * - * How do we know with the only information in B that we are not ready. - * - easy case, the parent is missing from the global map - * - more complex case we look at the Stack IDs. - * - * The Stack-IDs are added to the procedure order as an incremental index - * tracking how many times that procedure was executed, which is equivalent - * to the number of times we wrote the procedure to the WAL. - * In the example above: - * wal-2: B has stackId = [1, 2] - * wal-1: B has stackId = [1] - * wal-1: A has stackId = [0] - * - * Since we know that the Stack-IDs are incremental for a Procedure, - * we notice that there is a gap in the stackIds of B, so something was - * executed before. - * To identify when a Procedure is ready we do the sum of the stackIds of - * the procedure and the parent. if the stackIdSum is equal to the - * sum of {1..maxStackId} then everything we need is available. - * - * Example-2 - * wal-2 | A | A stackIds = [0, 2] - * wal-1 | A B | B stackIds = [1] - * - * There is a gap between A stackIds so something was executed in between. - */ - private boolean checkReadyToRun(Entry rootEntry) { - assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry; - - if (rootEntry.isFinished()) { - // If the root procedure is finished, sub-procedures should be gone - if (rootEntry.childHead != null) { - LOG.error("unexpected active children for root-procedure: " + rootEntry); - for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { - LOG.error("unexpected active children: " + p); - } - } - - assert rootEntry.childHead == null : "unexpected children on root completion. 
" + rootEntry; - rootEntry.ready = true; - return true; - } - - int stackIdSum = 0; - int maxStackId = 0; - for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) { - int stackId = 1 + rootEntry.proto.getStackId(i); - maxStackId = Math.max(maxStackId, stackId); - stackIdSum += stackId; - if (LOG.isTraceEnabled()) { - LOG.trace("stackId=" + stackId + " stackIdSum=" + stackIdSum + - " maxStackid=" + maxStackId + " " + rootEntry); - } - } - - for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { - for (int i = 0; i < p.proto.getStackIdCount(); ++i) { - int stackId = 1 + p.proto.getStackId(i); - maxStackId = Math.max(maxStackId, stackId); - stackIdSum += stackId; - if (LOG.isTraceEnabled()) { - LOG.trace("stackId=" + stackId + " stackIdSum=" + stackIdSum + - " maxStackid=" + maxStackId + " " + p); - } - } - } - // The cmpStackIdSum is this formula for finding the sum of a series of numbers: - // http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg - final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2); - if (cmpStackIdSum == stackIdSum) { - rootEntry.ready = true; - for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { - p.ready = true; - } - return true; - } - return false; - } - - private void unlinkFromReplayList(Entry entry) { - if (replayOrderHead == entry) { - replayOrderHead = entry.replayNext; - } - if (replayOrderTail == entry) { - replayOrderTail = entry.replayPrev; - } - if (entry.replayPrev != null) { - entry.replayPrev.replayNext = entry.replayNext; - } - if (entry.replayNext != null) { - entry.replayNext.replayPrev = entry.replayPrev; - } - } - - private void addToReplayList(final Entry entry) { - unlinkFromReplayList(entry); - entry.replayNext = replayOrderHead; - entry.replayPrev = null; - if (replayOrderHead != null) { - replayOrderHead.replayPrev = entry; - } else { - replayOrderTail = entry; - } - replayOrderHead = entry; - } - - private void 
unlinkFromLinkList(Entry entry) { - if (entry == rootHead) { - rootHead = entry.linkNext; - } else if (entry == childUnlinkedHead) { - childUnlinkedHead = entry.linkNext; - } - if (entry.linkPrev != null) { - entry.linkPrev.linkNext = entry.linkNext; - } - if (entry.linkNext != null) { - entry.linkNext.linkPrev = entry.linkPrev; - } - } - - private Entry addToLinkList(Entry entry, Entry linkHead) { - unlinkFromLinkList(entry); - entry.linkNext = linkHead; - entry.linkPrev = null; - if (linkHead != null) { - linkHead.linkPrev = entry; - } - return entry; - } - - private Entry findLinkListTail(Entry linkHead) { - Entry tail = linkHead; - while (tail.linkNext != null) { - tail = tail.linkNext; - } - return tail; - } - - private Entry addToMap(final long procId, final boolean hasParent) { - int slotIndex = getMapSlot(procId); - Entry entry = getProcedure(slotIndex, procId); - if (entry != null) return entry; - - entry = new Entry(procedureMap[slotIndex]); - procedureMap[slotIndex] = entry; - return entry; - } - - private Entry removeFromMap(final long procId) { - int slotIndex = getMapSlot(procId); - Entry prev = null; - Entry entry = procedureMap[slotIndex]; - while (entry != null) { - if (procId == entry.getProcId()) { - if (prev != null) { - prev.hashNext = entry.hashNext; - } else { - procedureMap[slotIndex] = entry.hashNext; - } - entry.hashNext = null; - return entry; - } - prev = entry; - entry = entry.hashNext; - } - return null; - } - - private Entry getProcedure(final long procId) { - return getProcedure(getMapSlot(procId), procId); - } - - private Entry getProcedure(final int slotIndex, final long procId) { - Entry entry = procedureMap[slotIndex]; - while (entry != null) { - if (procId == entry.getProcId()) { - return entry; - } - entry = entry.hashNext; - } - return null; - } - - private int getMapSlot(final long procId) { - return (int)(Procedure.getProcIdHashCode(procId) % procedureMap.length); - } - } } diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java index 582db77beaf..3afcd16411c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; @@ -30,22 +29,23 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser; import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; import 
org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; -import org.apache.hbase.thirdparty.org.apache.commons.cli.PosixParser; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; /** * ProcedureWALPrettyPrinter prints the contents of a given ProcedureWAL file @@ -164,7 +164,7 @@ public class ProcedureWALPrettyPrinter extends Configured implements Tool { final List files = new ArrayList<>(); try { - CommandLine cmd = new PosixParser().parse(options, args); + CommandLine cmd = new DefaultParser().parse(options, args); if (cmd.hasOption("f")) { files.add(new Path(cmd.getOptionValue("f"))); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java new file mode 100644 index 00000000000..18d7823cba6 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java @@ -0,0 +1,607 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.procedure2.store.wal; + +import java.io.IOException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * We keep an in-memory map of the procedures sorted by replay order. (see the details in the + * beginning of {@link ProcedureWALFormatReader}). + * + *

+ *      procedureMap = | A |   | E |   | C |   |   |   |   | G |   |   |
+ *                       D               B
+ *      replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
+ *
+ *  We also have a lazy grouping by "root procedure", and a list of
+ *  unlinked procedures. If after reading all the WALs we have unlinked
+ *  procedures it means that we had a missing WAL or a corruption.
+ *      rootHead = A <-> D <-> G
+ *                 B     E
+ *                 C
+ *      unlinkFromLinkList = None
+ * 
+ */ +class WALProcedureMap { + + private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class); + + private static class Entry { + // For bucketed linked lists in hash-table. + private Entry hashNext; + // child head + private Entry childHead; + // double-link for rootHead or childHead + private Entry linkNext; + private Entry linkPrev; + // replay double-linked-list + private Entry replayNext; + private Entry replayPrev; + // procedure-infos + private Procedure procedure; + private ProcedureProtos.Procedure proto; + private boolean ready = false; + + public Entry(Entry hashNext) { + this.hashNext = hashNext; + } + + public long getProcId() { + return proto.getProcId(); + } + + public long getParentId() { + return proto.getParentId(); + } + + public boolean hasParent() { + return proto.hasParentId(); + } + + public boolean isReady() { + return ready; + } + + public boolean isFinished() { + if (!hasParent()) { + // we only consider 'root' procedures. because for the user 'finished' + // means when everything up to the 'root' is finished. 
+ switch (proto.getState()) { + case ROLLEDBACK: + case SUCCESS: + return true; + default: + break; + } + } + return false; + } + + public Procedure convert() throws IOException { + if (procedure == null) { + procedure = ProcedureUtil.convertToProcedure(proto); + } + return procedure; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append("Entry("); + sb.append(getProcId()); + sb.append(", parentId="); + sb.append(getParentId()); + sb.append(", class="); + sb.append(proto.getClassName()); + sb.append(")"); + return sb.toString(); + } + } + + private static class EntryIterator implements ProcedureIterator { + private final Entry replayHead; + private Entry current; + + public EntryIterator(Entry replayHead) { + this.replayHead = replayHead; + this.current = replayHead; + } + + @Override + public void reset() { + this.current = replayHead; + } + + @Override + public boolean hasNext() { + return current != null; + } + + @Override + public boolean isNextFinished() { + return current != null && current.isFinished(); + } + + @Override + public void skipNext() { + current = current.replayNext; + } + + @Override + public Procedure next() throws IOException { + try { + return current.convert(); + } finally { + current = current.replayNext; + } + } + } + + // procedure hash table + private Entry[] procedureMap; + + // replay-order double-linked-list + private Entry replayOrderHead; + private Entry replayOrderTail; + + // root linked-list + private Entry rootHead; + + // pending unlinked children (root not present yet) + private Entry childUnlinkedHead; + + // Track ProcId range + private long minModifiedProcId = Long.MAX_VALUE; + private long maxModifiedProcId = Long.MIN_VALUE; + + public WALProcedureMap(int size) { + procedureMap = new Entry[size]; + replayOrderHead = null; + replayOrderTail = null; + rootHead = null; + childUnlinkedHead = null; + } + + public void add(ProcedureProtos.Procedure procProto) { + 
trackProcIds(procProto.getProcId()); + Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId()); + boolean newEntry = entry.proto == null; + // We have seen procedure WALs where the entries are out of order; see HBASE-18152. + // To compensate, only replace the Entry procedure if for sure this new procedure + // is indeed an entry that came later. + // TODO: Fix the writing of procedure info so it does not violate basic expectation, that WALs + // contain procedure changes goingfrom start to finish in sequence. + if (newEntry || isIncreasing(entry.proto, procProto)) { + entry.proto = procProto; + } + addToReplayList(entry); + if (newEntry) { + if (procProto.hasParentId()) { + childUnlinkedHead = addToLinkList(entry, childUnlinkedHead); + } else { + rootHead = addToLinkList(entry, rootHead); + } + } + } + + /** + * @return True if this new procedure is 'richer' than the current one else false and we log this + * incidence where it appears that the WAL has older entries appended after newer ones. + * See HBASE-18152. + */ + private static boolean isIncreasing(ProcedureProtos.Procedure current, + ProcedureProtos.Procedure candidate) { + // Check that the procedures we see are 'increasing'. We used to compare + // procedure id first and then update time but it can legitimately go backwards if the + // procedure is failed or rolled back so that was unreliable. Was going to compare + // state but lets see if comparing update time enough (unfortunately this issue only + // seen under load...) + boolean increasing = current.getLastUpdate() <= candidate.getLastUpdate(); + if (!increasing) { + LOG.warn("NOT INCREASING! 
current=" + current + ", candidate=" + candidate); + return increasing; + } + + public boolean remove(long procId) { + trackProcIds(procId); + Entry entry = removeFromMap(procId); + if (entry != null) { + unlinkFromReplayList(entry); + unlinkFromLinkList(entry); + return true; + } + return false; + } + + private void trackProcIds(long procId) { + minModifiedProcId = Math.min(minModifiedProcId, procId); + maxModifiedProcId = Math.max(maxModifiedProcId, procId); + } + + public long getMinModifiedProcId() { + return minModifiedProcId; + } + + public long getMaxModifiedProcId() { + return maxModifiedProcId; + } + + public boolean contains(long procId) { + return getProcedure(procId) != null; + } + + public boolean isEmpty() { + return replayOrderHead == null; + } + + public void clear() { + for (int i = 0; i < procedureMap.length; ++i) { + procedureMap[i] = null; + } + replayOrderHead = null; + replayOrderTail = null; + rootHead = null; + childUnlinkedHead = null; + minModifiedProcId = Long.MAX_VALUE; + maxModifiedProcId = Long.MIN_VALUE; + } + + /* + * Merges two WALProcedureMaps, the target is the "global" map, the source is the "local" map. - + * The entries in the hashtables are guaranteed to be unique. On replay we don't load procedures + * that already exist in the "global" map (the one we are merging the "local" in to). - The + * replayOrderList of the "local" map will be appended to the "global" map replay list. - The + * "local" map will be cleared at the end of the operation.
+ */ + public void mergeTail(WALProcedureMap other) { + for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) { + int slotIndex = getMapSlot(p.getProcId()); + p.hashNext = procedureMap[slotIndex]; + procedureMap[slotIndex] = p; + } + + if (replayOrderHead == null) { + replayOrderHead = other.replayOrderHead; + replayOrderTail = other.replayOrderTail; + rootHead = other.rootHead; + childUnlinkedHead = other.childUnlinkedHead; + } else { + // append replay list + assert replayOrderTail.replayNext == null; + assert other.replayOrderHead.replayPrev == null; + replayOrderTail.replayNext = other.replayOrderHead; + other.replayOrderHead.replayPrev = replayOrderTail; + replayOrderTail = other.replayOrderTail; + + // merge rootHead + if (rootHead == null) { + rootHead = other.rootHead; + } else if (other.rootHead != null) { + Entry otherTail = findLinkListTail(other.rootHead); + otherTail.linkNext = rootHead; + rootHead.linkPrev = otherTail; + rootHead = other.rootHead; + } + + // merge childUnlinkedHead + if (childUnlinkedHead == null) { + childUnlinkedHead = other.childUnlinkedHead; + } else if (other.childUnlinkedHead != null) { + Entry otherTail = findLinkListTail(other.childUnlinkedHead); + otherTail.linkNext = childUnlinkedHead; + childUnlinkedHead.linkPrev = otherTail; + childUnlinkedHead = other.childUnlinkedHead; + } + } + maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId); + minModifiedProcId = Math.min(minModifiedProcId, other.minModifiedProcId); + + other.clear(); + } + + /** + * Returns an EntryIterator with the list of procedures ready to be added to the executor. A + * Procedure is ready if its children and parent are ready.
+ */ + public ProcedureIterator fetchReady() { + buildGraph(); + + Entry readyHead = null; + Entry readyTail = null; + Entry p = replayOrderHead; + while (p != null) { + Entry next = p.replayNext; + if (p.isReady()) { + unlinkFromReplayList(p); + if (readyTail != null) { + readyTail.replayNext = p; + p.replayPrev = readyTail; + } else { + p.replayPrev = null; + readyHead = p; + } + readyTail = p; + p.replayNext = null; + } + p = next; + } + // we need the hash-table lookups for parents, so this must be done + // out of the loop where we check isReadyToRun() + for (p = readyHead; p != null; p = p.replayNext) { + removeFromMap(p.getProcId()); + unlinkFromLinkList(p); + } + return readyHead != null ? new EntryIterator(readyHead) : null; + } + + /** + * Drain this map and return all procedures in it. + */ + public ProcedureIterator fetchAll() { + Entry head = replayOrderHead; + for (Entry p = head; p != null; p = p.replayNext) { + removeFromMap(p.getProcId()); + } + for (int i = 0; i < procedureMap.length; ++i) { + assert procedureMap[i] == null : "map not empty i=" + i; + } + replayOrderHead = null; + replayOrderTail = null; + childUnlinkedHead = null; + rootHead = null; + return head != null ? new EntryIterator(head) : null; + } + + private void buildGraph() { + Entry p = childUnlinkedHead; + while (p != null) { + Entry next = p.linkNext; + Entry rootProc = getRootProcedure(p); + if (rootProc != null) { + rootProc.childHead = addToLinkList(p, rootProc.childHead); + } + p = next; + } + + for (p = rootHead; p != null; p = p.linkNext) { + checkReadyToRun(p); + } + } + + private Entry getRootProcedure(Entry entry) { + while (entry != null && entry.hasParent()) { + entry = getProcedure(entry.getParentId()); + } + return entry; + } + + /** + * (see the comprehensive explanation in the beginning of {@link ProcedureWALFormatReader}). A + * Procedure is ready when parent and children are ready. "ready" means that we all the + * information that we need in-memory. + *

+ * Example-1:
+ * We have two WALs, we start reading from the newest (wal-2) + * + *

+   *    wal-2 | C B |
+   *    wal-1 | A B C |
+   * 
+ * + * If C and B don't depend on A (A is not the parent), we can start them before reading wal-1. If + * B is the only one with parent A we can start C. We have to read one more WAL before being able + * to start B. + *

+ * How do we know, with only the information in B, that we are not ready? + *

+ * The Stack-IDs are added to the procedure order as an incremental index tracking how many times + * that procedure was executed, which is equivalent to the number of times we wrote the procedure + * to the WAL.
+ * In the example above: + * + *
+   *   wal-2: B has stackId = [1, 2]
+   *   wal-1: B has stackId = [1]
+   *   wal-1: A has stackId = [0]
+   * 
+ * + * Since we know that the Stack-IDs are incremental for a Procedure, we notice that there is a gap + * in the stackIds of B, so something was executed before. + *

+ * To identify when a Procedure is ready we do the sum of the stackIds of the procedure and the + * parent. if the stackIdSum is equal to the sum of {1..maxStackId} then everything we need is + * available. + *

+ * Example-2 + * + *

+   *    wal-2 | A |              A stackIds = [0, 2]
+   *    wal-1 | A B |            B stackIds = [1]
+   * 
+ * + * There is a gap between A stackIds so something was executed in between. + */ + private boolean checkReadyToRun(Entry rootEntry) { + assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry; + + if (rootEntry.isFinished()) { + // If the root procedure is finished, sub-procedures should be gone + if (rootEntry.childHead != null) { + LOG.error("unexpected active children for root-procedure: {}", rootEntry); + for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { + LOG.error("unexpected active children: {}", p); + } + } + + assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry; + rootEntry.ready = true; + return true; + } + + int stackIdSum = 0; + int maxStackId = 0; + for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) { + int stackId = 1 + rootEntry.proto.getStackId(i); + maxStackId = Math.max(maxStackId, stackId); + stackIdSum += stackId; + LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId, + rootEntry); + } + + for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { + for (int i = 0; i < p.proto.getStackIdCount(); ++i) { + int stackId = 1 + p.proto.getStackId(i); + maxStackId = Math.max(maxStackId, stackId); + stackIdSum += stackId; + LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId, p); + } + } + // The cmpStackIdSum is this formula for finding the sum of a series of numbers: + // http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg + final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2); + if (cmpStackIdSum == stackIdSum) { + rootEntry.ready = true; + for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { + p.ready = true; + } + return true; + } + return false; + } + + private void unlinkFromReplayList(Entry entry) { + if (replayOrderHead == entry) { + replayOrderHead = entry.replayNext; + } + if (replayOrderTail == entry) { 
+ replayOrderTail = entry.replayPrev; + } + if (entry.replayPrev != null) { + entry.replayPrev.replayNext = entry.replayNext; + } + if (entry.replayNext != null) { + entry.replayNext.replayPrev = entry.replayPrev; + } + } + + private void addToReplayList(final Entry entry) { + unlinkFromReplayList(entry); + entry.replayNext = replayOrderHead; + entry.replayPrev = null; + if (replayOrderHead != null) { + replayOrderHead.replayPrev = entry; + } else { + replayOrderTail = entry; + } + replayOrderHead = entry; + } + + private void unlinkFromLinkList(Entry entry) { + if (entry == rootHead) { + rootHead = entry.linkNext; + } else if (entry == childUnlinkedHead) { + childUnlinkedHead = entry.linkNext; + } + if (entry.linkPrev != null) { + entry.linkPrev.linkNext = entry.linkNext; + } + if (entry.linkNext != null) { + entry.linkNext.linkPrev = entry.linkPrev; + } + } + + private Entry addToLinkList(Entry entry, Entry linkHead) { + unlinkFromLinkList(entry); + entry.linkNext = linkHead; + entry.linkPrev = null; + if (linkHead != null) { + linkHead.linkPrev = entry; + } + return entry; + } + + private Entry findLinkListTail(Entry linkHead) { + Entry tail = linkHead; + while (tail.linkNext != null) { + tail = tail.linkNext; + } + return tail; + } + + private Entry addToMap(long procId, boolean hasParent) { + int slotIndex = getMapSlot(procId); + Entry entry = getProcedure(slotIndex, procId); + if (entry != null) { + return entry; + } + + entry = new Entry(procedureMap[slotIndex]); + procedureMap[slotIndex] = entry; + return entry; + } + + private Entry removeFromMap(final long procId) { + int slotIndex = getMapSlot(procId); + Entry prev = null; + Entry entry = procedureMap[slotIndex]; + while (entry != null) { + if (procId == entry.getProcId()) { + if (prev != null) { + prev.hashNext = entry.hashNext; + } else { + procedureMap[slotIndex] = entry.hashNext; + } + entry.hashNext = null; + return entry; + } + prev = entry; + entry = entry.hashNext; + } + return null; + } + + 
private Entry getProcedure(long procId) { + return getProcedure(getMapSlot(procId), procId); + } + + private Entry getProcedure(int slotIndex, long procId) { + Entry entry = procedureMap[slotIndex]; + while (entry != null) { + if (procId == entry.getProcId()) { + return entry; + } + entry = entry.hashNext; + } + return null; + } + + private int getMapSlot(long procId) { + return (int) (Procedure.getProcIdHashCode(procId) % procedureMap.length); + } +} \ No newline at end of file diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 7d5d6d24488..00dfe85c1ca 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -15,14 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.procedure2.store.wal; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; @@ -35,7 +33,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -52,25 +49,60 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; /** * WAL implementation of the ProcedureStore. + *

+ * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()}, + * then {@link #load(ProcedureLoader)}. + *

+ * In {@link #recoverLease()}, we will get the lease by closing all the existing wal files(by + * calling recoverFileLease), and creating a new wal writer. And we will also get the list of all + * the old wal files. + *

+ * FIXME: notice that the current recover lease implementation is problematic, it cannot deal with + * the races if there are two masters that both want to acquire the lease... + *

+ * In {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the + * comments of this method for more details. + *

+ * The actual logging way is a bit like our FileSystem based WAL implementation at RS side. There is + * a {@link #slots}, which is more like the ring buffer, and in the insert, update and delete + * methods we will put things into the {@link #slots} and wait. And there is a background sync + * thread (see the {@link #syncLoop()} method) which gets data from the {@link #slots}, writes them + * to the FileSystem, and notifies the caller that we have finished. + *

+ * TODO: try using disruptor to increase performance and simplify the logic? + *

+ * The {@link #storeTracker} keeps track of the modified procedures in the newest wal file, which is + * also the one being written currently. And the deleted bits in it are for all the procedures, not + * only the ones in the newest wal file. And when rolling a log, we will first store it in the + * trailer of the current wal file, and then reset its modified bits, so that it can start to track + * the modified procedures for the new wal file. + *

+ * The {@link #holdingCleanupTracker} is used to test whether we are safe to delete the oldest wal + * file. When there are log rolling and there are more than 1 wal files, we will make use of it. It + * will first be initialized to the oldest file's tracker(which is stored in the trailer), using the + * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it + * with the tracker of every newer wal files, using the + * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If we find out + * that all the modified procedures for the oldest wal file are modified or deleted in newer wal + * files, then we can delete it. * @see ProcedureWALPrettyPrinter for printing content of a single WAL. * @see #main(String[]) to parse a directory of MasterWALProcs. */ @InterfaceAudience.Private -@InterfaceStability.Evolving public class WALProcedureStore extends ProcedureStoreBase { private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class); public static final String LOG_PREFIX = "pv2-"; @@ -166,7 +198,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private int syncWaitMsec; // Variables used for UI display - private CircularFifoQueue syncMetricsQueue; + private CircularFifoQueue syncMetricsQueue; public static class SyncMetrics { private long timestamp; @@ -228,11 +260,9 @@ public class WALProcedureStore extends ProcedureStoreBase { // Create archive dir up front. Rename won't work w/o it up on HDFS. 
if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { if (this.fs.mkdirs(this.walArchiveDir)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Created Procedure Store WAL archive dir " + this.walArchiveDir); - } + LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir); } else { - LOG.warn("Failed create of " + this.walArchiveDir); + LOG.warn("Failed create of {}", this.walArchiveDir); } } } @@ -248,7 +278,7 @@ public class WALProcedureStore extends ProcedureStoreBase { runningProcCount = numSlots; syncMaxSlot = numSlots; slots = new ByteSlot[numSlots]; - slotsCache = new LinkedTransferQueue(); + slotsCache = new LinkedTransferQueue<>(); while (slotsCache.size() < numSlots) { slotsCache.offer(new ByteSlot()); } @@ -267,7 +297,7 @@ public class WALProcedureStore extends ProcedureStoreBase { useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC); // WebUI - syncMetricsQueue = new CircularFifoQueue( + syncMetricsQueue = new CircularFifoQueue<>( conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT)); // Init sync thread @@ -394,9 +424,7 @@ public class WALProcedureStore extends ProcedureStoreBase { // We have the lease on the log oldLogs = getLogFiles(); if (getMaxLogId(oldLogs) > flushLogId) { - if (LOG.isDebugEnabled()) { - LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId); - } + LOG.debug("Someone else created new logs. 
Expected maxLogId < {}", flushLogId); logs.getLast().removeFile(this.walArchiveDir); continue; } @@ -410,7 +438,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void load(final ProcedureLoader loader) throws IOException { + public void load(ProcedureLoader loader) throws IOException { lock.lock(); try { if (logs.isEmpty()) { @@ -425,7 +453,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Load the old logs - final Iterator it = logs.descendingIterator(); + Iterator it = logs.descendingIterator(); it.next(); // Skip the current log ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { @@ -485,7 +513,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void insert(final Procedure proc, final Procedure[] subprocs) { + public void insert(Procedure proc, Procedure[] subprocs) { if (LOG.isTraceEnabled()) { LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs)); } @@ -519,7 +547,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void insert(final Procedure[] procs) { + public void insert(Procedure[] procs) { if (LOG.isTraceEnabled()) { LOG.trace("Insert " + Arrays.toString(procs)); } @@ -548,7 +576,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void update(final Procedure proc) { + public void update(Procedure proc) { if (LOG.isTraceEnabled()) { LOG.trace("Update " + proc); } @@ -571,11 +599,8 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void delete(final long procId) { - if (LOG.isTraceEnabled()) { - LOG.trace("Delete " + procId); - } - + public void delete(long procId) { + LOG.trace("Delete {}", procId); ByteSlot slot = acquireSlot(); try { // Serialize the delete @@ -594,7 +619,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void delete(final Procedure proc, final long[] subProcIds) { + public 
void delete(Procedure proc, long[] subProcIds) { assert proc != null : "expected a non-null procedure"; assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds"; if (LOG.isTraceEnabled()) { @@ -630,7 +655,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - private void delete(final long[] procIds) { + private void delete(long[] procIds) { if (LOG.isTraceEnabled()) { LOG.trace("Delete " + Arrays.toString(procIds)); } @@ -736,20 +761,20 @@ public class WALProcedureStore extends ProcedureStoreBase { storeTracker.insert(subProcIds); } else { storeTracker.insert(procId, subProcIds); - holdingCleanupTracker.setDeletedIfSet(procId); + holdingCleanupTracker.setDeletedIfModified(procId); } break; case UPDATE: storeTracker.update(procId); - holdingCleanupTracker.setDeletedIfSet(procId); + holdingCleanupTracker.setDeletedIfModified(procId); break; case DELETE: if (subProcIds != null && subProcIds.length > 0) { storeTracker.delete(subProcIds); - holdingCleanupTracker.setDeletedIfSet(subProcIds); + holdingCleanupTracker.setDeletedIfModified(subProcIds); } else { storeTracker.delete(procId); - holdingCleanupTracker.setDeletedIfSet(procId); + holdingCleanupTracker.setDeletedIfModified(procId); } break; default: @@ -973,16 +998,12 @@ public class WALProcedureStore extends ProcedureStoreBase { private void periodicRoll() throws IOException { if (storeTracker.isEmpty()) { - if (LOG.isTraceEnabled()) { - LOG.trace("no active procedures"); - } + LOG.trace("no active procedures"); tryRollWriter(); removeAllLogs(flushLogId - 1); } else { - if (storeTracker.isUpdated()) { - if (LOG.isTraceEnabled()) { - LOG.trace("all the active procedures are in the latest log"); - } + if (storeTracker.isAllModified()) { + LOG.trace("all the active procedures are in the latest log"); removeAllLogs(flushLogId - 1); } @@ -997,18 +1018,20 @@ public class WALProcedureStore extends ProcedureStoreBase { } private boolean rollWriter() throws IOException { - if 
(!isRunning()) return false; + if (!isRunning()) { + return false; + } // Create new state-log if (!rollWriter(flushLogId + 1)) { - LOG.warn("someone else has already created log " + flushLogId); + LOG.warn("someone else has already created log {}", flushLogId); return false; } // We have the lease on the log, // but we should check if someone else has created new files if (getMaxLogId(getLogFiles()) > flushLogId) { - LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId); + LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId); logs.getLast().removeFile(this.walArchiveDir); return false; } @@ -1064,7 +1087,7 @@ public class WALProcedureStore extends ProcedureStoreBase { closeCurrentLogStream(); - storeTracker.resetUpdates(); + storeTracker.resetModified(); stream = newStream; flushLogId = logId; totalSynced.set(0); @@ -1092,12 +1115,12 @@ public class WALProcedureStore extends ProcedureStoreBase { try { ProcedureWALFile log = logs.getLast(); - log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); + log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId()); log.updateLocalTracker(storeTracker); long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker); log.addToSize(trailerSize); } catch (IOException e) { - LOG.warn("Unable to write the trailer: " + e.getMessage()); + LOG.warn("Unable to write the trailer", e); } try { stream.close(); @@ -1134,9 +1157,9 @@ public class WALProcedureStore extends ProcedureStoreBase { // - the other WALs are scanned to remove procs already in other wals. 
// TODO: exit early if holdingCleanupTracker.isEmpty() holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true); - holdingCleanupTracker.setDeletedIfSet(storeTracker); + holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker); for (int i = 1, size = logs.size() - 1; i < size; ++i) { - holdingCleanupTracker.setDeletedIfSet(logs.get(i).getTracker()); + holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker()); } } @@ -1144,12 +1167,12 @@ public class WALProcedureStore extends ProcedureStoreBase { * Remove all logs with logId <= {@code lastLogId}. */ private void removeAllLogs(long lastLogId) { - if (logs.size() <= 1) return; - - if (LOG.isTraceEnabled()) { - LOG.trace("Remove all state logs with ID less than " + lastLogId); + if (logs.size() <= 1) { + return; } + LOG.trace("Remove all state logs with ID less than {}", lastLogId); + boolean removed = false; while (logs.size() > 1) { ProcedureWALFile log = logs.getFirst(); @@ -1167,14 +1190,10 @@ public class WALProcedureStore extends ProcedureStoreBase { private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) { try { - if (LOG.isTraceEnabled()) { - LOG.trace("Removing log=" + log); - } + LOG.trace("Removing log={}", log); log.removeFile(walArchiveDir); logs.remove(log); - if (LOG.isDebugEnabled()) { - LOG.info("Removed log=" + log + ", activeLogs=" + logs); - } + LOG.debug("Removed log={}, activeLogs={}", log, logs); assert logs.size() > 0 : "expected at least one log"; } catch (IOException e) { LOG.error("Unable to remove log: " + log, e); @@ -1238,50 +1257,53 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - private static long getMaxLogId(final FileStatus[] logFiles) { - long maxLogId = 0; - if (logFiles != null && logFiles.length > 0) { - for (int i = 0; i < logFiles.length; ++i) { - maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName())); - } - } - return maxLogId; - } - /** + * Make sure that the file set are 
gotten by calling {@link #getLogFiles()}, where we will sort + * the file set by log id. * @return Max-LogID of the specified log file set */ - private long initOldLogs(final FileStatus[] logFiles) throws IOException { - this.logs.clear(); - - long maxLogId = 0; - if (logFiles != null && logFiles.length > 0) { - for (int i = 0; i < logFiles.length; ++i) { - final Path logPath = logFiles[i].getPath(); - leaseRecovery.recoverFileLease(fs, logPath); - if (!isRunning()) { - throw new IOException("wal aborting"); - } - - maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); - ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir); - if (log != null) { - this.logs.add(log); - } - } - Collections.sort(this.logs); - initTrackerFromOldLogs(); + private static long getMaxLogId(FileStatus[] logFiles) { + if (logFiles == null || logFiles.length == 0) { + return 0L; } + return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName()); + } + + /** + * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort + * the file set by log id. + * @return Max-LogID of the specified log file set + */ + private long initOldLogs(FileStatus[] logFiles) throws IOException { + if (logFiles == null || logFiles.length == 0) { + return 0L; + } + long maxLogId = 0; + for (int i = 0; i < logFiles.length; ++i) { + final Path logPath = logFiles[i].getPath(); + leaseRecovery.recoverFileLease(fs, logPath); + if (!isRunning()) { + throw new IOException("wal aborting"); + } + + maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); + ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir); + if (log != null) { + this.logs.add(log); + } + } + initTrackerFromOldLogs(); return maxLogId; } /** - * If last log's tracker is not null, use it as {@link #storeTracker}. - * Otherwise, set storeTracker as partial, and let {@link ProcedureWALFormatReader} rebuild - * it using entries in the log. 
+ * If last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set storeTracker + * as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries in the log. */ private void initTrackerFromOldLogs() { - if (logs.isEmpty() || !isRunning()) return; + if (logs.isEmpty() || !isRunning()) { + return; + } ProcedureWALFile log = logs.getLast(); if (!log.getTracker().isPartial()) { storeTracker.resetTo(log.getTracker()); @@ -1295,20 +1317,18 @@ public class WALProcedureStore extends ProcedureStoreBase { * Loads given log file and it's tracker. */ private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir) - throws IOException { + throws IOException { final ProcedureWALFile log = new ProcedureWALFile(fs, logFile); if (logFile.getLen() == 0) { - LOG.warn("Remove uninitialized log: " + logFile); + LOG.warn("Remove uninitialized log: {}", logFile); log.removeFile(walArchiveDir); return null; } - if (LOG.isDebugEnabled()) { - LOG.debug("Opening Pv2 " + logFile); - } + LOG.debug("Opening Pv2 {}", logFile); try { log.open(); } catch (ProcedureWALFormat.InvalidWALDataException e) { - LOG.warn("Remove uninitialized log: " + logFile, e); + LOG.warn("Remove uninitialized log: {}", logFile, e); log.removeFile(walArchiveDir); return null; } catch (IOException e) { @@ -1322,7 +1342,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } catch (IOException e) { log.getTracker().reset(); log.getTracker().setPartialFlag(true); - LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage()); + LOG.warn("Unable to read tracker for {}", log, e); } log.close(); @@ -1350,7 +1370,7 @@ public class WALProcedureStore extends ProcedureStoreBase { }); try { store.start(16); - ProcedureExecutor pe = new ProcedureExecutor(conf, new Object()/*Pass anything*/, store); + ProcedureExecutor pe = new ProcedureExecutor<>(conf, new Object()/*Pass anything*/, store); pe.init(1, true); } finally { store.stop(true); diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index ffc6ab8de0d..d6b58d0b33f 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue; import java.util.Random; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.BitSetNode; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.ClassRule; @@ -119,29 +118,29 @@ public class TestProcedureStoreTracker { tracker.insert(procs[0]); tracker.insert(procs[1], new long[] { procs[2], procs[3], procs[4] }); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllModified()); - tracker.resetUpdates(); - assertFalse(tracker.isUpdated()); + tracker.resetModified(); + assertFalse(tracker.isAllModified()); for (int i = 0; i < 4; ++i) { tracker.update(procs[i]); assertFalse(tracker.isEmpty()); - assertFalse(tracker.isUpdated()); + assertFalse(tracker.isAllModified()); } tracker.update(procs[4]); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllModified()); tracker.update(procs[5]); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllModified()); for (int i = 0; i < 5; ++i) { tracker.delete(procs[i]); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllModified()); } tracker.delete(procs[5]); assertTrue(tracker.isEmpty()); @@ -235,7 +234,7 @@ public class TestProcedureStoreTracker { for (long i : active) { tracker.insert(i); } - 
tracker.resetUpdates(); + tracker.resetModified(); for (long i : updated) { tracker.update(i); } @@ -252,11 +251,11 @@ public class TestProcedureStoreTracker { BitSetNode buildBitSetNode(long[] active, long[] updated, long[] deleted) { BitSetNode bitSetNode = new BitSetNode(0L, false); for (long i : active) { - bitSetNode.update(i); + bitSetNode.insertOrUpdate(i); } - bitSetNode.resetUpdates(); + bitSetNode.resetModified(); for (long i : updated) { - bitSetNode.update(i); + bitSetNode.insertOrUpdate(i); } for (long i : deleted) { bitSetNode.delete(i); @@ -276,9 +275,9 @@ public class TestProcedureStoreTracker { assertEquals(false, tracker.isEmpty()); for (int i = 0; i < procIds.length; ++i) { - tracker.setDeletedIfSet(procIds[i] - 1); - tracker.setDeletedIfSet(procIds[i]); - tracker.setDeletedIfSet(procIds[i] + 1); + tracker.setDeletedIfModified(procIds[i] - 1); + tracker.setDeletedIfModified(procIds[i]); + tracker.setDeletedIfModified(procIds[i] + 1); } assertEquals(true, tracker.isEmpty()); @@ -289,7 +288,7 @@ public class TestProcedureStoreTracker { } assertEquals(false, tracker.isEmpty()); - tracker.setDeletedIfSet(procIds); + tracker.setDeletedIfModified(procIds); assertEquals(true, tracker.isEmpty()); } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index b1bd254b800..d682481e886 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -423,11 +423,11 @@ public class TestWALProcedureStore { final Procedure[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) { for (int index : updatedProcs) { long procId = procs[index].getProcId(); - assertTrue("Procedure id : " + procId, tracker.isUpdated(procId)); + 
assertTrue("Procedure id : " + procId, tracker.isModified(procId)); } for (int index : nonUpdatedProcs) { long procId = procs[index].getProcId(); - assertFalse("Procedure id : " + procId, tracker.isUpdated(procId)); + assertFalse("Procedure id : " + procId, tracker.isModified(procId)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index e2e4aec00b6..528f0395211 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; * avoiding port contention if another local HBase instance is already running). *

To preserve test data directories, pass the system property "hbase.testing.preserve.testdir" * setting it to true. + * For triggering test. */ @InterfaceAudience.Public @SuppressWarnings("deprecation")