HBASE-14843 TestWALProcedureStore.testLoad is flakey

This commit is contained in:
Matteo Bertozzi 2015-11-23 09:54:42 -08:00
parent 55087ce888
commit 0f3e2e0bfa
2 changed files with 12 additions and 1 deletions

View File

@ -111,6 +111,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private final FileSystem fs; private final FileSystem fs;
private final Path logDir; private final Path logDir;
private AtomicBoolean loading = new AtomicBoolean(true);
private AtomicBoolean inSync = new AtomicBoolean(false); private AtomicBoolean inSync = new AtomicBoolean(false);
private AtomicReference<Throwable> syncException = new AtomicReference<>(); private AtomicReference<Throwable> syncException = new AtomicReference<>();
private LinkedTransferQueue<ByteSlot> slotsCache = null; private LinkedTransferQueue<ByteSlot> slotsCache = null;
@ -276,6 +277,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
LOG.debug("No state logs to replay."); LOG.debug("No state logs to replay.");
} }
loader.setMaxProcId(0); loader.setMaxProcId(0);
loading.set(false);
return; return;
} }
@ -320,6 +322,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
removeLogFile(log); removeLogFile(log);
} }
} }
loading.set(false);
} }
} }
@ -528,7 +531,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
try { try {
// Wait until new data is available // Wait until new data is available
if (slotIndex == 0) { if (slotIndex == 0) {
if (!loading.get()) {
removeInactiveLogs(); removeInactiveLogs();
}
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
float rollTsSec = getMillisFromLastRoll() / 1000.0f; float rollTsSec = getMillisFromLastRoll() / 1000.0f;

View File

@ -383,6 +383,12 @@ public class TestWALProcedureStore {
procStore.start(NTHREAD); procStore.start(NTHREAD);
procStore.recoverLease(); procStore.recoverLease();
LoadCounter loader = new LoadCounter();
procStore.load(loader);
assertEquals(0, loader.getMaxProcId());
assertEquals(0, loader.getLoadedCount());
assertEquals(0, loader.getCorruptedCount());
final long LAST_PROC_ID = 9999; final long LAST_PROC_ID = 9999;
final Thread[] thread = new Thread[NTHREAD]; final Thread[] thread = new Thread[NTHREAD];
final AtomicLong procCounter = new AtomicLong((long)Math.round(Math.random() * 100)); final AtomicLong procCounter = new AtomicLong((long)Math.round(Math.random() * 100));