HBASE-17206 FSHLog may roll a new writer successfully with unflushed entries

This commit is contained in:
zhangduo 2016-11-30 21:41:29 +08:00
parent b3627ef51e
commit c8ea82299c
1 changed files with 11 additions and 9 deletions

View File

@ -1654,6 +1654,12 @@ public class FSHLog implements WAL {
*/ */
private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1); private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
private void checkIfSyncFailed(SyncFuture syncFuture) throws FailedSyncBeforeLogCloseException {
if (syncFuture.isThrowable()) {
throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
}
}
/** /**
* For Thread A to call when it is ready to wait on the 'safe point' to be attained. * For Thread A to call when it is ready to wait on the 'safe point' to be attained.
* Thread A will be held in here until Thread B calls {@link #safePointAttained()} * Thread A will be held in here until Thread B calls {@link #safePointAttained()}
@ -1664,16 +1670,12 @@ public class FSHLog implements WAL {
* @return The passed <code>syncFuture</code> * @return The passed <code>syncFuture</code>
* @throws FailedSyncBeforeLogCloseException * @throws FailedSyncBeforeLogCloseException
*/ */
SyncFuture waitSafePoint(final SyncFuture syncFuture) SyncFuture waitSafePoint(SyncFuture syncFuture) throws InterruptedException,
throws InterruptedException, FailedSyncBeforeLogCloseException { FailedSyncBeforeLogCloseException {
while (true) { while (!this.safePointAttainedLatch.await(1, TimeUnit.MILLISECONDS)) {
if (this.safePointAttainedLatch.await(1, TimeUnit.MILLISECONDS)) { checkIfSyncFailed(syncFuture);
break;
}
if (syncFuture.isThrowable()) {
throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
}
} }
checkIfSyncFailed(syncFuture);
return syncFuture; return syncFuture;
} }