From 60d33ce34191533bb858852584bd9bddfeb16a23 Mon Sep 17 00:00:00 2001 From: Matteo Bertozzi Date: Tue, 15 Dec 2015 10:15:18 -0800 Subject: [PATCH] HBASE-14947 WALProcedureStore improvements --- .../store/ProcedureStoreTracker.java | 21 +- .../store/wal/WALProcedureStore.java | 294 +++++++++--------- .../procedure2/TestProcedureRecovery.java | 4 +- .../store/TestProcedureStoreTracker.java | 35 +-- .../store/wal/TestWALProcedureStore.java | 51 ++- 5 files changed, 203 insertions(+), 202 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 8516f612760..6823288a3cd 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -27,7 +27,6 @@ import java.util.TreeMap; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; /** @@ -356,25 +355,19 @@ public class ProcedureStoreTracker { } } - public void insert(final Procedure proc, final Procedure[] subprocs) { - insert(proc.getProcId()); - if (subprocs != null) { - for (int i = 0; i < subprocs.length; ++i) { - insert(subprocs[i].getProcId()); - } - } - } - - public void update(final Procedure proc) { - update(proc.getProcId()); - } - public void insert(long procId) { BitSetNode node = getOrCreateNode(procId); node.update(procId); trackProcIds(procId); } + public void insert(final long procId, final long[] subProcIds) { + update(procId); + for (int i = 0; i < subProcIds.length; ++i) { + insert(subProcIds[i]); + } + } + public void update(long procId) { Map.Entry entry = map.floorEntry(procId); assert entry != null : "expected node to update procId=" + procId; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index ec42d6a156c..20709a91142 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -100,7 +100,6 @@ public class WALProcedureStore extends ProcedureStoreBase { private final LinkedList logs = new LinkedList(); private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); - private final AtomicLong inactiveLogsMaxId = new AtomicLong(0); private final ReentrantLock lock = new ReentrantLock(); private final Condition waitCond = lock.newCondition(); private final Condition slotCond = lock.newCondition(); @@ -191,19 +190,16 @@ public class WALProcedureStore extends ProcedureStoreBase { } LOG.info("Stopping the WAL Procedure Store"); - if (lock.tryLock()) { - try { - waitCond.signalAll(); - syncCond.signalAll(); - } finally { - lock.unlock(); - } - } + sendStopSignal(); if (!abort) { try { - syncThread.join(); + while (syncThread.isAlive()) { + sendStopSignal(); + syncThread.join(250); + } } catch (InterruptedException e) { + LOG.warn("join interrupted", e); Thread.currentThread().interrupt(); } } @@ -220,6 +216,17 @@ public class WALProcedureStore extends ProcedureStoreBase { logs.clear(); } + private void sendStopSignal() { + if (lock.tryLock()) { + try { + waitCond.signalAll(); + syncCond.signalAll(); + } finally { + lock.unlock(); + } + } + } + @Override public int getNumThreads() { return slots == null ? 0 : slots.length; @@ -239,31 +246,36 @@ public class WALProcedureStore extends ProcedureStoreBase { @Override public void recoverLease() throws IOException { - LOG.info("Starting WAL Procedure Store lease recovery"); - FileStatus[] oldLogs = getLogFiles(); - while (isRunning()) { - // Get Log-MaxID and recover lease on old logs - flushLogId = initOldLogs(oldLogs); + lock.lock(); + try { + LOG.info("Starting WAL Procedure Store lease recovery"); + FileStatus[] oldLogs = getLogFiles(); + while (isRunning()) { + // Get Log-MaxID and recover lease on old logs + flushLogId = initOldLogs(oldLogs); - // Create new state-log - if (!rollWriter(flushLogId + 1)) { - // someone else has already created this log - LOG.debug("someone else has already created log " + flushLogId); - continue; - } - - // We have the lease on the log - oldLogs = getLogFiles(); - if (getMaxLogId(oldLogs) > flushLogId) { - if (LOG.isDebugEnabled()) { - LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId); + // Create new state-log + if (!rollWriter(flushLogId + 1)) { + // someone else has already created this log + LOG.debug("someone else has already created log " + flushLogId); + continue; } - logs.getLast().removeFile(); - continue; - } - LOG.info("Lease acquired for flushLogId: " + flushLogId); - break; + // We have the lease on the log + oldLogs = getLogFiles(); + if (getMaxLogId(oldLogs) > flushLogId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId); + } + logs.getLast().removeFile(); + continue; + } + + LOG.info("Lease acquired for flushLogId: " + flushLogId); + break; + } + } finally { + lock.unlock(); } } @@ -335,18 +347,22 @@ public class WALProcedureStore extends ProcedureStoreBase { } ByteSlot slot = acquireSlot(); - long logId = -1; try { // Serialize the insert + long[] subProcIds = null; if (subprocs != null) { ProcedureWALFormat.writeInsert(slot, proc, subprocs); + subProcIds = new long[subprocs.length]; + for (int i = 0; i < subprocs.length; ++i) { + subProcIds[i] = subprocs[i].getProcId(); + } } else { assert !proc.hasParent(); ProcedureWALFormat.writeInsert(slot, proc); } // Push the transaction data and wait until it is persisted - pushData(slot); + pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds); } catch (IOException e) { // We are not able to serialize the procedure. // this is a code error, and we are not able to go on. @@ -356,14 +372,6 @@ public class WALProcedureStore extends ProcedureStoreBase { } finally { releaseSlot(slot); } - - // Update the store tracker - synchronized (storeTracker) { - storeTracker.insert(proc, subprocs); - if (logId == flushLogId) { - checkAndTryRoll(); - } - } } @Override @@ -373,13 +381,12 @@ public class WALProcedureStore extends ProcedureStoreBase { } ByteSlot slot = acquireSlot(); - long logId = -1; try { // Serialize the update ProcedureWALFormat.writeUpdate(slot, proc); // Push the transaction data and wait until it is persisted - logId = pushData(slot); + pushData(PushType.UPDATE, slot, proc.getProcId(), null); } catch (IOException e) { // We are not able to serialize the procedure. // this is a code error, and we are not able to go on. @@ -388,20 +395,6 @@ public class WALProcedureStore extends ProcedureStoreBase { } finally { releaseSlot(slot); } - - // Update the store tracker - boolean removeOldLogs = false; - synchronized (storeTracker) { - storeTracker.update(proc); - if (logId == flushLogId) { - removeOldLogs = storeTracker.isUpdated(); - checkAndTryRoll(); - } - } - - if (removeOldLogs) { - setInactiveLogsMaxId(logId - 1); - } } @Override @@ -411,13 +404,12 @@ public class WALProcedureStore extends ProcedureStoreBase { } ByteSlot slot = acquireSlot(); - long logId = -1; try { // Serialize the delete ProcedureWALFormat.writeDelete(slot, procId); // Push the transaction data and wait until it is persisted - logId = pushData(slot); + pushData(PushType.DELETE, slot, procId, null); } catch (IOException e) { // We are not able to serialize the procedure. // this is a code error, and we are not able to go on. @@ -426,22 +418,6 @@ public class WALProcedureStore extends ProcedureStoreBase { } finally { releaseSlot(slot); } - - boolean removeOldLogs = false; - synchronized (storeTracker) { - storeTracker.delete(procId); - if (logId == flushLogId) { - if (storeTracker.isEmpty() || storeTracker.isUpdated()) { - removeOldLogs = checkAndTryRoll(); - } else { - checkAndTryRoll(); - } - } - } - - if (removeOldLogs) { - setInactiveLogsMaxId(logId); - } } private ByteSlot acquireSlot() { @@ -454,7 +430,10 @@ public class WALProcedureStore extends ProcedureStoreBase { slotsCache.offer(slot); } - private long pushData(final ByteSlot slot) { + private enum PushType { INSERT, UPDATE, DELETE }; + + private long pushData(final PushType type, final ByteSlot slot, + final long procId, final long[] subProcIds) { if (!isRunning()) { throw new RuntimeException("the store must be running before inserting data"); } @@ -481,6 +460,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } } + updateStoreTracker(type, procId, subProcIds); slots[slotIndex++] = slot; logId = flushLogId; @@ -509,20 +489,29 @@ public class WALProcedureStore extends ProcedureStoreBase { return logId; } - private boolean isSyncAborted() { - return syncException.get() != null; + private void updateStoreTracker(final PushType type, + final long procId, final long[] subProcIds) { + switch (type) { + case INSERT: + if (subProcIds == null) { + storeTracker.insert(procId); + } else { + storeTracker.insert(procId, subProcIds); + } + break; + case UPDATE: + storeTracker.update(procId); + break; + case DELETE: + storeTracker.delete(procId); + break; + default: + throw new RuntimeException("invalid push type " + type); + } } - protected void periodicRoll() throws IOException { - long logId; - boolean removeOldLogs; - synchronized (storeTracker) { - logId = flushLogId; - removeOldLogs = storeTracker.isEmpty(); - } - if (checkAndTryRoll() && removeOldLogs) { - setInactiveLogsMaxId(logId); - } + private boolean isSyncAborted() { + return syncException.get() != null; } private void syncLoop() throws Throwable { @@ -534,7 +523,7 @@ public class WALProcedureStore extends ProcedureStoreBase { // Wait until new data is available if (slotIndex == 0) { if (!loading.get()) { - removeInactiveLogs(); + periodicRoll(); } if (LOG.isTraceEnabled()) { @@ -547,7 +536,6 @@ public class WALProcedureStore extends ProcedureStoreBase { waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS); if (slotIndex == 0) { // no data.. probably a stop() or a periodic roll - periodicRoll(); continue; } } @@ -560,13 +548,12 @@ public class WALProcedureStore extends ProcedureStoreBase { long syncWaitMs = System.currentTimeMillis() - syncWaitSt; if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { float rollSec = getMillisFromLastRoll() / 1000.0f; - LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s/sec", + LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)", StringUtils.humanTimeDiff(syncWaitMs), slotIndex, StringUtils.humanSize(totalSynced.get()), StringUtils.humanSize(totalSynced.get() / rollSec))); } - inSync.set(true); totalSynced.addAndGet(syncSlots()); slotIndex = 0; @@ -639,8 +626,7 @@ public class WALProcedureStore extends ProcedureStoreBase { return totalSynced; } - @VisibleForTesting - public boolean rollWriterOrDie() { + private boolean rollWriterOrDie() { for (int i = 1; i <= rollRetries; ++i) { try { if (rollWriter()) { @@ -656,17 +642,13 @@ public class WALProcedureStore extends ProcedureStoreBase { throw new RuntimeException("unable to roll the log"); } - protected boolean checkAndTryRoll() { - if (!isRunning()) return false; - - if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) { - try { - return rollWriter(); - } catch (IOException e) { - LOG.warn("Unable to roll the log", e); - } + private boolean tryRollWriter() { + try { + return rollWriter(); + } catch (IOException e) { + LOG.warn("Unable to roll the log", e); + return false; } - return false; } private long getMillisToNextPeriodicRoll() { @@ -680,7 +662,52 @@ public class WALProcedureStore extends ProcedureStoreBase { return (System.currentTimeMillis() - lastRollTs.get()); } - protected boolean rollWriter() throws IOException { + @VisibleForTesting + protected void periodicRollForTesting() throws IOException { + lock.lock(); + try { + periodicRoll(); + } finally { + lock.unlock(); + } + } + + @VisibleForTesting + protected boolean rollWriterForTesting() throws IOException { + lock.lock(); + try { + return rollWriter(); + } finally { + lock.unlock(); + } + } + + private void periodicRoll() throws IOException { + if (storeTracker.isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace("no active procedures"); + } + tryRollWriter(); + removeAllLogs(flushLogId - 1); + } else { + if (storeTracker.isUpdated()) { + if (LOG.isTraceEnabled()) { + LOG.trace("all the active procedures are in the latest log"); + } + removeAllLogs(flushLogId - 1); + } + + // if the log size has exceeded the roll threshold + // or the periodic roll timeout is expired, try to roll the wal. + if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) { + tryRollWriter(); + } + + removeInactiveLogs(); + } + } + + private boolean rollWriter() throws IOException { // Create new state-log if (!rollWriter(flushLogId + 1)) { LOG.warn("someone else has already created log " + flushLogId); @@ -701,6 +728,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private boolean rollWriter(final long logId) throws IOException { assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId; + assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked(); ProcedureWALHeader header = ProcedureWALHeader.newBuilder() .setVersion(ProcedureWALFormat.HEADER_VERSION) @@ -730,20 +758,16 @@ public class WALProcedureStore extends ProcedureStoreBase { newStream.close(); return false; } - lock.lock(); - try { - closeStream(); - synchronized (storeTracker) { - storeTracker.resetUpdates(); - } - stream = newStream; - flushLogId = logId; - totalSynced.set(0); - lastRollTs.set(System.currentTimeMillis()); - logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos)); - } finally { - lock.unlock(); - } + + closeStream(); + + storeTracker.resetUpdates(); + stream = newStream; + flushLogId = logId; + totalSynced.set(0); + lastRollTs.set(System.currentTimeMillis()); + logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos)); + if (LOG.isDebugEnabled()) { LOG.debug("Roll new state log: " + logId); } @@ -754,11 +778,9 @@ public class WALProcedureStore extends ProcedureStoreBase { try { if (stream != null) { try { - synchronized (storeTracker) { - ProcedureWALFile log = logs.getLast(); - log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); - ProcedureWALFormat.writeTrailer(stream, storeTracker); - } + ProcedureWALFile log = logs.getLast(); + log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); + ProcedureWALFormat.writeTrailer(stream, storeTracker); } catch (IOException e) { LOG.warn("Unable to write the trailer: " + e.getMessage()); } @@ -774,30 +796,12 @@ public class WALProcedureStore extends ProcedureStoreBase { // ========================================================================== // Log Files cleaner helpers // ========================================================================== - private void setInactiveLogsMaxId(long logId) { - long expect = 0; - while (!inactiveLogsMaxId.compareAndSet(expect, logId)) { - expect = inactiveLogsMaxId.get(); - if (expect >= logId) { - break; - } - } - } - private void removeInactiveLogs() { - long lastLogId = inactiveLogsMaxId.get(); - if (lastLogId != 0) { - removeAllLogs(lastLogId); - inactiveLogsMaxId.compareAndSet(lastLogId, 0); - } - // Verify if the ProcId of the first oldest is still active. if not remove the file. while (logs.size() > 1) { ProcedureWALFile log = logs.getFirst(); - synchronized (storeTracker) { - if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) { - break; - } + if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) { + break; } removeLogFile(log); } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index 534e56e7406..cae10ef7c60 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -313,7 +313,7 @@ public class TestProcedureRecovery { public void testRunningProcWithSameNonce() throws Exception { final long nonceGroup = 456; final long nonce = 33333; - Procedure proc = new TestMultiStepProcedure(); + Procedure proc = new TestSingleStepProcedure(); long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce); // Restart (use a latch to prevent the step execution until we submitted proc2) @@ -321,7 +321,7 @@ public class TestProcedureRecovery { procEnv.setWaitLatch(latch); restart(); // Submit a procedure with the same nonce and expect the same procedure would return. - Procedure proc2 = new TestMultiStepProcedure(); + Procedure proc2 = new TestSingleStepProcedure(); long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce); latch.countDown(); procEnv.setWaitLatch(null); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index 7d6eb1ccab7..941803a1c68 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -42,27 +41,6 @@ import static org.junit.Assert.fail; public class TestProcedureStoreTracker { private static final Log LOG = LogFactory.getLog(TestProcedureStoreTracker.class); - static class TestProcedure extends Procedure { - public TestProcedure(long procId) { - setProcId(procId); - } - - @Override - protected Procedure[] execute(Void env) { return null; } - - @Override - protected void rollback(Void env) { /* no-op */ } - - @Override - protected boolean abort(Void env) { return false; } - - @Override - protected void serializeStateData(final OutputStream stream) { /* no-op */ } - - @Override - protected void deserializeStateData(final InputStream stream) { /* no-op */ } - } - @Test public void testSeqInsertAndDelete() { ProcedureStoreTracker tracker = new ProcedureStoreTracker(); @@ -162,13 +140,10 @@ public class TestProcedureStoreTracker { ProcedureStoreTracker tracker = new ProcedureStoreTracker(); assertTrue(tracker.isEmpty()); - Procedure[] procs = new TestProcedure[] { - new TestProcedure(1), new TestProcedure(2), new TestProcedure(3), - new TestProcedure(4), new TestProcedure(5), new TestProcedure(6), - }; + long[] procs = new long[] { 1, 2, 3, 4, 5, 6 }; - tracker.insert(procs[0], null); - tracker.insert(procs[1], new Procedure[] { procs[2], procs[3], procs[4] }); + tracker.insert(procs[0]); + tracker.insert(procs[1], new long[] { procs[2], procs[3], procs[4] }); assertFalse(tracker.isEmpty()); assertTrue(tracker.isUpdated()); @@ -190,11 +165,11 @@ public class TestProcedureStoreTracker { assertTrue(tracker.isUpdated()); for (int i = 0; i < 5; ++i) { - tracker.delete(procs[i].getProcId()); + tracker.delete(procs[i]); assertFalse(tracker.isEmpty()); assertTrue(tracker.isUpdated()); } - tracker.delete(procs[5].getProcId()); + tracker.delete(procs[5]); assertTrue(tracker.isEmpty()); } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 35c8c786bf2..6dbc3aa0697 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -102,7 +102,7 @@ public class TestWALProcedureStore { @Test public void testEmptyRoll() throws Exception { for (int i = 0; i < 10; ++i) { - procStore.periodicRoll(); + procStore.periodicRollForTesting(); } FileStatus[] status = fs.listStatus(logDir); assertEquals(1, status.length); @@ -214,14 +214,14 @@ public class TestWALProcedureStore { procStore.update(rootProcs[i-1]); } // insert root-child txn - procStore.rollWriter(); + procStore.rollWriterForTesting(); for (int i = 1; i <= rootProcs.length; i++) { TestProcedure b = new TestProcedure(rootProcs.length + i, i); rootProcs[i-1].addStackId(1); procStore.insert(rootProcs[i-1], new Procedure[] { b }); } // insert child updates - procStore.rollWriter(); + procStore.rollWriterForTesting(); for (int i = 1; i <= rootProcs.length; i++) { procStore.update(new TestProcedure(rootProcs.length + i, i)); } @@ -229,9 +229,10 @@ public class TestWALProcedureStore { // Stop the store procStore.stop(false); - // Remove 4 byte from the trailer + // the first log was removed, + // we have insert-txn and updates in the others so everything is fine FileStatus[] logs = fs.listStatus(logDir); - assertEquals(3, logs.length); + assertEquals(Arrays.toString(logs), 2, logs.length); Arrays.sort(logs, new Comparator() { @Override public int compare(FileStatus o1, FileStatus o2) { @@ -239,15 +240,13 @@ public class TestWALProcedureStore { } }); - // Remove the first log, we have insert-txn and updates in the others so everything is fine. - fs.delete(logs[0].getPath(), false); LoadCounter loader = new LoadCounter(); storeRestart(loader); assertEquals(rootProcs.length * 2, loader.getLoadedCount()); assertEquals(0, loader.getCorruptedCount()); - // Remove the second log, we have lost any root/parent references - fs.delete(logs[1].getPath(), false); + // Remove the second log, we have lost all the root/parent references + fs.delete(logs[0].getPath(), false); loader.reset(); storeRestart(loader); assertEquals(0, loader.getLoadedCount()); @@ -276,7 +275,7 @@ public class TestWALProcedureStore { b.addStackId(1); procStore.update(b); - procStore.rollWriter(); + procStore.rollWriterForTesting(); a.addStackId(2); procStore.update(a); @@ -325,7 +324,7 @@ public class TestWALProcedureStore { b.addStackId(2); procStore.update(b); - procStore.rollWriter(); + procStore.rollWriterForTesting(); b.addStackId(3); procStore.update(b); @@ -426,6 +425,36 @@ public class TestWALProcedureStore { assertEquals(1, procStore.getActiveLogs().size()); } + @Test + public void testRollAndRemove() throws IOException { + // Insert something in the log + Procedure proc1 = new TestSequentialProcedure(); + procStore.insert(proc1, null); + + Procedure proc2 = new TestSequentialProcedure(); + procStore.insert(proc2, null); + + // roll the log, now we have 2 + procStore.rollWriterForTesting(); + assertEquals(2, procStore.getActiveLogs().size()); + + // everything will be up to date in the second log + // so we can remove the first one + procStore.update(proc1); + procStore.update(proc2); + assertEquals(1, procStore.getActiveLogs().size()); + + // roll the log, now we have 2 + procStore.rollWriterForTesting(); + assertEquals(2, procStore.getActiveLogs().size()); + + // remove everything active + // so we can remove all the logs + procStore.delete(proc1.getProcId()); + procStore.delete(proc2.getProcId()); + assertEquals(1, procStore.getActiveLogs().size()); + } + private void corruptLog(final FileStatus logFile, final long dropBytes) throws IOException { assertTrue(logFile.getLen() > dropBytes);