HBASE-21250 Refactor WALProcedureStore and add more comments for better understanding the implementation

This commit is contained in:
zhangduo 2018-10-06 17:27:05 +08:00
parent 9d34b4581c
commit 5a300f3fc9
14 changed files with 1326 additions and 1224 deletions

View File

@ -0,0 +1,397 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store;
import java.util.Arrays;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
* A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met).
* Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the range
* of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K is
* BITS_PER_WORD.
* <p/>
* We have two main bit sets to describe the state of procedures, the meanings are:
*
* <pre>
* ----------------------
* | modified | deleted | meaning
* | 0 | 0 | proc exists, but hasn't been updated since last resetUpdates().
* | 1 | 0 | proc was updated (but not deleted).
* | 1 | 1 | proc was deleted.
* | 0 | 1 | proc doesn't exist (maybe never created, maybe deleted in past).
* ----------------------
* </pre>
*
* The meaning of modified is that, we have modified the state of the procedure, no matter insert,
* update, or delete. And if it is an insert or update, we will set the deleted to 0, if not we will
* set the delete to 1.
* <p/>
* For a non-partial BitSetNode, the initial modified value is 0 and deleted value is 1. For the
* partial one, the initial modified value is 0 and the initial deleted value is also 0. In
* {@link #unsetPartialFlag()} we will reset the deleted to 1 if it is not modified.
*/
@InterfaceAudience.Private
class BitSetNode {
private static final long WORD_MASK = 0xffffffffffffffffL;
private static final int ADDRESS_BITS_PER_WORD = 6;
private static final int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
private static final int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD;
/**
* Mimics {@link ProcedureStoreTracker#partial}. It will effect how we fill the new deleted bits
* when growing.
*/
private boolean partial;
/**
* Set of procedures which have been modified since last {@link #resetModified()}. Useful to track
* procedures which have been modified since last WAL write.
*/
private long[] modified;
/**
* Keeps track of procedure ids which belong to this bitmap's range and have been deleted. This
* represents global state since it's not reset on WAL rolls.
*/
private long[] deleted;
/**
* Offset of bitmap i.e. procedure id corresponding to first bit.
*/
private long start;
public void dump() {
System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), getActiveMinProcId(),
getActiveMaxProcId());
System.out.println("Modified:");
for (int i = 0; i < modified.length; ++i) {
for (int j = 0; j < BITS_PER_WORD; ++j) {
System.out.print((modified[i] & (1L << j)) != 0 ? "1" : "0");
}
System.out.println(" " + i);
}
System.out.println();
System.out.println("Delete:");
for (int i = 0; i < deleted.length; ++i) {
for (int j = 0; j < BITS_PER_WORD; ++j) {
System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0");
}
System.out.println(" " + i);
}
System.out.println();
}
public BitSetNode(long procId, boolean partial) {
start = alignDown(procId);
int count = 1;
modified = new long[count];
deleted = new long[count];
if (!partial) {
Arrays.fill(deleted, WORD_MASK);
}
this.partial = partial;
updateState(procId, false);
}
public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) {
start = data.getStartId();
int size = data.getUpdatedCount();
assert size == data.getDeletedCount();
modified = new long[size];
deleted = new long[size];
for (int i = 0; i < size; ++i) {
modified[i] = data.getUpdated(i);
deleted[i] = data.getDeleted(i);
}
partial = false;
}
public BitSetNode(BitSetNode other, boolean resetDelete) {
this.start = other.start;
this.partial = other.partial;
this.modified = other.modified.clone();
// The resetDelete will be set to true when building cleanup tracker.
// The intention here is that, if a procedure is not modified in this tracker, then we do not
// need to take care of it, so we will set deleted to true for these bits, i.e, if modified is
// 0, then we set deleted to 1, otherwise keep it as is. So here, the equation is
// deleted |= ~modified, i.e,
if (resetDelete) {
this.deleted = new long[other.deleted.length];
for (int i = 0; i < this.deleted.length; ++i) {
this.deleted[i] |= ~(other.modified[i]);
}
} else {
this.deleted = other.deleted.clone();
}
}
public void insertOrUpdate(final long procId) {
updateState(procId, false);
}
public void delete(final long procId) {
updateState(procId, true);
}
public long getStart() {
return start;
}
public long getEnd() {
return start + (modified.length << ADDRESS_BITS_PER_WORD) - 1;
}
public boolean contains(final long procId) {
return start <= procId && procId <= getEnd();
}
public DeleteState isDeleted(final long procId) {
int bitmapIndex = getBitmapIndex(procId);
int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
if (wordIndex >= deleted.length) {
return DeleteState.MAYBE;
}
return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO;
}
public boolean isModified(long procId) {
int bitmapIndex = getBitmapIndex(procId);
int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
if (wordIndex >= modified.length) {
return false;
}
return (modified[wordIndex] & (1L << bitmapIndex)) != 0;
}
/**
* @return true, if all the procedures has been modified.
*/
public boolean isAllModified() {
// TODO: cache the value
for (int i = 0; i < modified.length; ++i) {
if ((modified[i] | deleted[i]) != WORD_MASK) {
return false;
}
}
return true;
}
/**
* @return true, if there are no active procedures in this BitSetNode, else false.
*/
public boolean isEmpty() {
// TODO: cache the value
for (int i = 0; i < deleted.length; ++i) {
if (deleted[i] != WORD_MASK) {
return false;
}
}
return true;
}
public void resetModified() {
Arrays.fill(modified, 0);
}
public void unsetPartialFlag() {
partial = false;
for (int i = 0; i < modified.length; ++i) {
for (int j = 0; j < BITS_PER_WORD; ++j) {
if ((modified[i] & (1L << j)) == 0) {
deleted[i] |= (1L << j);
}
}
}
}
/**
* Convert to
* org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode
* protobuf.
*/
public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
builder.setStartId(start);
for (int i = 0; i < modified.length; ++i) {
builder.addUpdated(modified[i]);
builder.addDeleted(deleted[i]);
}
return builder.build();
}
// ========================================================================
// Grow/Merge Helpers
// ========================================================================
public boolean canGrow(final long procId) {
return Math.abs(procId - start) < MAX_NODE_SIZE;
}
public boolean canMerge(final BitSetNode rightNode) {
// Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD.
assert start < rightNode.start;
return (rightNode.getEnd() - start) < MAX_NODE_SIZE;
}
public void grow(final long procId) {
int delta, offset;
if (procId < start) {
// add to head
long newStart = alignDown(procId);
delta = (int) (start - newStart) >> ADDRESS_BITS_PER_WORD;
offset = delta;
start = newStart;
} else {
// Add to tail
long newEnd = alignUp(procId + 1);
delta = (int) (newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD;
offset = 0;
}
long[] newBitmap;
int oldSize = modified.length;
newBitmap = new long[oldSize + delta];
for (int i = 0; i < newBitmap.length; ++i) {
newBitmap[i] = 0;
}
System.arraycopy(modified, 0, newBitmap, offset, oldSize);
modified = newBitmap;
newBitmap = new long[deleted.length + delta];
for (int i = 0; i < newBitmap.length; ++i) {
newBitmap[i] = partial ? 0 : WORD_MASK;
}
System.arraycopy(deleted, 0, newBitmap, offset, oldSize);
deleted = newBitmap;
}
public void merge(final BitSetNode rightNode) {
int delta = (int) (rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD;
long[] newBitmap;
int oldSize = modified.length;
int newSize = (delta - rightNode.modified.length);
int offset = oldSize + newSize;
newBitmap = new long[oldSize + delta];
System.arraycopy(modified, 0, newBitmap, 0, oldSize);
System.arraycopy(rightNode.modified, 0, newBitmap, offset, rightNode.modified.length);
modified = newBitmap;
newBitmap = new long[oldSize + delta];
System.arraycopy(deleted, 0, newBitmap, 0, oldSize);
System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length);
deleted = newBitmap;
for (int i = 0; i < newSize; ++i) {
modified[offset + i] = 0;
deleted[offset + i] = partial ? 0 : WORD_MASK;
}
}
@Override
public String toString() {
return "BitSetNode(" + getStart() + "-" + getEnd() + ")";
}
// ========================================================================
// Min/Max Helpers
// ========================================================================
public long getActiveMinProcId() {
long minProcId = start;
for (int i = 0; i < deleted.length; ++i) {
if (deleted[i] == 0) {
return (minProcId);
}
if (deleted[i] != WORD_MASK) {
for (int j = 0; j < BITS_PER_WORD; ++j) {
if ((deleted[i] & (1L << j)) != 0) {
return minProcId + j;
}
}
}
minProcId += BITS_PER_WORD;
}
return minProcId;
}
public long getActiveMaxProcId() {
long maxProcId = getEnd();
for (int i = deleted.length - 1; i >= 0; --i) {
if (deleted[i] == 0) {
return maxProcId;
}
if (deleted[i] != WORD_MASK) {
for (int j = BITS_PER_WORD - 1; j >= 0; --j) {
if ((deleted[i] & (1L << j)) == 0) {
return maxProcId - (BITS_PER_WORD - 1 - j);
}
}
}
maxProcId -= BITS_PER_WORD;
}
return maxProcId;
}
// ========================================================================
// Bitmap Helpers
// ========================================================================
private int getBitmapIndex(final long procId) {
return (int) (procId - start);
}
void updateState(long procId, boolean isDeleted) {
int bitmapIndex = getBitmapIndex(procId);
int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
long value = (1L << bitmapIndex);
modified[wordIndex] |= value;
if (isDeleted) {
deleted[wordIndex] |= value;
} else {
deleted[wordIndex] &= ~value;
}
}
// ========================================================================
// Helpers
// ========================================================================
/**
* @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
*/
private static long alignUp(final long x) {
return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD;
}
/**
* @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
*/
private static long alignDown(final long x) {
return x & -BITS_PER_WORD;
}
}

View File

