diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 7ba72f2554a..08997674117 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -156,11 +156,18 @@ public class ProcedureStoreTracker { partial = false; } - public BitSetNode(BitSetNode other) { + public BitSetNode(final BitSetNode other, final boolean resetDelete) { this.start = other.start; this.partial = other.partial; this.updated = other.updated.clone(); - this.deleted = other.deleted.clone(); + if (resetDelete) { + this.deleted = new long[other.deleted.length]; + for (int i = 0; i < this.deleted.length; ++i) { + this.deleted[i] = ~(other.updated[i]); + } + } else { + this.deleted = other.deleted.clone(); + } } public void update(final long procId) { @@ -171,11 +178,11 @@ public class ProcedureStoreTracker { updateState(procId, true); } - public Long getStart() { + public long getStart() { return start; } - public Long getEnd() { + public long getEnd() { return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1; } @@ -249,33 +256,6 @@ public class ProcedureStoreTracker { } } - /** - * If an active (non-deleted) procedure in current BitSetNode has been updated in {@code other} - * BitSetNode, then delete it from current node. - * @return true if node changed, i.e. some procedure(s) from {@code other} was subtracted from - * current node. - */ - public boolean subtract(BitSetNode other) { - // Assert that other node intersects with this node. - assert !(other.getEnd() < this.start) && !(this.getEnd() < other.start); - int thisOffset = 0, otherOffset = 0; - if (this.start < other.start) { - thisOffset = (int) (other.start - this.start) / BITS_PER_WORD; - } else { - otherOffset = (int) (this.start - other.start) / BITS_PER_WORD; - } - int size = Math.min(this.updated.length - thisOffset, other.updated.length - otherOffset); - boolean nonZeroIntersect = false; - for (int i = 0; i < size; i++) { - long intersect = ~this.deleted[thisOffset + i] & other.updated[otherOffset + i]; - if (intersect != 0) { - this.deleted[thisOffset + i] |= intersect; - nonZeroIntersect = true; - } - } - return nonZeroIntersect; - } - /** * Convert to * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode @@ -292,7 +272,6 @@ public class ProcedureStoreTracker { return builder.build(); } - // ======================================================================== // Grow/Merge Helpers // ======================================================================== @@ -461,20 +440,22 @@ public class ProcedureStoreTracker { /** * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap. */ - public void resetTo(ProcedureStoreTracker tracker) { + public void resetTo(final ProcedureStoreTracker tracker) { + resetTo(tracker, false); + } + + public void resetTo(final ProcedureStoreTracker tracker, final boolean resetDelete) { this.partial = tracker.partial; this.minUpdatedProcId = tracker.minUpdatedProcId; this.maxUpdatedProcId = tracker.maxUpdatedProcId; this.keepDeletes = tracker.keepDeletes; for (Map.Entry entry : tracker.map.entrySet()) { - map.put(entry.getKey(), new BitSetNode(entry.getValue())); + map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete)); } } public void insert(long procId) { - BitSetNode node = getOrCreateNode(procId); - node.update(procId); - trackProcIds(procId); + insert(null, procId); } public void insert(final long[] procIds) { @@ -484,46 +465,108 @@ public class ProcedureStoreTracker { } public void insert(final long procId, final long[] subProcIds) { - update(procId); + BitSetNode node = null; + node = update(node, procId); for (int i = 0; i < subProcIds.length; ++i) { - insert(subProcIds[i]); + node = insert(node, subProcIds[i]); } } + private BitSetNode insert(BitSetNode node, final long procId) { + if (node == null || !node.contains(procId)) { + node = getOrCreateNode(procId); + } + node.update(procId); + trackProcIds(procId); + return node; + } + public void update(long procId) { - Map.Entry entry = map.floorEntry(procId); - assert entry != null : "expected node to update procId=" + procId; + update(null, procId); + } - BitSetNode node = entry.getValue(); - assert node.contains(procId); + private BitSetNode update(BitSetNode node, final long procId) { + node = lookupClosestNode(node, procId); + assert node != null : "expected node to update procId=" + procId; + assert node.contains(procId) : "expected procId=" + procId + " in the node"; node.update(procId); trackProcIds(procId); + return node; } public void delete(long procId) { - Map.Entry entry = map.floorEntry(procId); - assert entry != null : "expected node to delete procId=" + procId; + delete(null, procId); + } - BitSetNode node = entry.getValue(); - assert node.contains(procId) : "expected procId in the node"; + public void delete(final long[] procIds) { + Arrays.sort(procIds); + BitSetNode node = null; + for (int i = 0; i < procIds.length; ++i) { + node = delete(node, procIds[i]); + } + } + + private BitSetNode delete(BitSetNode node, final long procId) { + node = lookupClosestNode(node, procId); + assert node != null : "expected node to delete procId=" + procId; + assert node.contains(procId) : "expected procId=" + procId + " in the node"; node.delete(procId); - if (!keepDeletes && node.isEmpty()) { // TODO: RESET if (map.size() == 1) - map.remove(entry.getKey()); + map.remove(node.getStart()); } trackProcIds(procId); + return node; } - public void delete(long[] procIds) { - // TODO: optimize - Arrays.sort(procIds); - for (int i = 0; i < procIds.length; ++i) { - delete(procIds[i]); + @InterfaceAudience.Private + public void setDeleted(final long procId, final boolean isDeleted) { + BitSetNode node = getOrCreateNode(procId); + assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node; + node.updateState(procId, isDeleted); + trackProcIds(procId); + } + + public void setDeletedIfSet(final long... procId) { + BitSetNode node = null; + for (int i = 0; i < procId.length; ++i) { + node = lookupClosestNode(node, procId[i]); + if (node != null && node.isUpdated(procId[i])) { + node.delete(procId[i]); + } } } + public void setDeletedIfSet(final ProcedureStoreTracker tracker) { + BitSetNode trackerNode = null; + for (BitSetNode node: map.values()) { + final long minProcId = node.getStart(); + final long maxProcId = node.getEnd(); + for (long procId = minProcId; procId <= maxProcId; ++procId) { + if (!node.isUpdated(procId)) continue; + + trackerNode = tracker.lookupClosestNode(trackerNode, procId); + if (trackerNode == null || !trackerNode.contains(procId) || trackerNode.isUpdated(procId)) { + // the procedure was removed or updated + node.delete(procId); + } + } + } + } + + /** + * lookup the node containing the specified procId. + * @param node cached node to check before doing a lookup + * @param procId the procId to lookup + * @return the node that may contains the procId or null + */ + private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) { + if (node != null && node.contains(procId)) return node; + final Map.Entry entry = map.floorEntry(procId); + return entry != null ? entry.getValue() : null; + } + private void trackProcIds(long procId) { minUpdatedProcId = Math.min(minUpdatedProcId, procId); maxUpdatedProcId = Math.max(maxUpdatedProcId, procId); @@ -537,14 +580,6 @@ public class ProcedureStoreTracker { return maxUpdatedProcId; } - @InterfaceAudience.Private - public void setDeleted(final long procId, final boolean isDeleted) { - BitSetNode node = getOrCreateNode(procId); - assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node; - node.updateState(procId, isDeleted); - trackProcIds(procId); - } - public void reset() { this.keepDeletes = false; this.partial = false; @@ -632,11 +667,6 @@ public class ProcedureStoreTracker { return true; } - public boolean isTracking(long minId, long maxId) { - // TODO: we can make it more precise, instead of looking just at the block - return map.floorEntry(minId) != null || map.floorEntry(maxId) != null; - } - /** * Clears the list of updated procedure ids. This doesn't affect global list of active * procedure ids. @@ -737,37 +767,6 @@ public class ProcedureStoreTracker { } } - /** - * Iterates over - * {@link BitSetNode}s in this.map and subtracts with corresponding ones from {@code other} - * tracker. - * @return true if tracker changed, i.e. some procedure from {@code other} were subtracted from - * current tracker. - */ - public boolean subtract(ProcedureStoreTracker other) { - // Can not intersect partial bitmap. - assert !partial && !other.partial; - boolean nonZeroIntersect = false; - for (Map.Entry currentEntry : map.entrySet()) { - BitSetNode currentBitSetNode = currentEntry.getValue(); - Map.Entry otherTrackerEntry = other.map.floorEntry(currentEntry.getKey()); - if (otherTrackerEntry == null // No node in other map with key <= currentEntry.getKey(). - // First entry in other map doesn't intersect with currentEntry. - || otherTrackerEntry.getValue().getEnd() < currentEntry.getKey()) { - otherTrackerEntry = other.map.ceilingEntry(currentEntry.getKey()); - if (otherTrackerEntry == null || !currentBitSetNode.contains(otherTrackerEntry.getKey())) { - // No node in other map intersects with currentBitSetNode's range. - continue; - } - } - do { - nonZeroIntersect |= currentEntry.getValue().subtract(otherTrackerEntry.getValue()); - otherTrackerEntry = other.map.higherEntry(otherTrackerEntry.getKey()); - } while (otherTrackerEntry != null && currentBitSetNode.contains(otherTrackerEntry.getKey())); - } - return nonZeroIntersect; - } - // ======================================================================== // Convert to/from Protocol Buffer. // ======================================================================== diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index e5c8fcaa2bb..aeae5698e2c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -101,8 +101,18 @@ public class ProcedureWALFormatReader { private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024); private final WalProcedureMap procedureMap = new WalProcedureMap(1024); - // private long compactionLogId; - private long maxProcId = 0; + private final ProcedureWALFormat.Loader loader; + + /** + * Global tracker that will be used by the WALProcedureStore after load. + * If the last WAL was closed cleanly we already have a full tracker ready to be used. + * If the last WAL was truncated (e.g. master killed) the tracker will be empty + * and the 'partial' flag will be set. In this case on WAL replay we are going + * to rebuild the tracker. + */ + private final ProcedureStoreTracker tracker; + // private final boolean hasFastStartSupport; + /** * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we * re-build the list of procedures updated in that WAL because we need it for log cleaning @@ -113,13 +123,9 @@ public class ProcedureWALFormatReader { * {@link ProcedureStoreTracker.BitSetNode#subtract(ProcedureStoreTracker.BitSetNode)}). */ private ProcedureStoreTracker localTracker; - private final ProcedureWALFormat.Loader loader; - /** - * Global tracker. If set to partial, it will be updated as procedures are loaded from wals, - * otherwise not. - */ - private final ProcedureStoreTracker tracker; - // private final boolean hasFastStartSupport; + + // private long compactionLogId; + private long maxProcId = 0; public ProcedureWALFormatReader(final ProcedureStoreTracker tracker, ProcedureWALFormat.Loader loader) { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 3884e398bad..922b681efde 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -72,6 +72,14 @@ public class WALProcedureStore extends ProcedureStoreBase { void recoverFileLease(FileSystem fs, Path path) throws IOException; } + public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY = + "hbase.procedure.store.wal.warn.threshold"; + private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 64; + + public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = + "hbase.procedure.store.wal.exec.cleanup.on.load"; + private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true; + public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY = "hbase.procedure.store.wal.max.retries.before.roll"; private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3; @@ -106,6 +114,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private static final int DEFAULT_SYNC_STATS_COUNT = 10; private final LinkedList logs = new LinkedList<>(); + private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker(); private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); private final ReentrantLock lock = new ReentrantLock(); private final Condition waitCond = lock.newCondition(); @@ -132,6 +141,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private Thread syncThread; private ByteSlot[] slots; + private int walCountWarnThreshold; private int maxRetriesBeforeRoll; private int maxSyncFailureRoll; private int waitBeforeRoll; @@ -195,6 +205,8 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Tunings + walCountWarnThreshold = + conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD); maxRetriesBeforeRoll = conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL); maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL); @@ -257,6 +269,7 @@ public class WALProcedureStore extends ProcedureStoreBase { log.close(); } logs.clear(); + loading.set(true); } private void sendStopSignal() { @@ -335,24 +348,25 @@ public class WALProcedureStore extends ProcedureStoreBase { @Override public void load(final ProcedureLoader loader) throws IOException { - if (logs.isEmpty()) { - throw new RuntimeException("recoverLease() must be called before loading data"); - } - - // Nothing to do, If we have only the current log. - if (logs.size() == 1) { - if (LOG.isDebugEnabled()) { - LOG.debug("No state logs to replay."); - } - loader.setMaxProcId(0); - loading.set(false); - return; - } - - // Load the old logs - Iterator it = logs.descendingIterator(); - it.next(); // Skip the current log + lock.lock(); try { + if (logs.isEmpty()) { + throw new RuntimeException("recoverLease() must be called before loading data"); + } + + // Nothing to do, If we have only the current log. + if (logs.size() == 1) { + if (LOG.isDebugEnabled()) { + LOG.debug("No state logs to replay."); + } + loader.setMaxProcId(0); + return; + } + + // Load the old logs + final Iterator it = logs.descendingIterator(); + it.next(); // Skip the current log + ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { @Override public void setMaxProcId(long maxProcId) { @@ -379,7 +393,32 @@ public class WALProcedureStore extends ProcedureStoreBase { } }); } finally { - loading.set(false); + try { + // try to cleanup inactive wals and complete the operation + buildHoldingCleanupTracker(); + tryCleanupLogsOnLoad(); + loading.set(false); + } finally { + lock.unlock(); + } + } + } + + private void tryCleanupLogsOnLoad() { + // nothing to cleanup. + if (logs.size() <= 1) return; + + // the config says to not cleanup wals on load. + if (!conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, + DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY)) { + LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs()); + return; + } + + try { + periodicRoll(); + } catch (IOException e) { + LOG.warn("unable to cleanup logs on load: " + e.getMessage(), e); } } @@ -634,16 +673,20 @@ public class WALProcedureStore extends ProcedureStoreBase { storeTracker.insert(subProcIds); } else { storeTracker.insert(procId, subProcIds); + holdingCleanupTracker.setDeletedIfSet(procId); } break; case UPDATE: storeTracker.update(procId); + holdingCleanupTracker.setDeletedIfSet(procId); break; case DELETE: if (subProcIds != null && subProcIds.length > 0) { storeTracker.delete(subProcIds); + holdingCleanupTracker.setDeletedIfSet(subProcIds); } else { storeTracker.delete(procId); + holdingCleanupTracker.setDeletedIfSet(procId); } break; default: @@ -948,6 +991,15 @@ public class WALProcedureStore extends ProcedureStoreBase { lastRollTs.set(rollTs); logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs)); + // if it's the first next WAL being added, build the holding cleanup tracker + if (logs.size() == 2) { + buildHoldingCleanupTracker(); + } else if (logs.size() > walCountWarnThreshold) { + LOG.warn("procedure WALs count=" + logs.size() + + " above the warning threshold " + walCountWarnThreshold + + ". check running procedures to see if something is stuck."); + } + if (LOG.isDebugEnabled()) { LOG.debug("Roll new state log: " + logId); } @@ -976,38 +1028,33 @@ public class WALProcedureStore extends ProcedureStoreBase { // ========================================================================== // Log Files cleaner helpers // ========================================================================== - - /** - * Iterates over log files from latest (ignoring currently active one) to oldest, deleting the - * ones which don't contain anything useful for recovery. - * @throws IOException - */ private void removeInactiveLogs() throws IOException { - // TODO: can we somehow avoid first iteration (starting from newest log) and still figure out - // efficient way to cleanup old logs. - // Alternatively, a complex and maybe more efficient method would be using this iteration to - // rewrite latest states of active procedures to a new log file and delete all old ones. - if (logs.size() <= 1) return; - ProcedureStoreTracker runningTracker = new ProcedureStoreTracker(); - runningTracker.resetTo(storeTracker); - List logsToBeDeleted = new ArrayList<>(); - for (int i = logs.size() - 2; i >= 0; i--) { - ProcedureWALFile log = logs.get(i); - // If nothing was subtracted, delete the log file since it doesn't contain any useful proc - // states. - if (!runningTracker.subtract(log.getTracker())) { - logsToBeDeleted.add(log); - } + // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'. + // once there is nothing olding the oldest WAL we can remove it. + while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) { + removeLogFile(logs.getFirst()); + buildHoldingCleanupTracker(); } - // Delete the logs from oldest to newest and stop at first log that can't be deleted to avoid - // holes in the log file sequence (for better debug capability). - while (true) { - ProcedureWALFile log = logs.getFirst(); - if (logsToBeDeleted.contains(log)) { - removeLogFile(log); - } else { - break; - } + + // TODO: In case we are holding up a lot of logs for long time we should + // rewrite old procedures (in theory parent procs) to the new WAL. + } + + private void buildHoldingCleanupTracker() { + if (logs.size() <= 1) { + // we only have one wal, so nothing to do + holdingCleanupTracker.reset(); + return; + } + + // compute the holding tracker. + // - the first WAL is used for the 'updates' + // - the other WALs are scanned to remove procs already in other wals. + // TODO: exit early if holdingCleanupTracker.isEmpty() + holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true); + holdingCleanupTracker.setDeletedIfSet(storeTracker); + for (int i = 1, size = logs.size() - 1; i < size; ++i) { + holdingCleanupTracker.setDeletedIfSet(logs.get(i).getTracker()); } } @@ -1020,12 +1067,19 @@ public class WALProcedureStore extends ProcedureStoreBase { if (LOG.isDebugEnabled()) { LOG.debug("Remove all state logs with ID less than " + lastLogId); } + + boolean removed = false; while (logs.size() > 1) { ProcedureWALFile log = logs.getFirst(); if (lastLogId < log.getLogId()) { break; } removeLogFile(log); + removed = true; + } + + if (removed) { + buildHoldingCleanupTracker(); } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index 76fd2c58ee1..550116e2e16 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -105,32 +105,6 @@ public class TestProcedureStoreTracker { assertTrue(tracker.isEmpty()); } - @Test - public void testIsTracking() { - long[][] procIds = new long[][] {{4, 7}, {1024, 1027}, {8192, 8194}}; - long[][] checkIds = new long[][] {{2, 8}, {1023, 1025}, {8193, 8191}}; - - ProcedureStoreTracker tracker = new ProcedureStoreTracker(); - for (int i = 0; i < procIds.length; ++i) { - long[] seq = procIds[i]; - tracker.insert(seq[0]); - tracker.insert(seq[1]); - } - - for (int i = 0; i < procIds.length; ++i) { - long[] check = checkIds[i]; - long[] seq = procIds[i]; - assertTrue(tracker.isTracking(seq[0], seq[1])); - assertTrue(tracker.isTracking(check[0], check[1])); - tracker.delete(seq[0]); - tracker.delete(seq[1]); - assertFalse(tracker.isTracking(seq[0], seq[1])); - assertFalse(tracker.isTracking(check[0], check[1])); - } - - assertTrue(tracker.isEmpty()); - } - @Test public void testBasicCRUD() { ProcedureStoreTracker tracker = new ProcedureStoreTracker(); @@ -287,64 +261,31 @@ public class TestProcedureStoreTracker { } @Test - public void testBitSetNodeSubtract() { - // 1 not updated in n2, nothing to subtract - BitSetNode n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ }); - BitSetNode n2 = buildBitSetNode(new long[]{ 1L }, new long[]{}, new long[]{}); - assertFalse(n1.subtract(n2)); + public void testSetDeletedIfSet() { + final ProcedureStoreTracker tracker = new ProcedureStoreTracker(); + final long[] procIds = new long[] { 1, 3, 7, 152, 512, 1024, 1025 }; - // 1 updated in n2, and not deleted in n1, should subtract. - n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{}); - n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{}); - assertTrue(n1.subtract(n2)); + // test single proc + for (int i = 0; i < procIds.length; ++i) { + tracker.insert(procIds[i]); + } + assertEquals(false, tracker.isEmpty()); - // 1 updated in n2, but deleted in n1, should not subtract - n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ 1L }); - n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{}); - assertFalse(n1.subtract(n2)); + for (int i = 0; i < procIds.length; ++i) { + tracker.setDeletedIfSet(procIds[i] - 1); + tracker.setDeletedIfSet(procIds[i]); + tracker.setDeletedIfSet(procIds[i] + 1); + } + assertEquals(true, tracker.isEmpty()); - // 1 updated in n2, but not deleted in n1, should subtract. - n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{}); - n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ 1L }); - assertTrue(n1.subtract(n2)); + // test batch + tracker.reset(); + for (int i = 0; i < procIds.length; ++i) { + tracker.insert(procIds[i]); + } + assertEquals(false, tracker.isEmpty()); - // all four cases together. - n1 = buildBitSetNode(new long[]{ 0L, 10L, 20L, 30L }, new long[]{ 0L, 10L, 20L, 30L }, - new long[]{ 20L }); - n2 = buildBitSetNode(new long[]{ 0L, 10L, 20L, 30L }, new long[]{ 0L, 20L, 30L }, - new long[]{ 0L }); - assertTrue(n1.subtract(n2)); - } - - @Test - // The structure is same as testBitSetNodeSubtract() but the ids are bigger so that internally - // there are many BitSetNodes. - public void testTrackerSubtract() { - // not updated in n2, nothing to subtract - ProcedureStoreTracker n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, - new long[]{ }); - ProcedureStoreTracker n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{}, new long[]{}); - assertFalse(n1.subtract(n2)); - - // updated in n2, and not deleted in n1, should subtract. - n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{}); - n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{}); - assertTrue(n1.subtract(n2)); - - // updated in n2, but also deleted in n1, should not subtract - n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }); - n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L }, new long[]{}); - assertFalse(n1.subtract(n2)); - - // updated in n2, but not deleted in n1, should subtract. - n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{}); - n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L }, new long[]{ 1L, 1000L }); - assertFalse(n1.subtract(n2)); - - n1 = buildTracker(new long[]{ 0L, 100L, 200L, 300L }, new long[]{ 0L, 100L, 200L, 300L }, - new long[]{ 200L }); - n2 = buildTracker(new long[]{ 0L, 100L, 200L, 300L }, new long[]{ 0L, 200L, 300L }, - new long[]{ 0L }); - assertTrue(n1.subtract(n2)); + tracker.setDeletedIfSet(procIds); + assertEquals(true, tracker.isEmpty()); } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 83f481ce839..f8c34869b31 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -72,6 +73,10 @@ public class TestWALProcedureStore { private Path testDir; private Path logDir; + private void setupConfig(final Configuration conf) { + conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true); + } + @Before public void setUp() throws IOException { htu = new HBaseCommonTestingUtility(); @@ -79,6 +84,7 @@ public class TestWALProcedureStore { fs = testDir.getFileSystem(htu.getConfiguration()); assertTrue(testDir.depth() > 1); + setupConfig(htu.getConfiguration()); logDir = new Path(testDir, "proc-logs"); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); procStore.start(PROCEDURE_STORE_SLOTS); @@ -101,6 +107,19 @@ public class TestWALProcedureStore { for (int i = 0; i < 10; ++i) { procStore.periodicRollForTesting(); } + assertEquals(1, procStore.getActiveLogs().size()); + FileStatus[] status = fs.listStatus(logDir); + assertEquals(1, status.length); + } + + @Test + public void testRestartWithoutData() throws Exception { + for (int i = 0; i < 10; ++i) { + final LoadCounter loader = new LoadCounter(); + storeRestart(loader); + } + LOG.info("ACTIVE WALs " + procStore.getActiveLogs()); + assertEquals(1, procStore.getActiveLogs().size()); FileStatus[] status = fs.listStatus(logDir); assertEquals(1, status.length); } @@ -126,13 +145,13 @@ public class TestWALProcedureStore { @Test public void testWalCleanerSequentialClean() throws Exception { - int NUM = 5; - List procs = new ArrayList<>(); + final Procedure[] procs = new Procedure[5]; ArrayList logs = null; + // Insert procedures and roll wal after every insert. - for (int i = 0; i < NUM; i++) { - procs.add(new TestSequentialProcedure()); - procStore.insert(procs.get(i), null); + for (int i = 0; i < procs.length; i++) { + procs[i] = new TestSequentialProcedure(); + procStore.insert(procs[i], null); procStore.rollWriterForTesting(); logs = procStore.getActiveLogs(); assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal. @@ -140,12 +159,13 @@ public class TestWALProcedureStore { // Delete procedures in sequential order make sure that only the corresponding wal is deleted // from logs list. - int[] deleteOrder = new int[]{ 0, 1, 2, 3, 4}; + final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 }; for (int i = 0; i < deleteOrder.length; i++) { - procStore.delete(procs.get(deleteOrder[i]).getProcId()); + procStore.delete(procs[deleteOrder[i]].getProcId()); procStore.removeInactiveLogsForTesting(); - assertFalse(procStore.getActiveLogs().contains(logs.get(deleteOrder[i]))); - assertEquals(procStore.getActiveLogs().size(), NUM - i ); + assertFalse(logs.get(deleteOrder[i]).toString(), + procStore.getActiveLogs().contains(logs.get(deleteOrder[i]))); + assertEquals(procStore.getActiveLogs().size(), procs.length - i); } } @@ -154,30 +174,29 @@ public class TestWALProcedureStore { // they are in the starting of the list. @Test public void testWalCleanerNoHoles() throws Exception { - int NUM = 5; - List procs = new ArrayList<>(); + final Procedure[] procs = new Procedure[5]; ArrayList logs = null; // Insert procedures and roll wal after every insert. - for (int i = 0; i < NUM; i++) { - procs.add(new TestSequentialProcedure()); - procStore.insert(procs.get(i), null); + for (int i = 0; i < procs.length; i++) { + procs[i] = new TestSequentialProcedure(); + procStore.insert(procs[i], null); procStore.rollWriterForTesting(); logs = procStore.getActiveLogs(); - assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal. + assertEquals(i + 2, logs.size()); // Extra 1 for current ongoing wal. } - for (int i = 1; i < NUM; i++) { - procStore.delete(procs.get(i).getProcId()); + for (int i = 1; i < procs.length; i++) { + procStore.delete(procs[i].getProcId()); } - assertEquals(procStore.getActiveLogs().size(), NUM + 1); - procStore.delete(procs.get(0).getProcId()); - assertEquals(procStore.getActiveLogs().size(), 1); + assertEquals(procs.length + 1, procStore.getActiveLogs().size()); + procStore.delete(procs[0].getProcId()); + assertEquals(1, procStore.getActiveLogs().size()); } @Test public void testWalCleanerUpdates() throws Exception { - TestSequentialProcedure p1 = new TestSequentialProcedure(), - p2 = new TestSequentialProcedure(); + TestSequentialProcedure p1 = new TestSequentialProcedure(); + TestSequentialProcedure p2 = new TestSequentialProcedure(); procStore.insert(p1, null); procStore.insert(p2, null); procStore.rollWriterForTesting(); @@ -192,8 +211,8 @@ public class TestWALProcedureStore { @Test public void testWalCleanerUpdatesDontLeaveHoles() throws Exception { - TestSequentialProcedure p1 = new TestSequentialProcedure(), - p2 = new TestSequentialProcedure(); + TestSequentialProcedure p1 = new TestSequentialProcedure(); + TestSequentialProcedure p2 = new TestSequentialProcedure(); procStore.insert(p1, null); procStore.insert(p2, null); procStore.rollWriterForTesting(); // generates first log with p1 + p2 @@ -213,6 +232,36 @@ public class TestWALProcedureStore { assertFalse(procStore.getActiveLogs().contains(log2)); } + @Test + public void testWalCleanerWithEmptyRolls() throws Exception { + final Procedure[] procs = new Procedure[3]; + for (int i = 0; i < procs.length; ++i) { + procs[i] = new TestSequentialProcedure(); + procStore.insert(procs[i], null); + } + assertEquals(1, procStore.getActiveLogs().size()); + procStore.rollWriterForTesting(); + assertEquals(2, procStore.getActiveLogs().size()); + procStore.rollWriterForTesting(); + assertEquals(3, procStore.getActiveLogs().size()); + + for (int i = 0; i < procs.length; ++i) { + procStore.update(procs[i]); + procStore.rollWriterForTesting(); + procStore.rollWriterForTesting(); + if (i < (procs.length - 1)) { + assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size()); + } + } + assertEquals(7, procStore.getActiveLogs().size()); + + for (int i = 0; i < procs.length; ++i) { + procStore.delete(procs[i].getProcId()); + assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size()); + } + assertEquals(1, procStore.getActiveLogs().size()); + } + @Test public void testEmptyLogLoad() throws Exception { LoadCounter loader = new LoadCounter(); @@ -294,6 +343,8 @@ public class TestWALProcedureStore { } // Test Load 1 + // Restart the store (avoid cleaning up the files, to check the rebuilded trackers) + htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false); LoadCounter loader = new LoadCounter(); storeRestart(loader); assertEquals(1, loader.getLoadedCount()); @@ -360,8 +411,8 @@ public class TestWALProcedureStore { assertEquals(0, loader.getCorruptedCount()); } - void assertUpdated(final ProcedureStoreTracker tracker, Procedure[] procs, - int[] updatedProcs, int[] nonUpdatedProcs) { + private static void assertUpdated(final ProcedureStoreTracker tracker, + final Procedure[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) { for (int index : updatedProcs) { long procId = procs[index].getProcId(); assertTrue("Procedure id : " + procId, tracker.isUpdated(procId)); @@ -372,8 +423,8 @@ public class TestWALProcedureStore { } } - void assertDeleted(final ProcedureStoreTracker tracker, Procedure[] procs, - int[] deletedProcs, int[] nonDeletedProcs) { + private static void assertDeleted(final ProcedureStoreTracker tracker, + final Procedure[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) { for (int index : deletedProcs) { long procId = procs[index].getProcId(); assertEquals("Procedure id : " + procId, @@ -423,7 +474,8 @@ public class TestWALProcedureStore { corruptLog(logs[i], 4); } - // Restart the store + // Restart the store (avoid cleaning up the files, to check the rebuilded trackers) + htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false); final LoadCounter loader = new LoadCounter(); storeRestart(loader); assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5 @@ -431,6 +483,7 @@ public class TestWALProcedureStore { // Check the Trackers final ArrayList walFiles = procStore.getActiveLogs(); + LOG.info("WALs " + walFiles); assertEquals(4, walFiles.size()); LOG.info("Checking wal " + walFiles.get(0)); assertUpdated(walFiles.get(0).getTracker(), procs, new int[]{0, 1, 2, 3}, new int[] {4, 5}); @@ -660,7 +713,7 @@ public class TestWALProcedureStore { @Test public void testFileNotFoundDuringLeaseRecovery() throws IOException { - TestProcedure[] procs = new TestProcedure[3]; + final TestProcedure[] procs = new TestProcedure[3]; for (int i = 0; i < procs.length; ++i) { procs[i] = new TestProcedure(i + 1, 0); procStore.insert(procs[i], null); @@ -673,7 +726,7 @@ public class TestWALProcedureStore { procStore.stop(false); FileStatus[] status = fs.listStatus(logDir); - assertEquals(procs.length + 2, status.length); + assertEquals(procs.length + 1, status.length); // simulate another active master removing the wals procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir, @@ -696,7 +749,7 @@ public class TestWALProcedureStore { procStore.recoverLease(); procStore.load(loader); assertEquals(procs.length, loader.getMaxProcId()); - assertEquals(procs.length - 1, loader.getRunnableCount()); + assertEquals(1, loader.getRunnableCount()); assertEquals(0, loader.getCompletedCount()); assertEquals(0, loader.getCorruptedCount()); }