HBASE-16524 Procedure v2 - Compute WALs cleanup on wal modification and not on every sync (Matteo Bertozzi)

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Michael Stack 2016-12-27 13:53:43 -08:00
parent ccb8d671d5
commit 319ecd867a
5 changed files with 321 additions and 268 deletions

View File

@ -156,11 +156,18 @@ public class ProcedureStoreTracker {
partial = false;
}
public BitSetNode(BitSetNode other) {
public BitSetNode(final BitSetNode other, final boolean resetDelete) {
this.start = other.start;
this.partial = other.partial;
this.updated = other.updated.clone();
this.deleted = other.deleted.clone();
if (resetDelete) {
this.deleted = new long[other.deleted.length];
for (int i = 0; i < this.deleted.length; ++i) {
this.deleted[i] = ~(other.updated[i]);
}
} else {
this.deleted = other.deleted.clone();
}
}
public void update(final long procId) {
@ -171,11 +178,11 @@ public class ProcedureStoreTracker {
updateState(procId, true);
}
public Long getStart() {
public long getStart() {
return start;
}
public Long getEnd() {
public long getEnd() {
return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1;
}
@ -249,33 +256,6 @@ public class ProcedureStoreTracker {
}
}
/**
* If an active (non-deleted) procedure in current BitSetNode has been updated in {@code other}
* BitSetNode, then delete it from current node.
* @return true if node changed, i.e. some procedure(s) from {@code other} was subtracted from
* current node.
*/
public boolean subtract(BitSetNode other) {
// Assert that other node intersects with this node.
assert !(other.getEnd() < this.start) && !(this.getEnd() < other.start);
int thisOffset = 0, otherOffset = 0;
if (this.start < other.start) {
thisOffset = (int) (other.start - this.start) / BITS_PER_WORD;
} else {
otherOffset = (int) (this.start - other.start) / BITS_PER_WORD;
}
int size = Math.min(this.updated.length - thisOffset, other.updated.length - otherOffset);
boolean nonZeroIntersect = false;
for (int i = 0; i < size; i++) {
long intersect = ~this.deleted[thisOffset + i] & other.updated[otherOffset + i];
if (intersect != 0) {
this.deleted[thisOffset + i] |= intersect;
nonZeroIntersect = true;
}
}
return nonZeroIntersect;
}
/**
* Convert to
* org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode
@ -292,7 +272,6 @@ public class ProcedureStoreTracker {
return builder.build();
}
// ========================================================================
// Grow/Merge Helpers
// ========================================================================
@ -461,20 +440,22 @@ public class ProcedureStoreTracker {
/**
* Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
*/
public void resetTo(ProcedureStoreTracker tracker) {
public void resetTo(final ProcedureStoreTracker tracker) {
resetTo(tracker, false);
}
public void resetTo(final ProcedureStoreTracker tracker, final boolean resetDelete) {
this.partial = tracker.partial;
this.minUpdatedProcId = tracker.minUpdatedProcId;
this.maxUpdatedProcId = tracker.maxUpdatedProcId;
this.keepDeletes = tracker.keepDeletes;
for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) {
map.put(entry.getKey(), new BitSetNode(entry.getValue()));
map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete));
}
}
public void insert(long procId) {
BitSetNode node = getOrCreateNode(procId);
node.update(procId);
trackProcIds(procId);
insert(null, procId);
}
public void insert(final long[] procIds) {
@ -484,46 +465,108 @@ public class ProcedureStoreTracker {
}
public void insert(final long procId, final long[] subProcIds) {
update(procId);
BitSetNode node = null;
node = update(node, procId);
for (int i = 0; i < subProcIds.length; ++i) {
insert(subProcIds[i]);
node = insert(node, subProcIds[i]);
}
}
private BitSetNode insert(BitSetNode node, final long procId) {
if (node == null || !node.contains(procId)) {
node = getOrCreateNode(procId);
}
node.update(procId);
trackProcIds(procId);
return node;
}
public void update(long procId) {
Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
assert entry != null : "expected node to update procId=" + procId;
update(null, procId);
}
BitSetNode node = entry.getValue();
assert node.contains(procId);
private BitSetNode update(BitSetNode node, final long procId) {
node = lookupClosestNode(node, procId);
assert node != null : "expected node to update procId=" + procId;
assert node.contains(procId) : "expected procId=" + procId + " in the node";
node.update(procId);
trackProcIds(procId);
return node;
}
public void delete(long procId) {
Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
assert entry != null : "expected node to delete procId=" + procId;
delete(null, procId);
}
BitSetNode node = entry.getValue();
assert node.contains(procId) : "expected procId in the node";
public void delete(final long[] procIds) {
Arrays.sort(procIds);
BitSetNode node = null;
for (int i = 0; i < procIds.length; ++i) {
node = delete(node, procIds[i]);
}
}
private BitSetNode delete(BitSetNode node, final long procId) {
node = lookupClosestNode(node, procId);
assert node != null : "expected node to delete procId=" + procId;
assert node.contains(procId) : "expected procId=" + procId + " in the node";
node.delete(procId);
if (!keepDeletes && node.isEmpty()) {
// TODO: RESET if (map.size() == 1)
map.remove(entry.getKey());
map.remove(node.getStart());
}
trackProcIds(procId);
return node;
}
public void delete(long[] procIds) {
// TODO: optimize
Arrays.sort(procIds);
for (int i = 0; i < procIds.length; ++i) {
delete(procIds[i]);
@InterfaceAudience.Private
public void setDeleted(final long procId, final boolean isDeleted) {
BitSetNode node = getOrCreateNode(procId);
assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
node.updateState(procId, isDeleted);
trackProcIds(procId);
}
public void setDeletedIfSet(final long... procId) {
BitSetNode node = null;
for (int i = 0; i < procId.length; ++i) {
node = lookupClosestNode(node, procId[i]);
if (node != null && node.isUpdated(procId[i])) {
node.delete(procId[i]);
}
}
}
public void setDeletedIfSet(final ProcedureStoreTracker tracker) {
BitSetNode trackerNode = null;
for (BitSetNode node: map.values()) {
final long minProcId = node.getStart();
final long maxProcId = node.getEnd();
for (long procId = minProcId; procId <= maxProcId; ++procId) {
if (!node.isUpdated(procId)) continue;
trackerNode = tracker.lookupClosestNode(trackerNode, procId);
if (trackerNode == null || !trackerNode.contains(procId) || trackerNode.isUpdated(procId)) {
// the procedure was removed or updated
node.delete(procId);
}
}
}
}
/**
* lookup the node containing the specified procId.
* @param node cached node to check before doing a lookup
* @param procId the procId to lookup
* @return the node that may contains the procId or null
*/
private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) {
if (node != null && node.contains(procId)) return node;
final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
return entry != null ? entry.getValue() : null;
}
private void trackProcIds(long procId) {
minUpdatedProcId = Math.min(minUpdatedProcId, procId);
maxUpdatedProcId = Math.max(maxUpdatedProcId, procId);
@ -537,14 +580,6 @@ public class ProcedureStoreTracker {
return maxUpdatedProcId;
}
@InterfaceAudience.Private
public void setDeleted(final long procId, final boolean isDeleted) {
BitSetNode node = getOrCreateNode(procId);
assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
node.updateState(procId, isDeleted);
trackProcIds(procId);
}
public void reset() {
this.keepDeletes = false;
this.partial = false;
@ -632,11 +667,6 @@ public class ProcedureStoreTracker {
return true;
}
public boolean isTracking(long minId, long maxId) {
// TODO: we can make it more precise, instead of looking just at the block
return map.floorEntry(minId) != null || map.floorEntry(maxId) != null;
}
/**
* Clears the list of updated procedure ids. This doesn't affect global list of active
* procedure ids.
@ -737,37 +767,6 @@ public class ProcedureStoreTracker {
}
}
/**
* Iterates over
* {@link BitSetNode}s in this.map and subtracts with corresponding ones from {@code other}
* tracker.
* @return true if tracker changed, i.e. some procedure from {@code other} were subtracted from
* current tracker.
*/
public boolean subtract(ProcedureStoreTracker other) {
// Can not intersect partial bitmap.
assert !partial && !other.partial;
boolean nonZeroIntersect = false;
for (Map.Entry<Long, BitSetNode> currentEntry : map.entrySet()) {
BitSetNode currentBitSetNode = currentEntry.getValue();
Map.Entry<Long, BitSetNode> otherTrackerEntry = other.map.floorEntry(currentEntry.getKey());
if (otherTrackerEntry == null // No node in other map with key <= currentEntry.getKey().
// First entry in other map doesn't intersect with currentEntry.
|| otherTrackerEntry.getValue().getEnd() < currentEntry.getKey()) {
otherTrackerEntry = other.map.ceilingEntry(currentEntry.getKey());
if (otherTrackerEntry == null || !currentBitSetNode.contains(otherTrackerEntry.getKey())) {
// No node in other map intersects with currentBitSetNode's range.
continue;
}
}
do {
nonZeroIntersect |= currentEntry.getValue().subtract(otherTrackerEntry.getValue());
otherTrackerEntry = other.map.higherEntry(otherTrackerEntry.getKey());
} while (otherTrackerEntry != null && currentBitSetNode.contains(otherTrackerEntry.getKey()));
}
return nonZeroIntersect;
}
// ========================================================================
// Convert to/from Protocol Buffer.
// ========================================================================

View File

@ -101,8 +101,18 @@ public class ProcedureWALFormatReader {
private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024);
private final WalProcedureMap procedureMap = new WalProcedureMap(1024);
// private long compactionLogId;
private long maxProcId = 0;
private final ProcedureWALFormat.Loader loader;
/**
* Global tracker that will be used by the WALProcedureStore after load.
* If the last WAL was closed cleanly we already have a full tracker ready to be used.
* If the last WAL was truncated (e.g. master killed) the tracker will be empty
* and the 'partial' flag will be set. In this case on WAL replay we are going
* to rebuild the tracker.
*/
private final ProcedureStoreTracker tracker;
// private final boolean hasFastStartSupport;
/**
* If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we
* re-build the list of procedures updated in that WAL because we need it for log cleaning
@ -113,13 +123,9 @@ public class ProcedureWALFormatReader {
* {@link ProcedureStoreTracker.BitSetNode#subtract(ProcedureStoreTracker.BitSetNode)}).
*/
private ProcedureStoreTracker localTracker;
private final ProcedureWALFormat.Loader loader;
/**
* Global tracker. If set to partial, it will be updated as procedures are loaded from wals,
* otherwise not.
*/
private final ProcedureStoreTracker tracker;
// private final boolean hasFastStartSupport;
// private long compactionLogId;
private long maxProcId = 0;
public ProcedureWALFormatReader(final ProcedureStoreTracker tracker,
ProcedureWALFormat.Loader loader) {

View File

@ -72,6 +72,14 @@ public class WALProcedureStore extends ProcedureStoreBase {
void recoverFileLease(FileSystem fs, Path path) throws IOException;
}
public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY =
"hbase.procedure.store.wal.warn.threshold";
private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 64;
public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY =
"hbase.procedure.store.wal.exec.cleanup.on.load";
private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true;
public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
"hbase.procedure.store.wal.max.retries.before.roll";
private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;
@ -106,6 +114,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private static final int DEFAULT_SYNC_STATS_COUNT = 10;
private final LinkedList<ProcedureWALFile> logs = new LinkedList<>();
private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker();
private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
private final ReentrantLock lock = new ReentrantLock();
private final Condition waitCond = lock.newCondition();
@ -132,6 +141,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private Thread syncThread;
private ByteSlot[] slots;
private int walCountWarnThreshold;
private int maxRetriesBeforeRoll;
private int maxSyncFailureRoll;
private int waitBeforeRoll;
@ -195,6 +205,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
// Tunings
walCountWarnThreshold =
conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD);
maxRetriesBeforeRoll =
conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
@ -257,6 +269,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
log.close();
}
logs.clear();
loading.set(true);
}
private void sendStopSignal() {
@ -335,24 +348,25 @@ public class WALProcedureStore extends ProcedureStoreBase {
@Override
public void load(final ProcedureLoader loader) throws IOException {
if (logs.isEmpty()) {
throw new RuntimeException("recoverLease() must be called before loading data");
}
// Nothing to do, If we have only the current log.
if (logs.size() == 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("No state logs to replay.");
}
loader.setMaxProcId(0);
loading.set(false);
return;
}
// Load the old logs
Iterator<ProcedureWALFile> it = logs.descendingIterator();
it.next(); // Skip the current log
lock.lock();
try {
if (logs.isEmpty()) {
throw new RuntimeException("recoverLease() must be called before loading data");
}
// Nothing to do, If we have only the current log.
if (logs.size() == 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("No state logs to replay.");
}
loader.setMaxProcId(0);
return;
}
// Load the old logs
final Iterator<ProcedureWALFile> it = logs.descendingIterator();
it.next(); // Skip the current log
ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
@Override
public void setMaxProcId(long maxProcId) {
@ -379,7 +393,32 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
});
} finally {
loading.set(false);
try {
// try to cleanup inactive wals and complete the operation
buildHoldingCleanupTracker();
tryCleanupLogsOnLoad();
loading.set(false);
} finally {
lock.unlock();
}
}
}
private void tryCleanupLogsOnLoad() {
// nothing to cleanup.
if (logs.size() <= 1) return;
// the config says to not cleanup wals on load.
if (!conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY,
DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY)) {
LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs());
return;
}
try {
periodicRoll();
} catch (IOException e) {
LOG.warn("unable to cleanup logs on load: " + e.getMessage(), e);
}
}
@ -634,16 +673,20 @@ public class WALProcedureStore extends ProcedureStoreBase {
storeTracker.insert(subProcIds);
} else {
storeTracker.insert(procId, subProcIds);
holdingCleanupTracker.setDeletedIfSet(procId);
}
break;
case UPDATE:
storeTracker.update(procId);
holdingCleanupTracker.setDeletedIfSet(procId);
break;
case DELETE:
if (subProcIds != null && subProcIds.length > 0) {
storeTracker.delete(subProcIds);
holdingCleanupTracker.setDeletedIfSet(subProcIds);
} else {
storeTracker.delete(procId);
holdingCleanupTracker.setDeletedIfSet(procId);
}
break;
default:
@ -948,6 +991,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
lastRollTs.set(rollTs);
logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs));
// if it's the first next WAL being added, build the holding cleanup tracker
if (logs.size() == 2) {
buildHoldingCleanupTracker();
} else if (logs.size() > walCountWarnThreshold) {
LOG.warn("procedure WALs count=" + logs.size() +
" above the warning threshold " + walCountWarnThreshold +
". check running procedures to see if something is stuck.");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Roll new state log: " + logId);
}
@ -976,38 +1028,33 @@ public class WALProcedureStore extends ProcedureStoreBase {
// ==========================================================================
// Log Files cleaner helpers
// ==========================================================================
/**
* Iterates over log files from latest (ignoring currently active one) to oldest, deleting the
* ones which don't contain anything useful for recovery.
* @throws IOException
*/
private void removeInactiveLogs() throws IOException {
// TODO: can we somehow avoid first iteration (starting from newest log) and still figure out
// efficient way to cleanup old logs.
// Alternatively, a complex and maybe more efficient method would be using this iteration to
// rewrite latest states of active procedures to a new log file and delete all old ones.
if (logs.size() <= 1) return;
ProcedureStoreTracker runningTracker = new ProcedureStoreTracker();
runningTracker.resetTo(storeTracker);
List<ProcedureWALFile> logsToBeDeleted = new ArrayList<>();
for (int i = logs.size() - 2; i >= 0; i--) {
ProcedureWALFile log = logs.get(i);
// If nothing was subtracted, delete the log file since it doesn't contain any useful proc
// states.
if (!runningTracker.subtract(log.getTracker())) {
logsToBeDeleted.add(log);
}
// We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
// once there is nothing olding the oldest WAL we can remove it.
while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
removeLogFile(logs.getFirst());
buildHoldingCleanupTracker();
}
// Delete the logs from oldest to newest and stop at first log that can't be deleted to avoid
// holes in the log file sequence (for better debug capability).
while (true) {
ProcedureWALFile log = logs.getFirst();
if (logsToBeDeleted.contains(log)) {
removeLogFile(log);
} else {
break;
}
// TODO: In case we are holding up a lot of logs for long time we should
// rewrite old procedures (in theory parent procs) to the new WAL.
}
private void buildHoldingCleanupTracker() {
if (logs.size() <= 1) {
// we only have one wal, so nothing to do
holdingCleanupTracker.reset();
return;
}
// compute the holding tracker.
// - the first WAL is used for the 'updates'
// - the other WALs are scanned to remove procs already in other wals.
// TODO: exit early if holdingCleanupTracker.isEmpty()
holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
holdingCleanupTracker.setDeletedIfSet(storeTracker);
for (int i = 1, size = logs.size() - 1; i < size; ++i) {
holdingCleanupTracker.setDeletedIfSet(logs.get(i).getTracker());
}
}
@ -1020,12 +1067,19 @@ public class WALProcedureStore extends ProcedureStoreBase {
if (LOG.isDebugEnabled()) {
LOG.debug("Remove all state logs with ID less than " + lastLogId);
}
boolean removed = false;
while (logs.size() > 1) {
ProcedureWALFile log = logs.getFirst();
if (lastLogId < log.getLogId()) {
break;
}
removeLogFile(log);
removed = true;
}
if (removed) {
buildHoldingCleanupTracker();
}
}

View File

@ -105,32 +105,6 @@ public class TestProcedureStoreTracker {
assertTrue(tracker.isEmpty());
}
@Test
public void testIsTracking() {
long[][] procIds = new long[][] {{4, 7}, {1024, 1027}, {8192, 8194}};
long[][] checkIds = new long[][] {{2, 8}, {1023, 1025}, {8193, 8191}};
ProcedureStoreTracker tracker = new ProcedureStoreTracker();
for (int i = 0; i < procIds.length; ++i) {
long[] seq = procIds[i];
tracker.insert(seq[0]);
tracker.insert(seq[1]);
}
for (int i = 0; i < procIds.length; ++i) {
long[] check = checkIds[i];
long[] seq = procIds[i];
assertTrue(tracker.isTracking(seq[0], seq[1]));
assertTrue(tracker.isTracking(check[0], check[1]));
tracker.delete(seq[0]);
tracker.delete(seq[1]);
assertFalse(tracker.isTracking(seq[0], seq[1]));
assertFalse(tracker.isTracking(check[0], check[1]));
}
assertTrue(tracker.isEmpty());
}
@Test
public void testBasicCRUD() {
ProcedureStoreTracker tracker = new ProcedureStoreTracker();
@ -287,64 +261,31 @@ public class TestProcedureStoreTracker {
}
@Test
public void testBitSetNodeSubtract() {
// 1 not updated in n2, nothing to subtract
BitSetNode n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ });
BitSetNode n2 = buildBitSetNode(new long[]{ 1L }, new long[]{}, new long[]{});
assertFalse(n1.subtract(n2));
public void testSetDeletedIfSet() {
final ProcedureStoreTracker tracker = new ProcedureStoreTracker();
final long[] procIds = new long[] { 1, 3, 7, 152, 512, 1024, 1025 };
// 1 updated in n2, and not deleted in n1, should subtract.
n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{});
n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{});
assertTrue(n1.subtract(n2));
// test single proc
for (int i = 0; i < procIds.length; ++i) {
tracker.insert(procIds[i]);
}
assertEquals(false, tracker.isEmpty());
// 1 updated in n2, but deleted in n1, should not subtract
n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ 1L });
n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{});
assertFalse(n1.subtract(n2));
for (int i = 0; i < procIds.length; ++i) {
tracker.setDeletedIfSet(procIds[i] - 1);
tracker.setDeletedIfSet(procIds[i]);
tracker.setDeletedIfSet(procIds[i] + 1);
}
assertEquals(true, tracker.isEmpty());
// 1 updated in n2, but not deleted in n1, should subtract.
n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{});
n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ 1L });
assertTrue(n1.subtract(n2));
// test batch
tracker.reset();
for (int i = 0; i < procIds.length; ++i) {
tracker.insert(procIds[i]);
}
assertEquals(false, tracker.isEmpty());
// all four cases together.
n1 = buildBitSetNode(new long[]{ 0L, 10L, 20L, 30L }, new long[]{ 0L, 10L, 20L, 30L },
new long[]{ 20L });
n2 = buildBitSetNode(new long[]{ 0L, 10L, 20L, 30L }, new long[]{ 0L, 20L, 30L },
new long[]{ 0L });
assertTrue(n1.subtract(n2));
}
@Test
// The structure is same as testBitSetNodeSubtract() but the ids are bigger so that internally
// there are many BitSetNodes.
public void testTrackerSubtract() {
// not updated in n2, nothing to subtract
ProcedureStoreTracker n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L },
new long[]{ });
ProcedureStoreTracker n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{}, new long[]{});
assertFalse(n1.subtract(n2));
// updated in n2, and not deleted in n1, should subtract.
n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{});
n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{});
assertTrue(n1.subtract(n2));
// updated in n2, but also deleted in n1, should not subtract
n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L });
n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L }, new long[]{});
assertFalse(n1.subtract(n2));
// updated in n2, but not deleted in n1, should subtract.
n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{});
n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L }, new long[]{ 1L, 1000L });
assertFalse(n1.subtract(n2));
n1 = buildTracker(new long[]{ 0L, 100L, 200L, 300L }, new long[]{ 0L, 100L, 200L, 300L },
new long[]{ 200L });
n2 = buildTracker(new long[]{ 0L, 100L, 200L, 300L }, new long[]{ 0L, 200L, 300L },
new long[]{ 0L });
assertTrue(n1.subtract(n2));
tracker.setDeletedIfSet(procIds);
assertEquals(true, tracker.isEmpty());
}
}