@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store;
import java.io.IOException;
@ -64,17 +63,17 @@ public class NoopProcedureStore extends ProcedureStoreBase {
}
@Override
public void insert(Procedure proc, Procedure[] subprocs) {
public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
// no-op
}
@Override
public void insert(Procedure[] proc) {
public void insert(Procedure<?>[] proc) {
// no-op
}
@Override
public void update(Procedure proc) {
public void update(Procedure<?> proc) {
// no-op
}
@ -84,7 +83,7 @@ public class NoopProcedureStore extends ProcedureStoreBase {
}
@Override
public void delete(Procedure proc, long[] subprocs) {
public void delete(Procedure<?> proc, long[] subprocs) {
// no-op
}

View File

@ -81,6 +81,7 @@ public interface ProcedureStore {
* @throws IOException if there was an error fetching/deserializing the procedure
* @return the next procedure in the iteration.
*/
@SuppressWarnings("rawtypes")
Procedure next() throws IOException;
}
@ -173,7 +174,7 @@ public interface ProcedureStore {
* @param proc the procedure to serialize and write to the store.
* @param subprocs the newly created child of the proc.
*/
void insert(Procedure proc, Procedure[] subprocs);
void insert(Procedure<?> proc, Procedure<?>[] subprocs);
/**
* Serialize a set of new procedures.
@ -182,14 +183,14 @@ public interface ProcedureStore {
*
* @param procs the procedures to serialize and write to the store.
*/
void insert(Procedure[] procs);
void insert(Procedure<?>[] procs);
/**
* The specified procedure was executed,
* and the new state should be written to the store.
* @param proc the procedure to serialize and write to the store.
*/
void update(Procedure proc);
void update(Procedure<?> proc);
/**
* The specified procId was removed from the executor,
@ -205,7 +206,7 @@ public interface ProcedureStore {
* @param parentProc the parent procedure to serialize and write to the store.
* @param subProcIds the IDs of the sub-procedure to remove.
*/
void delete(Procedure parentProc, long[] subProcIds);
void delete(Procedure<?> parentProc, long[] subProcIds);
/**
* The specified procIds were removed from the executor,

View File

@ -53,383 +53,14 @@ public class ProcedureStoreTracker {
* It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to
* understand it's real use.
*/
private boolean partial = false;
boolean partial = false;
private long minUpdatedProcId = Long.MAX_VALUE;
private long maxUpdatedProcId = Long.MIN_VALUE;
private long minModifiedProcId = Long.MAX_VALUE;
private long maxModifiedProcId = Long.MIN_VALUE;
public enum DeleteState { YES, NO, MAYBE }
/**
* A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met).
* Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the
* range of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K
* is BITS_PER_WORD.
*/
public static class BitSetNode {
private final static long WORD_MASK = 0xffffffffffffffffL;
private final static int ADDRESS_BITS_PER_WORD = 6;
private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD;
/**
* Mimics {@link ProcedureStoreTracker#partial}.
*/
private final boolean partial;
/* ----------------------
* | updated | deleted | meaning
* | 0 | 0 | proc exists, but hasn't been updated since last resetUpdates().
* | 1 | 0 | proc was updated (but not deleted).
* | 1 | 1 | proc was deleted.
* | 0 | 1 | proc doesn't exist (maybe never created, maybe deleted in past).
/* ----------------------
*/
/**
* Set of procedures which have been updated since last {@link #resetUpdates()}.
* Useful to track procedures which have been updated since last WAL write.
*/
private long[] updated;
/**
* Keeps track of procedure ids which belong to this bitmap's range and have been deleted.
* This represents global state since it's not reset on WAL rolls.
*/
private long[] deleted;
/**
* Offset of bitmap i.e. procedure id corresponding to first bit.
*/
private long start;
public void dump() {
System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(),
getActiveMinProcId(), getActiveMaxProcId());
System.out.println("Update:");
for (int i = 0; i < updated.length; ++i) {
for (int j = 0; j < BITS_PER_WORD; ++j) {
System.out.print((updated[i] & (1L << j)) != 0 ? "1" : "0");
}
System.out.println(" " + i);
}
System.out.println();
System.out.println("Delete:");
for (int i = 0; i < deleted.length; ++i) {
for (int j = 0; j < BITS_PER_WORD; ++j) {
System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0");
}
System.out.println(" " + i);
}
System.out.println();
}
public BitSetNode(final long procId, final boolean partial) {
start = alignDown(procId);
int count = 1;
updated = new long[count];
deleted = new long[count];
for (int i = 0; i < count; ++i) {
updated[i] = 0;
deleted[i] = partial ? 0 : WORD_MASK;
}
this.partial = partial;
updateState(procId, false);
}
protected BitSetNode(final long start, final long[] updated, final long[] deleted) {
this.start = start;
this.updated = updated;
this.deleted = deleted;
this.partial = false;
}
public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) {
start = data.getStartId();
int size = data.getUpdatedCount();
updated = new long[size];
deleted = new long[size];
for (int i = 0; i < size; ++i) {
updated[i] = data.getUpdated(i);
deleted[i] = data.getDeleted(i);
}
partial = false;
}
public BitSetNode(final BitSetNode other, final boolean resetDelete) {
this.start = other.start;
this.partial = other.partial;
this.updated = other.updated.clone();
if (resetDelete) {
this.deleted = new long[other.deleted.length];
for (int i = 0; i < this.deleted.length; ++i) {
this.deleted[i] = ~(other.updated[i]);
}
} else {
this.deleted = other.deleted.clone();
}
}
public void update(final long procId) {
updateState(procId, false);
}
public void delete(final long procId) {
updateState(procId, true);
}
public long getStart() {
return start;
}
public long getEnd() {
return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1;
}
public boolean contains(final long procId) {
return start <= procId && procId <= getEnd();
}
public DeleteState isDeleted(final long procId) {
int bitmapIndex = getBitmapIndex(procId);
int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
if (wordIndex >= deleted.length) {
return DeleteState.MAYBE;
}
return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO;
}
private boolean isUpdated(final long procId) {
int bitmapIndex = getBitmapIndex(procId);
int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
if (wordIndex >= updated.length) {
return false;
}
return (updated[wordIndex] & (1L << bitmapIndex)) != 0;
}
public boolean isUpdated() {
// TODO: cache the value
for (int i = 0; i < updated.length; ++i) {
if ((updated[i] | deleted[i]) != WORD_MASK) {
return false;
}
}
return true;
}
/**
* @return true, if there are no active procedures in this BitSetNode, else false.
*/
public boolean isEmpty() {
// TODO: cache the value
for (int i = 0; i < deleted.length; ++i) {
if (deleted[i] != WORD_MASK) {
return false;
}
}
return true;
}
public void resetUpdates() {
for (int i = 0; i < updated.length; ++i) {
updated[i] = 0;
}
}
/**
* Clears the {@link #deleted} bitmaps.
*/
public void undeleteAll() {
for (int i = 0; i < updated.length; ++i) {
deleted[i] = 0;
}
}
public void unsetPartialFlag() {
for (int i = 0; i < updated.length; ++i) {
for (int j = 0; j < BITS_PER_WORD; ++j) {
if ((updated[i] & (1L << j)) == 0) {
deleted[i] |= (1L << j);
}
}
}
}
/**
* Convert to
* org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode
* protobuf.
*/
public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
builder.setStartId(start);
for (int i = 0; i < updated.length; ++i) {
builder.addUpdated(updated[i]);
builder.addDeleted(deleted[i]);
}
return builder.build();
}
// ========================================================================
// Grow/Merge Helpers
// ========================================================================
public boolean canGrow(final long procId) {
return Math.abs(procId - start) < MAX_NODE_SIZE;
}
public boolean canMerge(final BitSetNode rightNode) {
// Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD.
assert start < rightNode.start;
return (rightNode.getEnd() - start) < MAX_NODE_SIZE;
}
public void grow(final long procId) {
int delta, offset;
if (procId < start) {
// add to head
long newStart = alignDown(procId);
delta = (int)(start - newStart) >> ADDRESS_BITS_PER_WORD;
offset = delta;
start = newStart;
} else {
// Add to tail
long newEnd = alignUp(procId + 1);
delta = (int)(newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD;
offset = 0;
}
long[] newBitmap;
int oldSize = updated.length;
newBitmap = new long[oldSize + delta];
for (int i = 0; i < newBitmap.length; ++i) {
newBitmap[i] = 0;
}
System.arraycopy(updated, 0, newBitmap, offset, oldSize);
updated = newBitmap;
newBitmap = new long[deleted.length + delta];
for (int i = 0; i < newBitmap.length; ++i) {
newBitmap[i] = partial ? 0 : WORD_MASK;
}
System.arraycopy(deleted, 0, newBitmap, offset, oldSize);
deleted = newBitmap;
}
public void merge(final BitSetNode rightNode) {
int delta = (int)(rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD;
long[] newBitmap;
int oldSize = updated.length;
int newSize = (delta - rightNode.updated.length);
int offset = oldSize + newSize;
newBitmap = new long[oldSize + delta];
System.arraycopy(updated, 0, newBitmap, 0, oldSize);
System.arraycopy(rightNode.updated, 0, newBitmap, offset, rightNode.updated.length);
updated = newBitmap;
newBitmap = new long[oldSize + delta];
System.arraycopy(deleted, 0, newBitmap, 0, oldSize);
System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length);
deleted = newBitmap;
for (int i = 0; i < newSize; ++i) {
updated[offset + i] = 0;
deleted[offset + i] = partial ? 0 : WORD_MASK;
}
}
@Override
public String toString() {
return "BitSetNode(" + getStart() + "-" + getEnd() + ")";
}
// ========================================================================
// Min/Max Helpers
// ========================================================================
public long getActiveMinProcId() {
long minProcId = start;
for (int i = 0; i < deleted.length; ++i) {
if (deleted[i] == 0) {
return(minProcId);
}
if (deleted[i] != WORD_MASK) {
for (int j = 0; j < BITS_PER_WORD; ++j) {
if ((deleted[i] & (1L << j)) != 0) {
return minProcId + j;
}
}
}
minProcId += BITS_PER_WORD;
}
return minProcId;
}
public long getActiveMaxProcId() {
long maxProcId = getEnd();
for (int i = deleted.length - 1; i >= 0; --i) {
if (deleted[i] == 0) {
return maxProcId;
}
if (deleted[i] != WORD_MASK) {
for (int j = BITS_PER_WORD - 1; j >= 0; --j) {
if ((deleted[i] & (1L << j)) == 0) {
return maxProcId - (BITS_PER_WORD - 1 - j);
}
}
}
maxProcId -= BITS_PER_WORD;
}
return maxProcId;
}
// ========================================================================
// Bitmap Helpers
// ========================================================================
private int getBitmapIndex(final long procId) {
return (int)(procId - start);
}
private void updateState(final long procId, final boolean isDeleted) {
int bitmapIndex = getBitmapIndex(procId);
int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
long value = (1L << bitmapIndex);
updated[wordIndex] |= value;
if (isDeleted) {
deleted[wordIndex] |= value;
} else {
deleted[wordIndex] &= ~value;
}
}
// ========================================================================
// Helpers
// ========================================================================
/**
* @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
*/
private static long alignUp(final long x) {
return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD;
}
/**
* @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
*/
private static long alignDown(final long x) {
return x & -BITS_PER_WORD;
}
}
public void resetToProto(final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
reset();
for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) {
final BitSetNode node = new BitSetNode(protoNode);
@ -440,14 +71,23 @@ public class ProcedureStoreTracker {
/**
* Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
*/
public void resetTo(final ProcedureStoreTracker tracker) {
public void resetTo(ProcedureStoreTracker tracker) {
resetTo(tracker, false);
}
public void resetTo(final ProcedureStoreTracker tracker, final boolean resetDelete) {
/**
* Resets internal state to same as given {@code tracker}, and change the deleted flag according
* to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap.
* <p/>
* The {@code resetDelete} will be set to true when building cleanup tracker, please see the
* comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the
* deleted flag if {@code resetDelete} is true.
*/
public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
reset();
this.partial = tracker.partial;
this.minUpdatedProcId = tracker.minUpdatedProcId;
this.maxUpdatedProcId = tracker.maxUpdatedProcId;
this.minModifiedProcId = tracker.minModifiedProcId;
this.maxModifiedProcId = tracker.maxModifiedProcId;
this.keepDeletes = tracker.keepDeletes;
for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) {
map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete));
@ -458,25 +98,24 @@ public class ProcedureStoreTracker {
insert(null, procId);
}
public void insert(final long[] procIds) {
public void insert(long[] procIds) {
for (int i = 0; i < procIds.length; ++i) {
insert(procIds[i]);
}
}
public void insert(final long procId, final long[] subProcIds) {
BitSetNode node = null;
node = update(node, procId);
public void insert(long procId, long[] subProcIds) {
BitSetNode node = update(null, procId);
for (int i = 0; i < subProcIds.length; ++i) {
node = insert(node, subProcIds[i]);
}
}
private BitSetNode insert(BitSetNode node, final long procId) {
private BitSetNode insert(BitSetNode node, long procId) {
if (node == null || !node.contains(procId)) {
node = getOrCreateNode(procId);
}
node.update(procId);
node.insertOrUpdate(procId);
trackProcIds(procId);
return node;
}
@ -485,11 +124,11 @@ public class ProcedureStoreTracker {
update(null, procId);
}
private BitSetNode update(BitSetNode node, final long procId) {
private BitSetNode update(BitSetNode node, long procId) {
node = lookupClosestNode(node, procId);
assert node != null : "expected node to update procId=" + procId;
assert node.contains(procId) : "expected procId=" + procId + " in the node";
node.update(procId);
node.insertOrUpdate(procId);
trackProcIds(procId);
return node;
}
@ -506,7 +145,7 @@ public class ProcedureStoreTracker {
}
}
private BitSetNode delete(BitSetNode node, final long procId) {
private BitSetNode delete(BitSetNode node, long procId) {
node = lookupClosestNode(node, procId);
assert node != null : "expected node to delete procId=" + procId;
assert node.contains(procId) : "expected procId=" + procId + " in the node";
@ -520,35 +159,62 @@ public class ProcedureStoreTracker {
return node;
}
@InterfaceAudience.Private
public void setDeleted(final long procId, final boolean isDeleted) {
/**
* Will be called when restarting where we need to rebuild the ProcedureStoreTracker.
*/
public void setMinMaxModifiedProcIds(long min, long max) {
this.minModifiedProcId = min;
this.maxModifiedProcId = max;
}
/**
* This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The
* {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart
* this is not true, as we will read the wal files in reverse order so a delete may come first.
*/
public void setDeleted(long procId, boolean isDeleted) {
BitSetNode node = getOrCreateNode(procId);
assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
node.updateState(procId, isDeleted);
trackProcIds(procId);
}
public void setDeletedIfSet(final long... procId) {
/**
* Set the given bit for the procId to delete if it was modified before.
* <p/>
* This method is used to test whether a procedure wal file can be safely deleted, as if all the
* procedures in the given procedure wal file has been modified in the new procedure wal files,
* then we can delete it.
*/
public void setDeletedIfModified(long... procId) {
BitSetNode node = null;
for (int i = 0; i < procId.length; ++i) {
node = lookupClosestNode(node, procId[i]);
if (node != null && node.isUpdated(procId[i])) {
if (node != null && node.isModified(procId[i])) {
node.delete(procId[i]);
}
}
}
public void setDeletedIfSet(final ProcedureStoreTracker tracker) {
/**
* Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by
* the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker},
* then we mark it as deleted.
* @see #setDeletedIfModified(long...)
*/
public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
BitSetNode trackerNode = null;
for (BitSetNode node: map.values()) {
for (BitSetNode node : map.values()) {
final long minProcId = node.getStart();
final long maxProcId = node.getEnd();
for (long procId = minProcId; procId <= maxProcId; ++procId) {
if (!node.isUpdated(procId)) continue;
if (!node.isModified(procId)) {
continue;
}
trackerNode = tracker.lookupClosestNode(trackerNode, procId);
if (trackerNode == null || !trackerNode.contains(procId) || trackerNode.isUpdated(procId)) {
// the procedure was removed or updated
if (trackerNode == null || !trackerNode.contains(procId) ||
trackerNode.isModified(procId)) {
// the procedure was removed or modified
node.delete(procId);
}
}
@ -568,28 +234,29 @@ public class ProcedureStoreTracker {
}
private void trackProcIds(long procId) {
minUpdatedProcId = Math.min(minUpdatedProcId, procId);
maxUpdatedProcId = Math.max(maxUpdatedProcId, procId);
minModifiedProcId = Math.min(minModifiedProcId, procId);
maxModifiedProcId = Math.max(maxModifiedProcId, procId);
}
public long getUpdatedMinProcId() {
return minUpdatedProcId;
public long getModifiedMinProcId() {
return minModifiedProcId;
}
public long getUpdatedMaxProcId() {
return maxUpdatedProcId;
public long getModifiedMaxProcId() {
return maxModifiedProcId;
}
public void reset() {
this.keepDeletes = false;
this.partial = false;
this.map.clear();
resetUpdates();
resetModified();
}
public boolean isUpdated(long procId) {
public boolean isModified(long procId) {
final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
return entry != null && entry.getValue().contains(procId) && entry.getValue().isUpdated(procId);
return entry != null && entry.getValue().contains(procId) &&
entry.getValue().isModified(procId);
}
/**
@ -604,7 +271,7 @@ public class ProcedureStoreTracker {
if (entry != null && entry.getValue().contains(procId)) {
BitSetNode node = entry.getValue();
DeleteState state = node.isDeleted(procId);
return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state;
return partial && !node.isModified(procId) ? DeleteState.MAYBE : state;
}
return partial ? DeleteState.MAYBE : DeleteState.YES;
}
@ -656,11 +323,12 @@ public class ProcedureStoreTracker {
}
/**
* @return true if any procedure was updated since last call to {@link #resetUpdates()}.
* @return true if all procedure was modified or deleted since last call to
* {@link #resetModified()}.
*/
public boolean isUpdated() {
public boolean isAllModified() {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
if (!entry.getValue().isUpdated()) {
if (!entry.getValue().isAllModified()) {
return false;
}
}
@ -671,21 +339,15 @@ public class ProcedureStoreTracker {
* Clears the list of updated procedure ids. This doesn't affect global list of active
* procedure ids.
*/
public void resetUpdates() {
public void resetModified() {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
entry.getValue().resetUpdates();
entry.getValue().resetModified();
}
minUpdatedProcId = Long.MAX_VALUE;
maxUpdatedProcId = Long.MIN_VALUE;
minModifiedProcId = Long.MAX_VALUE;
maxModifiedProcId = Long.MIN_VALUE;
}
public void undeleteAll() {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
entry.getValue().undeleteAll();
}
}
private BitSetNode getOrCreateNode(final long procId) {
private BitSetNode getOrCreateNode(long procId) {
// If procId can fit in left node (directly or by growing it)
BitSetNode leftNode = null;
boolean leftCanGrow = false;
@ -760,7 +422,7 @@ public class ProcedureStoreTracker {
public void dump() {
System.out.println("map " + map.size());
System.out.println("isUpdated " + isUpdated());
System.out.println("isAllModified " + isAllModified());
System.out.println("isEmpty " + isEmpty());
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
entry.getValue().dump();

View File

@ -15,19 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.wal;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
* Thrown when a procedure WAL is corrupted
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class CorruptedWALProcedureStoreException extends HBaseIOException {
private static final long serialVersionUID = -3407300445435898074L;
/** default constructor */
public CorruptedWALProcedureStoreException() {
super();

View File

@ -15,20 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
@ -37,7 +35,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* Describes a WAL File
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFile.class);

View File

@ -18,25 +18,22 @@
package org.apache.hadoop.hbase.procedure2.store.wal;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
@ -45,9 +42,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* Helper class that contains the WAL serialization utils.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class ProcedureWALFormat {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormat.class);
static final byte LOG_TYPE_STREAM = 0;
static final byte LOG_TYPE_COMPACTED = 1;
@ -60,6 +55,9 @@ public final class ProcedureWALFormat {
@InterfaceAudience.Private
public static class InvalidWALDataException extends IOException {
private static final long serialVersionUID = 5471733223070202196L;
public InvalidWALDataException(String s) {
super(s);
}
@ -75,9 +73,9 @@ public final class ProcedureWALFormat {
private ProcedureWALFormat() {}
public static void load(final Iterator<ProcedureWALFile> logs,
final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
final ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
public static void load(Iterator<ProcedureWALFile> logs, ProcedureStoreTracker tracker,
Loader loader) throws IOException {
ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
tracker.setKeepDeletes(true);
try {
// Ignore the last log which is current active log.
@ -93,8 +91,10 @@ public final class ProcedureWALFormat {
reader.finish();
// The tracker is now updated with all the procedures read from the logs
if (tracker.isPartial()) {
tracker.setPartialFlag(false);
tracker.resetUpdates();
}
tracker.resetModified();
} finally {
tracker.setKeepDeletes(false);
}
@ -205,7 +205,7 @@ public final class ProcedureWALFormat {
}
public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
Procedure proc, Procedure[] subprocs) throws IOException {
Procedure<?> proc, Procedure<?>[] subprocs) throws IOException {
final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
builder.setType(type);
builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
@ -217,17 +217,17 @@ public final class ProcedureWALFormat {
builder.build().writeDelimitedTo(slot);
}
public static void writeInsert(ByteSlot slot, Procedure proc)
public static void writeInsert(ByteSlot slot, Procedure<?> proc)
throws IOException {
writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null);
}
public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs)
public static void writeInsert(ByteSlot slot, Procedure<?> proc, Procedure<?>[] subprocs)
throws IOException {
writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs);
}
public static void writeUpdate(ByteSlot slot, Procedure proc)
public static void writeUpdate(ByteSlot slot, Procedure<?> proc)
throws IOException {
writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null);
}
@ -240,7 +240,7 @@ public final class ProcedureWALFormat {
builder.build().writeDelimitedTo(slot);
}
public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs)
public static void writeDelete(ByteSlot slot, Procedure<?> proc, long[] subprocs)
throws IOException {
final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);

View File

@ -15,22 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
@ -38,7 +34,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* Helper class that loads the procedures stored in a WAL
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureWALFormatReader {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class);
@ -98,8 +93,8 @@ public class ProcedureWALFormatReader {
// In the case above we need to read one more WAL to be able to consider
// the root procedure A and all children as ready.
// ==============================================================================================
private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024);
private final WalProcedureMap procedureMap = new WalProcedureMap(1024);
private final WALProcedureMap localProcedureMap = new WALProcedureMap(1024);
private final WALProcedureMap procedureMap = new WALProcedureMap(1024);
private final ProcedureWALFormat.Loader loader;
@ -111,33 +106,31 @@ public class ProcedureWALFormatReader {
* to rebuild the tracker.
*/
private final ProcedureStoreTracker tracker;
// TODO: private final boolean hasFastStartSupport;
/**
* If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we
* re-build the list of procedures updated in that WAL because we need it for log cleaning
* purposes. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted.
* (see {@link WALProcedureStore#removeInactiveLogs()}).
* However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother
* re-building it.
* If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we re-build
* the list of procedures modified in that WAL because we need it for log cleaning purposes. If
* all procedures modified in a WAL are found to be obsolete, it can be safely deleted. (see
* {@link WALProcedureStore#removeInactiveLogs()}).
* <p/>
* Notice that, the deleted part for this tracker will not be global valid as we can only count
* the deletes in the current file, but it is not big problem as finally, the above tracker will
* have the global state of deleted, and it will also be used to build the cleanup tracker.
*/
private ProcedureStoreTracker localTracker;
// private long compactionLogId;
private long maxProcId = 0;
public ProcedureWALFormatReader(final ProcedureStoreTracker tracker,
ProcedureWALFormat.Loader loader) {
this.tracker = tracker;
this.loader = loader;
// we support fast-start only if we have a clean shutdown.
// this.hasFastStartSupport = !tracker.isEmpty();
}
public void read(final ProcedureWALFile log) throws IOException {
localTracker = log.getTracker().isPartial() ? log.getTracker() : null;
if (localTracker != null) {
LOG.info("Rebuilding tracker for " + log);
public void read(ProcedureWALFile log) throws IOException {
localTracker = log.getTracker();
if (localTracker.isPartial()) {
LOG.info("Rebuilding tracker for {}", log);
}
long count = 0;
@ -147,7 +140,7 @@ public class ProcedureWALFormatReader {
while (hasMore) {
ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
if (entry == null) {
LOG.warn("Nothing left to decode. Exiting with missing EOF, log=" + log);
LOG.warn("Nothing left to decode. Exiting with missing EOF, log={}", log);
break;
}
count++;
@ -178,21 +171,17 @@ public class ProcedureWALFormatReader {
loader.markCorruptedWAL(log, e);
}
if (localTracker != null) {
localTracker.setPartialFlag(false);
}
if (!localProcedureMap.isEmpty()) {
log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId());
log.setProcIds(localProcedureMap.getMinModifiedProcId(),
localProcedureMap.getMaxModifiedProcId());
if (localTracker.isPartial()) {
localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(),
localProcedureMap.getMaxModifiedProcId());
}
procedureMap.mergeTail(localProcedureMap);
//if (hasFastStartSupport) {
// TODO: Some procedure may be already runnables (see readInitEntry())
// (we can also check the "update map" in the log trackers)
// --------------------------------------------------
//EntryIterator iter = procedureMap.fetchReady();
//if (iter != null) loader.load(iter);
// --------------------------------------------------
//}
}
if (localTracker.isPartial()) {
localTracker.setPartialFlag(false);
}
}
@ -202,37 +191,46 @@ public class ProcedureWALFormatReader {
// fetch the procedure ready to run.
ProcedureIterator procIter = procedureMap.fetchReady();
if (procIter != null) loader.load(procIter);
if (procIter != null) {
loader.load(procIter);
}
// remaining procedures have missing link or dependencies
// consider them as corrupted, manual fix is probably required.
procIter = procedureMap.fetchAll();
if (procIter != null) loader.handleCorrupted(procIter);
if (procIter != null) {
loader.handleCorrupted(procIter);
}
}
private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) {
maxProcId = Math.max(maxProcId, proc.getProcId());
if (isRequired(proc.getProcId())) {
if (LOG.isTraceEnabled()) {
LOG.trace("Read " + entry.getType() + " entry " + proc.getProcId());
private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) {
if (tracker.isPartial()) {
tracker.setDeleted(procId, true);
}
localProcedureMap.add(proc);
}
private void insertIfPartial(ProcedureStoreTracker tracker, ProcedureProtos.Procedure proc) {
if (tracker.isPartial()) {
tracker.insert(proc.getProcId());
}
}
if (localTracker != null) {
localTracker.insert(proc.getProcId());
private void loadProcedure(ProcedureWALEntry entry, ProcedureProtos.Procedure proc) {
maxProcId = Math.max(maxProcId, proc.getProcId());
if (isRequired(proc.getProcId())) {
LOG.trace("Read {} entry {}", entry.getType(), proc.getProcId());
localProcedureMap.add(proc);
insertIfPartial(tracker, proc);
}
insertIfPartial(localTracker, proc);
}
private void readInitEntry(final ProcedureWALEntry entry)
throws IOException {
private void readInitEntry(ProcedureWALEntry entry) {
assert entry.getProcedureCount() == 1 : "Expected only one procedure";
loadProcedure(entry, entry.getProcedure(0));
}
private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
private void readInsertEntry(ProcedureWALEntry entry) {
assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
loadProcedure(entry, entry.getProcedure(0));
for (int i = 1; i < entry.getProcedureCount(); ++i) {
@ -240,12 +238,12 @@ public class ProcedureWALFormatReader {
}
}
private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
private void readUpdateEntry(ProcedureWALEntry entry) {
assert entry.getProcedureCount() == 1 : "Expected only one procedure";
loadProcedure(entry, entry.getProcedure(0));
}
private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
private void readDeleteEntry(ProcedureWALEntry entry) {
assert entry.hasProcId() : "expected ProcID";
if (entry.getChildIdCount() > 0) {
@ -267,598 +265,19 @@ public class ProcedureWALFormatReader {
}
private void deleteEntry(final long procId) {
if (LOG.isTraceEnabled()) {
LOG.trace("delete entry " + procId);
}
LOG.trace("delete entry {}", procId);
maxProcId = Math.max(maxProcId, procId);
localProcedureMap.remove(procId);
assert !procedureMap.contains(procId);
if (tracker.isPartial()) {
tracker.setDeleted(procId, true);
}
if (localTracker != null) {
// In case there is only delete entry for this procedure in current log.
localTracker.setDeleted(procId, true);
}
setDeletedIfPartial(tracker, procId);
setDeletedIfPartial(localTracker, procId);
}
private boolean isDeleted(final long procId) {
private boolean isDeleted(long procId) {
return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
}
private boolean isRequired(final long procId) {
private boolean isRequired(long procId) {
return !isDeleted(procId) && !procedureMap.contains(procId);
}
// ==========================================================================
// We keep an in-memory map of the procedures sorted by replay order.
// (see the details in the beginning of the file)
// _______________________________________________
// procedureMap = | A | | E | | C | | | | | G | | |
// D B
// replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
//
// We also have a lazy grouping by "root procedure", and a list of
// unlinked procedures. If after reading all the WALs we have unlinked
// procedures it means that we had a missing WAL or a corruption.
// rootHead = A <-> D <-> G
// B E
// C
// unlinkFromLinkList = None
// ==========================================================================
private static class Entry {
// For bucketed linked lists in hash-table.
protected Entry hashNext;
// child head
protected Entry childHead;
// double-link for rootHead or childHead
protected Entry linkNext;
protected Entry linkPrev;
// replay double-linked-list
protected Entry replayNext;
protected Entry replayPrev;
// procedure-infos
protected Procedure procedure;
protected ProcedureProtos.Procedure proto;
protected boolean ready = false;
public Entry(Entry hashNext) {
this.hashNext = hashNext;
}
public long getProcId() {
return proto.getProcId();
}
public long getParentId() {
return proto.getParentId();
}
public boolean hasParent() {
return proto.hasParentId();
}
public boolean isReady() {
return ready;
}
public boolean isFinished() {
if (!hasParent()) {
// we only consider 'root' procedures. because for the user 'finished'
// means when everything up to the 'root' is finished.
switch (proto.getState()) {
case ROLLEDBACK:
case SUCCESS:
return true;
default:
break;
}
}
return false;
}
public Procedure convert() throws IOException {
if (procedure == null) {
procedure = ProcedureUtil.convertToProcedure(proto);
}
return procedure;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("Entry(");
sb.append(getProcId());
sb.append(", parentId=");
sb.append(getParentId());
sb.append(", class=");
sb.append(proto.getClassName());
sb.append(")");
return sb.toString();
}
}
private static class EntryIterator implements ProcedureIterator {
private final Entry replayHead;
private Entry current;
public EntryIterator(Entry replayHead) {
this.replayHead = replayHead;
this.current = replayHead;
}
@Override
public void reset() {
this.current = replayHead;
}
@Override
public boolean hasNext() {
return current != null;
}
@Override
public boolean isNextFinished() {
return current != null && current.isFinished();
}
@Override
public void skipNext() {
current = current.replayNext;
}
@Override
public Procedure next() throws IOException {
try {
return current.convert();
} finally {
current = current.replayNext;
}
}
}
private static class WalProcedureMap {
// procedure hash table
private Entry[] procedureMap;
// replay-order double-linked-list
private Entry replayOrderHead;
private Entry replayOrderTail;
// root linked-list
private Entry rootHead;
// pending unlinked children (root not present yet)
private Entry childUnlinkedHead;
// Track ProcId range
private long minProcId = Long.MAX_VALUE;
private long maxProcId = Long.MIN_VALUE;
public WalProcedureMap(int size) {
procedureMap = new Entry[size];
replayOrderHead = null;
replayOrderTail = null;
rootHead = null;
childUnlinkedHead = null;
}
public void add(ProcedureProtos.Procedure procProto) {
trackProcIds(procProto.getProcId());
Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId());
boolean newEntry = entry.proto == null;
// We have seen procedure WALs where the entries are out of order; see HBASE-18152.
// To compensate, only replace the Entry procedure if for sure this new procedure
// is indeed an entry that came later. TODO: Fix the writing of procedure info so
// it does not violate basic expectation, that WALs contain procedure changes going
// from start to finish in sequence.
if (newEntry || isIncreasing(entry.proto, procProto)) {
entry.proto = procProto;
}
addToReplayList(entry);
if(newEntry) {
if (procProto.hasParentId()) {
childUnlinkedHead = addToLinkList(entry, childUnlinkedHead);
} else {
rootHead = addToLinkList(entry, rootHead);
}
}
}
/**
* @return True if this new procedure is 'richer' than the current one else
* false and we log this incidence where it appears that the WAL has older entries
* appended after newer ones. See HBASE-18152.
*/
private static boolean isIncreasing(ProcedureProtos.Procedure current,
ProcedureProtos.Procedure candidate) {
// Check that the procedures we see are 'increasing'. We used to compare
// procedure id first and then update time but it can legitimately go backwards if the
// procedure is failed or rolled back so that was unreliable. Was going to compare
// state but lets see if comparing update time enough (unfortunately this issue only
// seen under load...)
boolean increasing = current.getLastUpdate() <= candidate.getLastUpdate();
if (!increasing) {
LOG.warn("NOT INCREASING! current=" + current + ", candidate=" + candidate);
}
return increasing;
}
public boolean remove(long procId) {
trackProcIds(procId);
Entry entry = removeFromMap(procId);
if (entry != null) {
unlinkFromReplayList(entry);
unlinkFromLinkList(entry);
return true;
}
return false;
}
private void trackProcIds(long procId) {
minProcId = Math.min(minProcId, procId);
maxProcId = Math.max(maxProcId, procId);
}
public long getMinProcId() {
return minProcId;
}
public long getMaxProcId() {
return maxProcId;
}
public boolean contains(long procId) {
return getProcedure(procId) != null;
}
public boolean isEmpty() {
return replayOrderHead == null;
}
public void clear() {
for (int i = 0; i < procedureMap.length; ++i) {
procedureMap[i] = null;
}
replayOrderHead = null;
replayOrderTail = null;
rootHead = null;
childUnlinkedHead = null;
minProcId = Long.MAX_VALUE;
maxProcId = Long.MIN_VALUE;
}
/*
* Merges two WalProcedureMap,
* the target is the "global" map, the source is the "local" map.
* - The entries in the hashtables are guaranteed to be unique.
* On replay we don't load procedures that already exist in the "global"
* map (the one we are merging the "local" in to).
* - The replayOrderList of the "local" nao will be appended to the "global"
* map replay list.
* - The "local" map will be cleared at the end of the operation.
*/
public void mergeTail(WalProcedureMap other) {
for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) {
int slotIndex = getMapSlot(p.getProcId());
p.hashNext = procedureMap[slotIndex];
procedureMap[slotIndex] = p;
}
if (replayOrderHead == null) {
replayOrderHead = other.replayOrderHead;
replayOrderTail = other.replayOrderTail;
rootHead = other.rootHead;
childUnlinkedHead = other.childUnlinkedHead;
} else {
// append replay list
assert replayOrderTail.replayNext == null;
assert other.replayOrderHead.replayPrev == null;
replayOrderTail.replayNext = other.replayOrderHead;
other.replayOrderHead.replayPrev = replayOrderTail;
replayOrderTail = other.replayOrderTail;
// merge rootHead
if (rootHead == null) {
rootHead = other.rootHead;
} else if (other.rootHead != null) {
Entry otherTail = findLinkListTail(other.rootHead);
otherTail.linkNext = rootHead;
rootHead.linkPrev = otherTail;
rootHead = other.rootHead;
}
// merge childUnlinkedHead
if (childUnlinkedHead == null) {
childUnlinkedHead = other.childUnlinkedHead;
} else if (other.childUnlinkedHead != null) {
Entry otherTail = findLinkListTail(other.childUnlinkedHead);
otherTail.linkNext = childUnlinkedHead;
childUnlinkedHead.linkPrev = otherTail;
childUnlinkedHead = other.childUnlinkedHead;
}
}
maxProcId = Math.max(maxProcId, other.maxProcId);
minProcId = Math.max(minProcId, other.minProcId);
other.clear();
}
/*
* Returns an EntryIterator with the list of procedures ready
* to be added to the executor.
* A Procedure is ready if its children and parent are ready.
*/
public EntryIterator fetchReady() {
buildGraph();
Entry readyHead = null;
Entry readyTail = null;
Entry p = replayOrderHead;
while (p != null) {
Entry next = p.replayNext;
if (p.isReady()) {
unlinkFromReplayList(p);
if (readyTail != null) {
readyTail.replayNext = p;
p.replayPrev = readyTail;
} else {
p.replayPrev = null;
readyHead = p;
}
readyTail = p;
p.replayNext = null;
}
p = next;
}
// we need the hash-table lookups for parents, so this must be done
// out of the loop where we check isReadyToRun()
for (p = readyHead; p != null; p = p.replayNext) {
removeFromMap(p.getProcId());
unlinkFromLinkList(p);
}
return readyHead != null ? new EntryIterator(readyHead) : null;
}
/*
* Drain this map and return all procedures in it.
*/
public EntryIterator fetchAll() {
Entry head = replayOrderHead;
for (Entry p = head; p != null; p = p.replayNext) {
removeFromMap(p.getProcId());
}
for (int i = 0; i < procedureMap.length; ++i) {
assert procedureMap[i] == null : "map not empty i=" + i;
}
replayOrderHead = null;
replayOrderTail = null;
childUnlinkedHead = null;
rootHead = null;
return head != null ? new EntryIterator(head) : null;
}
private void buildGraph() {
Entry p = childUnlinkedHead;
while (p != null) {
Entry next = p.linkNext;
Entry rootProc = getRootProcedure(p);
if (rootProc != null) {
rootProc.childHead = addToLinkList(p, rootProc.childHead);
}
p = next;
}
for (p = rootHead; p != null; p = p.linkNext) {
checkReadyToRun(p);
}
}
private Entry getRootProcedure(Entry entry) {
while (entry != null && entry.hasParent()) {
entry = getProcedure(entry.getParentId());
}
return entry;
}
/*
* (see the comprehensive explanation in the beginning of the file)
* A Procedure is ready when parent and children are ready.
* "ready" means that we all the information that we need in-memory.
*
* Example-1:
* We have two WALs, we start reading from the newest (wal-2)
* wal-2 | C B |
* wal-1 | A B C |
*
* If C and B don't depend on A (A is not the parent), we can start them
* before reading wal-1. If B is the only one with parent A we can start C.
* We have to read one more WAL before being able to start B.
*
* How do we know with the only information in B that we are not ready.
* - easy case, the parent is missing from the global map
* - more complex case we look at the Stack IDs.
*
* The Stack-IDs are added to the procedure order as an incremental index
* tracking how many times that procedure was executed, which is equivalent
* to the number of times we wrote the procedure to the WAL.
* In the example above:
* wal-2: B has stackId = [1, 2]
* wal-1: B has stackId = [1]
* wal-1: A has stackId = [0]
*
* Since we know that the Stack-IDs are incremental for a Procedure,
* we notice that there is a gap in the stackIds of B, so something was
* executed before.
* To identify when a Procedure is ready we do the sum of the stackIds of
* the procedure and the parent. if the stackIdSum is equal to the
* sum of {1..maxStackId} then everything we need is available.
*
* Example-2
* wal-2 | A | A stackIds = [0, 2]
* wal-1 | A B | B stackIds = [1]
*
* There is a gap between A stackIds so something was executed in between.
*/
private boolean checkReadyToRun(Entry rootEntry) {
assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry;
if (rootEntry.isFinished()) {
// If the root procedure is finished, sub-procedures should be gone
if (rootEntry.childHead != null) {
LOG.error("unexpected active children for root-procedure: " + rootEntry);
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
LOG.error("unexpected active children: " + p);
}
}
assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry;
rootEntry.ready = true;
return true;
}
int stackIdSum = 0;
int maxStackId = 0;
for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) {
int stackId = 1 + rootEntry.proto.getStackId(i);
maxStackId = Math.max(maxStackId, stackId);
stackIdSum += stackId;
if (LOG.isTraceEnabled()) {
LOG.trace("stackId=" + stackId + " stackIdSum=" + stackIdSum +
" maxStackid=" + maxStackId + " " + rootEntry);
}
}
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
for (int i = 0; i < p.proto.getStackIdCount(); ++i) {
int stackId = 1 + p.proto.getStackId(i);
maxStackId = Math.max(maxStackId, stackId);
stackIdSum += stackId;
if (LOG.isTraceEnabled()) {
LOG.trace("stackId=" + stackId + " stackIdSum=" + stackIdSum +
" maxStackid=" + maxStackId + " " + p);
}
}
}
// The cmpStackIdSum is this formula for finding the sum of a series of numbers:
// http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg
final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2);
if (cmpStackIdSum == stackIdSum) {
rootEntry.ready = true;
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
p.ready = true;
}
return true;
}
return false;
}
private void unlinkFromReplayList(Entry entry) {
if (replayOrderHead == entry) {
replayOrderHead = entry.replayNext;
}
if (replayOrderTail == entry) {
replayOrderTail = entry.replayPrev;
}
if (entry.replayPrev != null) {
entry.replayPrev.replayNext = entry.replayNext;
}
if (entry.replayNext != null) {
entry.replayNext.replayPrev = entry.replayPrev;
}
}
private void addToReplayList(final Entry entry) {
unlinkFromReplayList(entry);
entry.replayNext = replayOrderHead;
entry.replayPrev = null;
if (replayOrderHead != null) {
replayOrderHead.replayPrev = entry;
} else {
replayOrderTail = entry;
}
replayOrderHead = entry;
}
private void unlinkFromLinkList(Entry entry) {
if (entry == rootHead) {
rootHead = entry.linkNext;
} else if (entry == childUnlinkedHead) {
childUnlinkedHead = entry.linkNext;
}
if (entry.linkPrev != null) {
entry.linkPrev.linkNext = entry.linkNext;
}
if (entry.linkNext != null) {
entry.linkNext.linkPrev = entry.linkPrev;
}
}
private Entry addToLinkList(Entry entry, Entry linkHead) {
unlinkFromLinkList(entry);
entry.linkNext = linkHead;
entry.linkPrev = null;
if (linkHead != null) {
linkHead.linkPrev = entry;
}
return entry;
}
private Entry findLinkListTail(Entry linkHead) {
Entry tail = linkHead;
while (tail.linkNext != null) {
tail = tail.linkNext;
}
return tail;
}
private Entry addToMap(final long procId, final boolean hasParent) {
int slotIndex = getMapSlot(procId);
Entry entry = getProcedure(slotIndex, procId);
if (entry != null) return entry;
entry = new Entry(procedureMap[slotIndex]);
procedureMap[slotIndex] = entry;
return entry;
}
private Entry removeFromMap(final long procId) {
int slotIndex = getMapSlot(procId);
Entry prev = null;
Entry entry = procedureMap[slotIndex];
while (entry != null) {
if (procId == entry.getProcId()) {
if (prev != null) {
prev.hashNext = entry.hashNext;
} else {
procedureMap[slotIndex] = entry.hashNext;
}
entry.hashNext = null;
return entry;
}
prev = entry;
entry = entry.hashNext;
}
return null;
}
private Entry getProcedure(final long procId) {
return getProcedure(getMapSlot(procId), procId);
}
private Entry getProcedure(final int slotIndex, final long procId) {
Entry entry = procedureMap[slotIndex];
while (entry != null) {
if (procId == entry.getProcId()) {
return entry;
}
entry = entry.hashNext;
}
return null;
}
private int getMapSlot(final long procId) {
return (int)(Procedure.getProcIdHashCode(procId) % procedureMap.length);
}
}
}

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
@ -30,20 +29,21 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.PosixParser;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
/**
* ProcedureWALPrettyPrinter prints the contents of a given ProcedureWAL file
@ -160,7 +160,7 @@ public class ProcedureWALPrettyPrinter extends Configured implements Tool {
final List<Path> files = new ArrayList<>();
try {
CommandLine cmd = new PosixParser().parse(options, args);
CommandLine cmd = new DefaultParser().parse(options, args);
if (cmd.hasOption("f")) {
files.add(new Path(cmd.getOptionValue("f")));

View File

@ -0,0 +1,607 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
* We keep an in-memory map of the procedures sorted by replay order. (see the details in the
* beginning of {@link ProcedureWALFormatReader}).
*
* <pre>
* procedureMap = | A | | E | | C | | | | | G | | |
* D B
* replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
*
* We also have a lazy grouping by "root procedure", and a list of
* unlinked procedures. If after reading all the WALs we have unlinked
* procedures it means that we had a missing WAL or a corruption.
* rootHead = A <-> D <-> G
* B E
* C
* unlinkFromLinkList = None
* </pre>
*/
class WALProcedureMap {
private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class);
private static class Entry {
// For bucketed linked lists in hash-table.
private Entry hashNext;
// child head
private Entry childHead;
// double-link for rootHead or childHead
private Entry linkNext;
private Entry linkPrev;
// replay double-linked-list
private Entry replayNext;
private Entry replayPrev;
// procedure-infos
private Procedure<?> procedure;
private ProcedureProtos.Procedure proto;
private boolean ready = false;
public Entry(Entry hashNext) {
this.hashNext = hashNext;
}
public long getProcId() {
return proto.getProcId();
}
public long getParentId() {
return proto.getParentId();
}
public boolean hasParent() {
return proto.hasParentId();
}
public boolean isReady() {
return ready;
}
public boolean isFinished() {
if (!hasParent()) {
// we only consider 'root' procedures. because for the user 'finished'
// means when everything up to the 'root' is finished.
switch (proto.getState()) {
case ROLLEDBACK:
case SUCCESS:
return true;
default:
break;
}
}
return false;
}
public Procedure<?> convert() throws IOException {
if (procedure == null) {
procedure = ProcedureUtil.convertToProcedure(proto);
}
return procedure;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("Entry(");
sb.append(getProcId());
sb.append(", parentId=");
sb.append(getParentId());
sb.append(", class=");
sb.append(proto.getClassName());
sb.append(")");
return sb.toString();
}
}
private static class EntryIterator implements ProcedureIterator {
private final Entry replayHead;
private Entry current;
public EntryIterator(Entry replayHead) {
this.replayHead = replayHead;
this.current = replayHead;
}
@Override
public void reset() {
this.current = replayHead;
}
@Override
public boolean hasNext() {
return current != null;
}
@Override
public boolean isNextFinished() {
return current != null && current.isFinished();
}
@Override
public void skipNext() {
current = current.replayNext;
}
@Override
public Procedure<?> next() throws IOException {
try {
return current.convert();
} finally {
current = current.replayNext;
}
}
}
// procedure hash table
private Entry[] procedureMap;
// replay-order double-linked-list
private Entry replayOrderHead;
private Entry replayOrderTail;
// root linked-list
private Entry rootHead;
// pending unlinked children (root not present yet)
private Entry childUnlinkedHead;
// Track ProcId range
private long minModifiedProcId = Long.MAX_VALUE;
private long maxModifiedProcId = Long.MIN_VALUE;
public WALProcedureMap(int size) {
procedureMap = new Entry[size];
replayOrderHead = null;
replayOrderTail = null;
rootHead = null;
childUnlinkedHead = null;
}
public void add(ProcedureProtos.Procedure procProto) {
trackProcIds(procProto.getProcId());
Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId());
boolean newEntry = entry.proto == null;
// We have seen procedure WALs where the entries are out of order; see HBASE-18152.
// To compensate, only replace the Entry procedure if for sure this new procedure
// is indeed an entry that came later.
// TODO: Fix the writing of procedure info so it does not violate basic expectation, that WALs
// contain procedure changes goingfrom start to finish in sequence.
if (newEntry || isIncreasing(entry.proto, procProto)) {
entry.proto = procProto;
}
addToReplayList(entry);
if (newEntry) {
if (procProto.hasParentId()) {
childUnlinkedHead = addToLinkList(entry, childUnlinkedHead);
} else {
rootHead = addToLinkList(entry, rootHead);
}
}
}
/**
* @return True if this new procedure is 'richer' than the current one else false and we log this
* incidence where it appears that the WAL has older entries appended after newer ones.
* See HBASE-18152.
*/
private static boolean isIncreasing(ProcedureProtos.Procedure current,
ProcedureProtos.Procedure candidate) {
// Check that the procedures we see are 'increasing'. We used to compare
// procedure id first and then update time but it can legitimately go backwards if the
// procedure is failed or rolled back so that was unreliable. Was going to compare
// state but lets see if comparing update time enough (unfortunately this issue only
// seen under load...)
boolean increasing = current.getLastUpdate() <= candidate.getLastUpdate();
if (!increasing) {
LOG.warn("NOT INCREASING! current=" + current + ", candidate=" + candidate);
}
return increasing;
}
public boolean remove(long procId) {
trackProcIds(procId);
Entry entry = removeFromMap(procId);
if (entry != null) {
unlinkFromReplayList(entry);
unlinkFromLinkList(entry);
return true;
}
return false;
}
private void trackProcIds(long procId) {
minModifiedProcId = Math.min(minModifiedProcId, procId);
maxModifiedProcId = Math.max(maxModifiedProcId, procId);
}
public long getMinModifiedProcId() {
return minModifiedProcId;
}
public long getMaxModifiedProcId() {
return maxModifiedProcId;
}
public boolean contains(long procId) {
return getProcedure(procId) != null;
}
public boolean isEmpty() {
return replayOrderHead == null;
}
public void clear() {
for (int i = 0; i < procedureMap.length; ++i) {
procedureMap[i] = null;
}
replayOrderHead = null;
replayOrderTail = null;
rootHead = null;
childUnlinkedHead = null;
minModifiedProcId = Long.MAX_VALUE;
maxModifiedProcId = Long.MIN_VALUE;
}
/*
* Merges two WalProcedureMap, the target is the "global" map, the source is the "local" map. -
* The entries in the hashtables are guaranteed to be unique. On replay we don't load procedures
* that already exist in the "global" map (the one we are merging the "local" in to). - The
* replayOrderList of the "local" nao will be appended to the "global" map replay list. - The
* "local" map will be cleared at the end of the operation.
*/
public void mergeTail(WALProcedureMap other) {
for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) {
int slotIndex = getMapSlot(p.getProcId());
p.hashNext = procedureMap[slotIndex];
procedureMap[slotIndex] = p;
}
if (replayOrderHead == null) {
replayOrderHead = other.replayOrderHead;
replayOrderTail = other.replayOrderTail;
rootHead = other.rootHead;
childUnlinkedHead = other.childUnlinkedHead;
} else {
// append replay list
assert replayOrderTail.replayNext == null;
assert other.replayOrderHead.replayPrev == null;
replayOrderTail.replayNext = other.replayOrderHead;
other.replayOrderHead.replayPrev = replayOrderTail;
replayOrderTail = other.replayOrderTail;
// merge rootHead
if (rootHead == null) {
rootHead = other.rootHead;
} else if (other.rootHead != null) {
Entry otherTail = findLinkListTail(other.rootHead);
otherTail.linkNext = rootHead;
rootHead.linkPrev = otherTail;
rootHead = other.rootHead;
}
// merge childUnlinkedHead
if (childUnlinkedHead == null) {
childUnlinkedHead = other.childUnlinkedHead;
} else if (other.childUnlinkedHead != null) {
Entry otherTail = findLinkListTail(other.childUnlinkedHead);
otherTail.linkNext = childUnlinkedHead;
childUnlinkedHead.linkPrev = otherTail;
childUnlinkedHead = other.childUnlinkedHead;
}
}
maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId);
minModifiedProcId = Math.max(minModifiedProcId, other.minModifiedProcId);
other.clear();
}
/**
* Returns an EntryIterator with the list of procedures ready to be added to the executor. A
* Procedure is ready if its children and parent are ready.
*/
public ProcedureIterator fetchReady() {
buildGraph();
Entry readyHead = null;
Entry readyTail = null;
Entry p = replayOrderHead;
while (p != null) {
Entry next = p.replayNext;
if (p.isReady()) {
unlinkFromReplayList(p);
if (readyTail != null) {
readyTail.replayNext = p;
p.replayPrev = readyTail;
} else {
p.replayPrev = null;
readyHead = p;
}
readyTail = p;
p.replayNext = null;
}
p = next;
}
// we need the hash-table lookups for parents, so this must be done
// out of the loop where we check isReadyToRun()
for (p = readyHead; p != null; p = p.replayNext) {
removeFromMap(p.getProcId());
unlinkFromLinkList(p);
}
return readyHead != null ? new EntryIterator(readyHead) : null;
}
/**
* Drain this map and return all procedures in it.
*/
public ProcedureIterator fetchAll() {
Entry head = replayOrderHead;
for (Entry p = head; p != null; p = p.replayNext) {
removeFromMap(p.getProcId());
}
for (int i = 0; i < procedureMap.length; ++i) {
assert procedureMap[i] == null : "map not empty i=" + i;
}
replayOrderHead = null;
replayOrderTail = null;
childUnlinkedHead = null;
rootHead = null;
return head != null ? new EntryIterator(head) : null;
}
private void buildGraph() {
Entry p = childUnlinkedHead;
while (p != null) {
Entry next = p.linkNext;
Entry rootProc = getRootProcedure(p);
if (rootProc != null) {
rootProc.childHead = addToLinkList(p, rootProc.childHead);
}
p = next;
}
for (p = rootHead; p != null; p = p.linkNext) {
checkReadyToRun(p);
}
}
private Entry getRootProcedure(Entry entry) {
while (entry != null && entry.hasParent()) {
entry = getProcedure(entry.getParentId());
}
return entry;
}
/**
* (see the comprehensive explanation in the beginning of {@link ProcedureWALFormatReader}). A
* Procedure is ready when parent and children are ready. "ready" means that we all the
* information that we need in-memory.
* <p/>
* Example-1:<br/>
* We have two WALs, we start reading from the newest (wal-2)
*
* <pre>
* wal-2 | C B |
* wal-1 | A B C |
* </pre>
*
* If C and B don't depend on A (A is not the parent), we can start them before reading wal-1. If
* B is the only one with parent A we can start C. We have to read one more WAL before being able
* to start B.
* <p/>
* How do we know with the only information in B that we are not ready.
* <ul>
* <li>easy case, the parent is missing from the global map</li>
* <li>more complex case we look at the Stack IDs.</li>
* </ul>
* The Stack-IDs are added to the procedure order as an incremental index tracking how many times
* that procedure was executed, which is equivalent to the number of times we wrote the procedure
* to the WAL. <br/>
* In the example above:
*
* <pre>
* wal-2: B has stackId = [1, 2]
* wal-1: B has stackId = [1]
* wal-1: A has stackId = [0]
* </pre>
*
* Since we know that the Stack-IDs are incremental for a Procedure, we notice that there is a gap
* in the stackIds of B, so something was executed before.
* <p/>
* To identify when a Procedure is ready we do the sum of the stackIds of the procedure and the
* parent. if the stackIdSum is equal to the sum of {1..maxStackId} then everything we need is
* available.
* <p/>
* Example-2
*
* <pre>
* wal-2 | A | A stackIds = [0, 2]
* wal-1 | A B | B stackIds = [1]
* </pre>
*
* There is a gap between A stackIds so something was executed in between.
*/
private boolean checkReadyToRun(Entry rootEntry) {
assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry;
if (rootEntry.isFinished()) {
// If the root procedure is finished, sub-procedures should be gone
if (rootEntry.childHead != null) {
LOG.error("unexpected active children for root-procedure: {}", rootEntry);
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
LOG.error("unexpected active children: {}", p);
}
}
assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry;
rootEntry.ready = true;
return true;
}
int stackIdSum = 0;
int maxStackId = 0;
for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) {
int stackId = 1 + rootEntry.proto.getStackId(i);
maxStackId = Math.max(maxStackId, stackId);
stackIdSum += stackId;
LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId,
rootEntry);
}
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
for (int i = 0; i < p.proto.getStackIdCount(); ++i) {
int stackId = 1 + p.proto.getStackId(i);
maxStackId = Math.max(maxStackId, stackId);
stackIdSum += stackId;
LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId, p);
}
}
// The cmpStackIdSum is this formula for finding the sum of a series of numbers:
// http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg
final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2);
if (cmpStackIdSum == stackIdSum) {
rootEntry.ready = true;
for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
p.ready = true;
}
return true;
}
return false;
}
private void unlinkFromReplayList(Entry entry) {
if (replayOrderHead == entry) {
replayOrderHead = entry.replayNext;
}
if (replayOrderTail == entry) {
replayOrderTail = entry.replayPrev;
}
if (entry.replayPrev != null) {
entry.replayPrev.replayNext = entry.replayNext;
}
if (entry.replayNext != null) {
entry.replayNext.replayPrev = entry.replayPrev;
}
}
private void addToReplayList(final Entry entry) {
unlinkFromReplayList(entry);
entry.replayNext = replayOrderHead;
entry.replayPrev = null;
if (replayOrderHead != null) {
replayOrderHead.replayPrev = entry;
} else {
replayOrderTail = entry;
}
replayOrderHead = entry;
}
private void unlinkFromLinkList(Entry entry) {
if (entry == rootHead) {
rootHead = entry.linkNext;
} else if (entry == childUnlinkedHead) {
childUnlinkedHead = entry.linkNext;
}
if (entry.linkPrev != null) {
entry.linkPrev.linkNext = entry.linkNext;
}
if (entry.linkNext != null) {
entry.linkNext.linkPrev = entry.linkPrev;
}
}
private Entry addToLinkList(Entry entry, Entry linkHead) {
unlinkFromLinkList(entry);
entry.linkNext = linkHead;
entry.linkPrev = null;
if (linkHead != null) {
linkHead.linkPrev = entry;
}
return entry;
}
private Entry findLinkListTail(Entry linkHead) {
Entry tail = linkHead;
while (tail.linkNext != null) {
tail = tail.linkNext;
}
return tail;
}
private Entry addToMap(long procId, boolean hasParent) {
int slotIndex = getMapSlot(procId);
Entry entry = getProcedure(slotIndex, procId);
if (entry != null) {
return entry;
}
entry = new Entry(procedureMap[slotIndex]);
procedureMap[slotIndex] = entry;
return entry;
}
private Entry removeFromMap(final long procId) {
int slotIndex = getMapSlot(procId);
Entry prev = null;
Entry entry = procedureMap[slotIndex];
while (entry != null) {
if (procId == entry.getProcId()) {
if (prev != null) {
prev.hashNext = entry.hashNext;
} else {
procedureMap[slotIndex] = entry.hashNext;
}
entry.hashNext = null;
return entry;
}
prev = entry;
entry = entry.hashNext;
}
return null;
}
private Entry getProcedure(long procId) {
return getProcedure(getMapSlot(procId), procId);
}
private Entry getProcedure(int slotIndex, long procId) {
Entry entry = procedureMap[slotIndex];
while (entry != null) {
if (procId == entry.getProcId()) {
return entry;
}
entry = entry.hashNext;
}
return null;
}
private int getMapSlot(long procId) {
return (int) (Procedure.getProcIdHashCode(procId) % procedureMap.length);
}
}

View File

@ -15,14 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
@ -35,7 +33,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -52,25 +49,60 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
/**
* WAL implementation of the ProcedureStore.
* <p/>
* When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()},
* then {@link #load(ProcedureLoader)}.
* <p/>
* In {@link #recoverLease()}, we will get the lease by closing all the existing wal files(by
* calling recoverFileLease), and creating a new wal writer. And we will also get the list of all
* the old wal files.
* <p/>
* FIXME: notice that the current recover lease implementation is problematic, it can not deal with
* the races if there are two master both wants to acquire the lease...
* <p/>
* In {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the
* comments of this method for more details.
* <p/>
* The actual logging way is a bit like our FileSystem based WAL implementation as RS side. There is
* a {@link #slots}, which is more like the ring buffer, and in the insert, update and delete
* methods we will put thing into the {@link #slots} and wait. And there is a background sync
* thread(see the {@link #syncLoop()} method) which get data from the {@link #slots} and write them
* to the FileSystem, and notify the caller that we have finished.
* <p/>
* TODO: try using disruptor to increase performance and simplify the logic?
* <p/>
* The {@link #storeTracker} keeps track of the modified procedures in the newest wal file, which is
* also the one being written currently. And the deleted bits in it are for all the procedures, not
* only the ones in the newest wal file. And when rolling a log, we will first store it in the
* trailer of the current wal file, and then reset its modified bits, so that it can start to track
* the modified procedures for the new wal file.
* <p/>
* The {@link #holdingCleanupTracker} is used to test whether we are safe to delete the oldest wal
* file. When there are log rolling and there are more than 1 wal files, we will make use of it. It
* will first be initialized to the oldest file's tracker(which is stored in the trailer), using the
* method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it
* with the tracker of every newer wal files, using the
* {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If we find out
* that all the modified procedures for the oldest wal file are modified or deleted in newer wal
* files, then we can delete it.
* @see ProcedureWALPrettyPrinter for printing content of a single WAL.
* @see #main(String[]) to parse a directory of MasterWALProcs.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class WALProcedureStore extends ProcedureStoreBase {
private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class);
public static final String LOG_PREFIX = "pv2-";
@ -166,7 +198,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private int syncWaitMsec;
// Variables used for UI display
private CircularFifoQueue syncMetricsQueue;
private CircularFifoQueue<SyncMetrics> syncMetricsQueue;
public static class SyncMetrics {
private long timestamp;
@ -228,11 +260,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
// Create archive dir up front. Rename won't work w/o it up on HDFS.
if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
if (this.fs.mkdirs(this.walArchiveDir)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Created Procedure Store WAL archive dir " + this.walArchiveDir);
}
LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir);
} else {
LOG.warn("Failed create of " + this.walArchiveDir);
LOG.warn("Failed create of {}", this.walArchiveDir);
}
}
}
@ -248,7 +278,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
runningProcCount = numSlots;
syncMaxSlot = numSlots;
slots = new ByteSlot[numSlots];
slotsCache = new LinkedTransferQueue();
slotsCache = new LinkedTransferQueue<>();
while (slotsCache.size() < numSlots) {
slotsCache.offer(new ByteSlot());
}
@ -267,7 +297,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
// WebUI
syncMetricsQueue = new CircularFifoQueue(
syncMetricsQueue = new CircularFifoQueue<>(
conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT));
// Init sync thread
@ -394,9 +424,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
// We have the lease on the log
oldLogs = getLogFiles();
if (getMaxLogId(oldLogs) > flushLogId) {
if (LOG.isDebugEnabled()) {
LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
}
LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId);
logs.getLast().removeFile(this.walArchiveDir);
continue;
}
@ -410,7 +438,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
@Override
public void load(final ProcedureLoader loader) throws IOException {
public void load(ProcedureLoader loader) throws IOException {
lock.lock();
try {
if (logs.isEmpty()) {
@ -425,7 +453,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
// Load the old logs
final Iterator<ProcedureWALFile> it = logs.descendingIterator();
Iterator<ProcedureWALFile> it = logs.descendingIterator();
it.next(); // Skip the current log
ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
@ -485,7 +513,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
@Override
public void insert(final Procedure proc, final Procedure[] subprocs) {
public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
if (LOG.isTraceEnabled()) {
LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
}
@ -519,7 +547,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
@Override
public void insert(final Procedure[] procs) {
public void insert(Procedure<?>[] procs) {
if (LOG.isTraceEnabled()) {
LOG.trace("Insert " + Arrays.toString(procs));
}
@ -548,7 +576,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
@Override
public void update(final Procedure proc) {
public void update(Procedure<?> proc) {
if (LOG.isTraceEnabled()) {
LOG.trace("Update " + proc);
}
@ -571,11 +599,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
@Override
public void delete(final long procId) {
if (LOG.isTraceEnabled()) {
LOG.trace("Delete " + procId);
}
public void delete(long procId) {
LOG.trace("Delete {}", procId);
ByteSlot slot = acquireSlot();
try {
// Serialize the delete
@ -594,7 +619,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
@Override
public void delete(final Procedure proc, final long[] subProcIds) {
public void delete(Procedure<?> proc, long[] subProcIds) {
assert proc != null : "expected a non-null procedure";
assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds";
if (LOG.isTraceEnabled()) {
@ -630,7 +655,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
private void delete(final long[] procIds) {
private void delete(long[] procIds) {
if (LOG.isTraceEnabled()) {
LOG.trace("Delete " + Arrays.toString(procIds));
}
@ -736,20 +761,20 @@ public class WALProcedureStore extends ProcedureStoreBase {
storeTracker.insert(subProcIds);
} else {
storeTracker.insert(procId, subProcIds);
holdingCleanupTracker.setDeletedIfSet(procId);
holdingCleanupTracker.setDeletedIfModified(procId);
}
break;
case UPDATE:
storeTracker.update(procId);
holdingCleanupTracker.setDeletedIfSet(procId);
holdingCleanupTracker.setDeletedIfModified(procId);
break;
case DELETE:
if (subProcIds != null && subProcIds.length > 0) {
storeTracker.delete(subProcIds);
holdingCleanupTracker.setDeletedIfSet(subProcIds);
holdingCleanupTracker.setDeletedIfModified(subProcIds);
} else {
storeTracker.delete(procId);
holdingCleanupTracker.setDeletedIfSet(procId);
holdingCleanupTracker.setDeletedIfModified(procId);
}
break;
default:
@ -973,16 +998,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
private void periodicRoll() throws IOException {
if (storeTracker.isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("no active procedures");
}
tryRollWriter();
removeAllLogs(flushLogId - 1);
} else {
if (storeTracker.isUpdated()) {
if (LOG.isTraceEnabled()) {
if (storeTracker.isAllModified()) {
LOG.trace("all the active procedures are in the latest log");
}
removeAllLogs(flushLogId - 1);
}
@ -997,18 +1018,20 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
private boolean rollWriter() throws IOException {
if (!isRunning()) return false;
if (!isRunning()) {
return false;
}
// Create new state-log
if (!rollWriter(flushLogId + 1)) {
LOG.warn("someone else has already created log " + flushLogId);
LOG.warn("someone else has already created log {}", flushLogId);
return false;
}
// We have the lease on the log,
// but we should check if someone else has created new files
if (getMaxLogId(getLogFiles()) > flushLogId) {
LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId);
logs.getLast().removeFile(this.walArchiveDir);
return false;
}
@ -1064,7 +1087,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
closeCurrentLogStream();
storeTracker.resetUpdates();
storeTracker.resetModified();
stream = newStream;
flushLogId = logId;
totalSynced.set(0);
@ -1092,12 +1115,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
try {
ProcedureWALFile log = logs.getLast();
log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
log.updateLocalTracker(storeTracker);
long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
log.addToSize(trailerSize);
} catch (IOException e) {
LOG.warn("Unable to write the trailer: " + e.getMessage());
LOG.warn("Unable to write the trailer", e);
}
try {
stream.close();
@ -1134,9 +1157,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
// - the other WALs are scanned to remove procs already in other wals.
// TODO: exit early if holdingCleanupTracker.isEmpty()
holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
holdingCleanupTracker.setDeletedIfSet(storeTracker);
holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker);
for (int i = 1, size = logs.size() - 1; i < size; ++i) {
holdingCleanupTracker.setDeletedIfSet(logs.get(i).getTracker());
holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker());
}
}
@ -1144,12 +1167,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
* Remove all logs with logId <= {@code lastLogId}.
*/
private void removeAllLogs(long lastLogId) {
if (logs.size() <= 1) return;
if (LOG.isTraceEnabled()) {
LOG.trace("Remove all state logs with ID less than " + lastLogId);
if (logs.size() <= 1) {
return;
}
LOG.trace("Remove all state logs with ID less than {}", lastLogId);
boolean removed = false;
while (logs.size() > 1) {
ProcedureWALFile log = logs.getFirst();
@ -1167,14 +1190,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Removing log=" + log);
}
LOG.trace("Removing log={}", log);
log.removeFile(walArchiveDir);
logs.remove(log);
if (LOG.isDebugEnabled()) {
LOG.info("Removed log=" + log + ", activeLogs=" + logs);
}
LOG.debug("Removed log={}, activeLogs={}", log, logs);
assert logs.size() > 0 : "expected at least one log";
} catch (IOException e) {
LOG.error("Unable to remove log: " + log, e);
@ -1238,24 +1257,28 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
private static long getMaxLogId(final FileStatus[] logFiles) {
long maxLogId = 0;
if (logFiles != null && logFiles.length > 0) {
for (int i = 0; i < logFiles.length; ++i) {
maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName()));
/**
* Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort
* the file set by log id.
* @return Max-LogID of the specified log file set
*/
private static long getMaxLogId(FileStatus[] logFiles) {
if (logFiles == null || logFiles.length == 0) {
return 0L;
}
}
return maxLogId;
return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName());
}
/**
* Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort
* the file set by log id.
* @return Max-LogID of the specified log file set
*/
private long initOldLogs(final FileStatus[] logFiles) throws IOException {
this.logs.clear();
private long initOldLogs(FileStatus[] logFiles) throws IOException {
if (logFiles == null || logFiles.length == 0) {
return 0L;
}
long maxLogId = 0;
if (logFiles != null && logFiles.length > 0) {
for (int i = 0; i < logFiles.length; ++i) {
final Path logPath = logFiles[i].getPath();
leaseRecovery.recoverFileLease(fs, logPath);
@ -1269,19 +1292,18 @@ public class WALProcedureStore extends ProcedureStoreBase {
this.logs.add(log);
}
}
Collections.sort(this.logs);
initTrackerFromOldLogs();
}
return maxLogId;
}
/**
* If last log's tracker is not null, use it as {@link #storeTracker}.
* Otherwise, set storeTracker as partial, and let {@link ProcedureWALFormatReader} rebuild
* it using entries in the log.
* If last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set storeTracker
* as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries in the log.
*/
private void initTrackerFromOldLogs() {
if (logs.isEmpty() || !isRunning()) return;
if (logs.isEmpty() || !isRunning()) {
return;
}
ProcedureWALFile log = logs.getLast();
if (!log.getTracker().isPartial()) {
storeTracker.resetTo(log.getTracker());
@ -1298,17 +1320,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
throws IOException {
final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
if (logFile.getLen() == 0) {
LOG.warn("Remove uninitialized log: " + logFile);
LOG.warn("Remove uninitialized log: {}", logFile);
log.removeFile(walArchiveDir);
return null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Opening Pv2 " + logFile);
}
LOG.debug("Opening Pv2 {}", logFile);
try {
log.open();
} catch (ProcedureWALFormat.InvalidWALDataException e) {
LOG.warn("Remove uninitialized log: " + logFile, e);
LOG.warn("Remove uninitialized log: {}", logFile, e);
log.removeFile(walArchiveDir);
return null;
} catch (IOException e) {
@ -1322,7 +1342,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
} catch (IOException e) {
log.getTracker().reset();
log.getTracker().setPartialFlag(true);
LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
LOG.warn("Unable to read tracker for {}", log, e);
}
log.close();
@ -1350,7 +1370,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
});
try {
store.start(16);
ProcedureExecutor pe = new ProcedureExecutor(conf, new Object()/*Pass anything*/, store);
ProcedureExecutor<?> pe = new ProcedureExecutor<>(conf, new Object()/*Pass anything*/, store);
pe.init(1, true);
} finally {
store.stop(true);

View File

@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
import java.util.Random;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.BitSetNode;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.ClassRule;
@ -119,29 +118,29 @@ public class TestProcedureStoreTracker {
tracker.insert(procs[0]);
tracker.insert(procs[1], new long[] { procs[2], procs[3], procs[4] });
assertFalse(tracker.isEmpty());
assertTrue(tracker.isUpdated());
assertTrue(tracker.isAllModified());
tracker.resetUpdates();
assertFalse(tracker.isUpdated());
tracker.resetModified();
assertFalse(tracker.isAllModified());
for (int i = 0; i < 4; ++i) {
tracker.update(procs[i]);
assertFalse(tracker.isEmpty());
assertFalse(tracker.isUpdated());
assertFalse(tracker.isAllModified());
}
tracker.update(procs[4]);
assertFalse(tracker.isEmpty());
assertTrue(tracker.isUpdated());
assertTrue(tracker.isAllModified());
tracker.update(procs[5]);
assertFalse(tracker.isEmpty());
assertTrue(tracker.isUpdated());
assertTrue(tracker.isAllModified());
for (int i = 0; i < 5; ++i) {
tracker.delete(procs[i]);
assertFalse(tracker.isEmpty());
assertTrue(tracker.isUpdated());
assertTrue(tracker.isAllModified());
}
tracker.delete(procs[5]);
assertTrue(tracker.isEmpty());
@ -235,7 +234,7 @@ public class TestProcedureStoreTracker {
for (long i : active) {
tracker.insert(i);
}
tracker.resetUpdates();
tracker.resetModified();
for (long i : updated) {
tracker.update(i);
}
@ -252,11 +251,11 @@ public class TestProcedureStoreTracker {
BitSetNode buildBitSetNode(long[] active, long[] updated, long[] deleted) {
BitSetNode bitSetNode = new BitSetNode(0L, false);
for (long i : active) {
bitSetNode.update(i);
bitSetNode.insertOrUpdate(i);
}
bitSetNode.resetUpdates();
bitSetNode.resetModified();
for (long i : updated) {
bitSetNode.update(i);
bitSetNode.insertOrUpdate(i);
}
for (long i : deleted) {
bitSetNode.delete(i);
@ -276,9 +275,9 @@ public class TestProcedureStoreTracker {
assertEquals(false, tracker.isEmpty());
for (int i = 0; i < procIds.length; ++i) {
tracker.setDeletedIfSet(procIds[i] - 1);
tracker.setDeletedIfSet(procIds[i]);
tracker.setDeletedIfSet(procIds[i] + 1);
tracker.setDeletedIfModified(procIds[i] - 1);
tracker.setDeletedIfModified(procIds[i]);
tracker.setDeletedIfModified(procIds[i] + 1);
}
assertEquals(true, tracker.isEmpty());
@ -289,7 +288,7 @@ public class TestProcedureStoreTracker {
}
assertEquals(false, tracker.isEmpty());
tracker.setDeletedIfSet(procIds);
tracker.setDeletedIfModified(procIds);
assertEquals(true, tracker.isEmpty());
}
}

View File

@ -423,11 +423,11 @@ public class TestWALProcedureStore {
final Procedure[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) {
for (int index : updatedProcs) {
long procId = procs[index].getProcId();
assertTrue("Procedure id : " + procId, tracker.isUpdated(procId));
assertTrue("Procedure id : " + procId, tracker.isModified(procId));
}
for (int index : nonUpdatedProcs) {
long procId = procs[index].getProcId();
assertFalse("Procedure id : " + procId, tracker.isUpdated(procId));
assertFalse("Procedure id : " + procId, tracker.isModified(procId));
}
}

View File

@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
* avoiding port contention if another local HBase instance is already running).
* <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
* setting it to true.
* For triggering test.
*/
@InterfaceAudience.Public
@SuppressWarnings("deprecation")