HBASE-16781 Fix flaky TestMasterProcedureWalLease

This commit is contained in:
Matteo Bertozzi 2016-10-07 17:32:19 -07:00
parent c7cae6be3d
commit 29d701a314
5 changed files with 43 additions and 16 deletions

View File

@ -122,6 +122,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private final AtomicBoolean inSync = new AtomicBoolean(false); private final AtomicBoolean inSync = new AtomicBoolean(false);
private final AtomicLong totalSynced = new AtomicLong(0); private final AtomicLong totalSynced = new AtomicLong(0);
private final AtomicLong lastRollTs = new AtomicLong(0); private final AtomicLong lastRollTs = new AtomicLong(0);
private final AtomicLong syncId = new AtomicLong(0);
private LinkedTransferQueue<ByteSlot> slotsCache = null; private LinkedTransferQueue<ByteSlot> slotsCache = null;
private Set<ProcedureWALFile> corruptedLogs = null; private Set<ProcedureWALFile> corruptedLogs = null;
@ -226,15 +227,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
} }
@Override @Override
public void stop(boolean abort) { public void stop(final boolean abort) {
if (!setRunning(false)) { if (!setRunning(false)) {
return; return;
} }
LOG.info("Stopping the WAL Procedure Store"); LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort +
(isSyncAborted() ? " (self aborting)" : ""));
sendStopSignal(); sendStopSignal();
if (!isSyncAborted()) {
if (!abort) {
try { try {
while (syncThread.isAlive()) { while (syncThread.isAlive()) {
sendStopSignal(); sendStopSignal();
@ -525,6 +526,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
} }
} }
final long pushSyncId = syncId.get();
updateStoreTracker(type, procId, subProcIds); updateStoreTracker(type, procId, subProcIds);
slots[slotIndex++] = slot; slots[slotIndex++] = slot;
logId = flushLogId; logId = flushLogId;
@ -540,7 +542,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
slotCond.signal(); slotCond.signal();
} }
syncCond.await(); while (pushSyncId == syncId.get() && isRunning()) {
syncCond.await();
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
sendAbortProcessSignal(); sendAbortProcessSignal();
@ -642,13 +646,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
totalSyncedToStore = totalSynced.addAndGet(slotSize); totalSyncedToStore = totalSynced.addAndGet(slotSize);
slotIndex = 0; slotIndex = 0;
inSync.set(false); inSync.set(false);
syncId.incrementAndGet();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
sendAbortProcessSignal();
syncException.compareAndSet(null, e); syncException.compareAndSet(null, e);
sendAbortProcessSignal();
throw e; throw e;
} catch (Throwable t) { } catch (Throwable t) {
syncException.compareAndSet(null, t); syncException.compareAndSet(null, t);
sendAbortProcessSignal();
throw t; throw t;
} finally { } finally {
syncCond.signalAll(); syncCond.signalAll();
@ -679,13 +685,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
} catch (Throwable e) { } catch (Throwable e) {
LOG.warn("unable to sync slots, retry=" + retry); LOG.warn("unable to sync slots, retry=" + retry);
if (++retry >= maxRetriesBeforeRoll) { if (++retry >= maxRetriesBeforeRoll) {
if (logRolled >= maxSyncFailureRoll) { if (logRolled >= maxSyncFailureRoll && isRunning()) {
LOG.error("Sync slots after log roll failed, abort.", e); LOG.error("Sync slots after log roll failed, abort.", e);
sendAbortProcessSignal();
throw e; throw e;
} }
if (!rollWriterOrDie()) { if (!rollWriterWithRetries()) {
throw e; throw e;
} }
@ -720,8 +725,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
return totalSynced; return totalSynced;
} }
private boolean rollWriterOrDie() { private boolean rollWriterWithRetries() {
for (int i = 0; i < rollRetries; ++i) { for (int i = 0; i < rollRetries && isRunning(); ++i) {
if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i); if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
try { try {
@ -733,8 +738,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
} }
} }
LOG.fatal("Unable to roll the log"); LOG.fatal("Unable to roll the log");
sendAbortProcessSignal(); return false;
throw new RuntimeException("unable to roll the log");
} }
private boolean tryRollWriter() { private boolean tryRollWriter() {
@ -777,7 +781,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
} }
} }
@VisibleForTesting void removeInactiveLogsForTesting() throws Exception { @VisibleForTesting
protected void removeInactiveLogsForTesting() throws Exception {
lock.lock(); lock.lock();
try { try {
removeInactiveLogs(); removeInactiveLogs();
@ -812,6 +817,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
} }
private boolean rollWriter() throws IOException { private boolean rollWriter() throws IOException {
if (!isRunning()) return false;
// Create new state-log // Create new state-log
if (!rollWriter(flushLogId + 1)) { if (!rollWriter(flushLogId + 1)) {
LOG.warn("someone else has already created log " + flushLogId); LOG.warn("someone else has already created log " + flushLogId);
@ -1043,6 +1050,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
for (int i = 0; i < logFiles.length; ++i) { for (int i = 0; i < logFiles.length; ++i) {
final Path logPath = logFiles[i].getPath(); final Path logPath = logFiles[i].getPath();
leaseRecovery.recoverFileLease(fs, logPath); leaseRecovery.recoverFileLease(fs, logPath);
if (!isRunning()) {
throw new IOException("wal aborting");
}
maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
ProcedureWALFile log = initOldLog(logFiles[i]); ProcedureWALFile log = initOldLog(logFiles[i]);
if (log != null) { if (log != null) {
@ -1061,7 +1072,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
* it using entries in the log. * it using entries in the log.
*/ */
private void initTrackerFromOldLogs() { private void initTrackerFromOldLogs() {
if (logs.isEmpty()) return; if (logs.isEmpty() || !isRunning()) return;
ProcedureWALFile log = logs.getLast(); ProcedureWALFile log = logs.getLast();
if (!log.getTracker().isPartial()) { if (!log.getTracker().isPartial()) {
storeTracker.resetTo(log.getTracker()); storeTracker.resetTo(log.getTracker());

View File

@ -374,4 +374,9 @@ public interface MasterServices extends Server {
* @return load balancer * @return load balancer
*/ */
public LoadBalancer getLoadBalancer(); public LoadBalancer getLoadBalancer();
/**
* @return True if this master is stopping.
*/
boolean isStopping();
} }

View File

@ -61,10 +61,15 @@ public class MasterProcedureEnv {
@Override @Override
public boolean progress() { public boolean progress() {
LOG.debug("Recover Procedure Store log lease: " + path); LOG.debug("Recover Procedure Store log lease: " + path);
return master.isActiveMaster(); return isRunning();
} }
}); });
} }
private boolean isRunning() {
return master.isActiveMaster() && !master.isStopped() &&
!master.isStopping() && !master.isAborted();
}
} }
@InterfaceAudience.Private @InterfaceAudience.Private

View File

@ -179,6 +179,11 @@ public class MockNoopMasterServices implements MasterServices, Server {
stopped = true; stopped = true;
} }
@Override
public boolean isStopping() {
return stopped;
}
@Override @Override
public boolean isStopped() { public boolean isStopped() {
return stopped; return stopped;

View File

@ -398,6 +398,7 @@ public class MasterProcedureTestingUtility {
// restart executor/store // restart executor/store
// rollback step N - save on store // rollback step N - save on store
InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
abortListener.addProcId(procId);
procExec.registerListener(abortListener); procExec.registerListener(abortListener);
try { try {
for (int i = 0; !procExec.isFinished(procId); ++i) { for (int i = 0; !procExec.isFinished(procId); ++i) {