View File

@ -31,6 +31,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@ -72,6 +73,10 @@ public class TestWALProcedureStore {
private Path testDir;
private Path logDir;
private void setupConfig(final Configuration conf) {
conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true);
}
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
@ -79,6 +84,7 @@ public class TestWALProcedureStore {
fs = testDir.getFileSystem(htu.getConfiguration());
assertTrue(testDir.depth() > 1);
setupConfig(htu.getConfiguration());
logDir = new Path(testDir, "proc-logs");
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
procStore.start(PROCEDURE_STORE_SLOTS);
@ -101,6 +107,19 @@ public class TestWALProcedureStore {
for (int i = 0; i < 10; ++i) {
procStore.periodicRollForTesting();
}
assertEquals(1, procStore.getActiveLogs().size());
FileStatus[] status = fs.listStatus(logDir);
assertEquals(1, status.length);
}
@Test
public void testRestartWithoutData() throws Exception {
for (int i = 0; i < 10; ++i) {
final LoadCounter loader = new LoadCounter();
storeRestart(loader);
}
LOG.info("ACTIVE WALs " + procStore.getActiveLogs());
assertEquals(1, procStore.getActiveLogs().size());
FileStatus[] status = fs.listStatus(logDir);
assertEquals(1, status.length);
}
@ -126,13 +145,13 @@ public class TestWALProcedureStore {
@Test
public void testWalCleanerSequentialClean() throws Exception {
int NUM = 5;
List<Procedure> procs = new ArrayList<>();
final Procedure[] procs = new Procedure[5];
ArrayList<ProcedureWALFile> logs = null;
// Insert procedures and roll wal after every insert.
for (int i = 0; i < NUM; i++) {
procs.add(new TestSequentialProcedure());
procStore.insert(procs.get(i), null);
for (int i = 0; i < procs.length; i++) {
procs[i] = new TestSequentialProcedure();
procStore.insert(procs[i], null);
procStore.rollWriterForTesting();
logs = procStore.getActiveLogs();
assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal.
@ -140,12 +159,13 @@ public class TestWALProcedureStore {
// Delete procedures in sequential order make sure that only the corresponding wal is deleted
// from logs list.
int[] deleteOrder = new int[]{ 0, 1, 2, 3, 4};
final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 };
for (int i = 0; i < deleteOrder.length; i++) {
procStore.delete(procs.get(deleteOrder[i]).getProcId());
procStore.delete(procs[deleteOrder[i]].getProcId());
procStore.removeInactiveLogsForTesting();
assertFalse(procStore.getActiveLogs().contains(logs.get(deleteOrder[i])));
assertEquals(procStore.getActiveLogs().size(), NUM - i );
assertFalse(logs.get(deleteOrder[i]).toString(),
procStore.getActiveLogs().contains(logs.get(deleteOrder[i])));
assertEquals(procStore.getActiveLogs().size(), procs.length - i);
}
}
@ -154,30 +174,29 @@ public class TestWALProcedureStore {
// they are in the starting of the list.
@Test
public void testWalCleanerNoHoles() throws Exception {
int NUM = 5;
List<Procedure> procs = new ArrayList<>();
final Procedure[] procs = new Procedure[5];
ArrayList<ProcedureWALFile> logs = null;
// Insert procedures and roll wal after every insert.
for (int i = 0; i < NUM; i++) {
procs.add(new TestSequentialProcedure());
procStore.insert(procs.get(i), null);
for (int i = 0; i < procs.length; i++) {
procs[i] = new TestSequentialProcedure();
procStore.insert(procs[i], null);
procStore.rollWriterForTesting();
logs = procStore.getActiveLogs();
assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal.
assertEquals(i + 2, logs.size()); // Extra 1 for current ongoing wal.
}
for (int i = 1; i < NUM; i++) {
procStore.delete(procs.get(i).getProcId());
for (int i = 1; i < procs.length; i++) {
procStore.delete(procs[i].getProcId());
}
assertEquals(procStore.getActiveLogs().size(), NUM + 1);
procStore.delete(procs.get(0).getProcId());
assertEquals(procStore.getActiveLogs().size(), 1);
assertEquals(procs.length + 1, procStore.getActiveLogs().size());
procStore.delete(procs[0].getProcId());
assertEquals(1, procStore.getActiveLogs().size());
}
@Test
public void testWalCleanerUpdates() throws Exception {
TestSequentialProcedure p1 = new TestSequentialProcedure(),
p2 = new TestSequentialProcedure();
TestSequentialProcedure p1 = new TestSequentialProcedure();
TestSequentialProcedure p2 = new TestSequentialProcedure();
procStore.insert(p1, null);
procStore.insert(p2, null);
procStore.rollWriterForTesting();
@ -192,8 +211,8 @@ public class TestWALProcedureStore {
@Test
public void testWalCleanerUpdatesDontLeaveHoles() throws Exception {
TestSequentialProcedure p1 = new TestSequentialProcedure(),
p2 = new TestSequentialProcedure();
TestSequentialProcedure p1 = new TestSequentialProcedure();
TestSequentialProcedure p2 = new TestSequentialProcedure();
procStore.insert(p1, null);
procStore.insert(p2, null);
procStore.rollWriterForTesting(); // generates first log with p1 + p2
@ -213,6 +232,36 @@ public class TestWALProcedureStore {
assertFalse(procStore.getActiveLogs().contains(log2));
}
@Test
public void testWalCleanerWithEmptyRolls() throws Exception {
final Procedure[] procs = new Procedure[3];
for (int i = 0; i < procs.length; ++i) {
procs[i] = new TestSequentialProcedure();
procStore.insert(procs[i], null);
}
assertEquals(1, procStore.getActiveLogs().size());
procStore.rollWriterForTesting();
assertEquals(2, procStore.getActiveLogs().size());
procStore.rollWriterForTesting();
assertEquals(3, procStore.getActiveLogs().size());
for (int i = 0; i < procs.length; ++i) {
procStore.update(procs[i]);
procStore.rollWriterForTesting();
procStore.rollWriterForTesting();
if (i < (procs.length - 1)) {
assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size());
}
}
assertEquals(7, procStore.getActiveLogs().size());
for (int i = 0; i < procs.length; ++i) {
procStore.delete(procs[i].getProcId());
assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size());
}
assertEquals(1, procStore.getActiveLogs().size());
}
@Test
public void testEmptyLogLoad() throws Exception {
LoadCounter loader = new LoadCounter();
@ -294,6 +343,8 @@ public class TestWALProcedureStore {
}
// Test Load 1
// Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
LoadCounter loader = new LoadCounter();
storeRestart(loader);
assertEquals(1, loader.getLoadedCount());
@ -360,8 +411,8 @@ public class TestWALProcedureStore {
assertEquals(0, loader.getCorruptedCount());
}
void assertUpdated(final ProcedureStoreTracker tracker, Procedure[] procs,
int[] updatedProcs, int[] nonUpdatedProcs) {
private static void assertUpdated(final ProcedureStoreTracker tracker,
final Procedure[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) {
for (int index : updatedProcs) {
long procId = procs[index].getProcId();
assertTrue("Procedure id : " + procId, tracker.isUpdated(procId));
@ -372,8 +423,8 @@ public class TestWALProcedureStore {
}
}
void assertDeleted(final ProcedureStoreTracker tracker, Procedure[] procs,
int[] deletedProcs, int[] nonDeletedProcs) {
private static void assertDeleted(final ProcedureStoreTracker tracker,
final Procedure[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) {
for (int index : deletedProcs) {
long procId = procs[index].getProcId();
assertEquals("Procedure id : " + procId,
@ -423,7 +474,8 @@ public class TestWALProcedureStore {
corruptLog(logs[i], 4);
}
// Restart the store
// Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
final LoadCounter loader = new LoadCounter();
storeRestart(loader);
assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5
@ -431,6 +483,7 @@ public class TestWALProcedureStore {
// Check the Trackers
final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs();
LOG.info("WALs " + walFiles);
assertEquals(4, walFiles.size());
LOG.info("Checking wal " + walFiles.get(0));
assertUpdated(walFiles.get(0).getTracker(), procs, new int[]{0, 1, 2, 3}, new int[] {4, 5});
@ -660,7 +713,7 @@ public class TestWALProcedureStore {
@Test
public void testFileNotFoundDuringLeaseRecovery() throws IOException {
TestProcedure[] procs = new TestProcedure[3];
final TestProcedure[] procs = new TestProcedure[3];
for (int i = 0; i < procs.length; ++i) {
procs[i] = new TestProcedure(i + 1, 0);
procStore.insert(procs[i], null);
@ -673,7 +726,7 @@ public class TestWALProcedureStore {
procStore.stop(false);
FileStatus[] status = fs.listStatus(logDir);
assertEquals(procs.length + 2, status.length);
assertEquals(procs.length + 1, status.length);
// simulate another active master removing the wals
procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir,
@ -696,7 +749,7 @@ public class TestWALProcedureStore {
procStore.recoverLease();
procStore.load(loader);
assertEquals(procs.length, loader.getMaxProcId());
assertEquals(procs.length - 1, loader.getRunnableCount());
assertEquals(1, loader.getRunnableCount());
assertEquals(0, loader.getCompletedCount());
assertEquals(0, loader.getCorruptedCount());
}