HBASE-17206 FSHLog may roll a new writer successfully with unflushed entries

This commit is contained in:
zhangduo 2016-11-30 21:41:29 +08:00
parent 37d7b57128
commit fb789b340c
1 changed files with 10 additions and 8 deletions

View File

@ -849,6 +849,12 @@ public class FSHLog extends AbstractFSWAL<Writer> {
*/ */
private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1); private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
private void checkIfSyncFailed(SyncFuture syncFuture) throws FailedSyncBeforeLogCloseException {
if (syncFuture.isThrowable()) {
throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
}
}
/** /**
* For Thread A to call when it is ready to wait on the 'safe point' to be attained. Thread A * For Thread A to call when it is ready to wait on the 'safe point' to be attained. Thread A
* will be held in here until Thread B calls {@link #safePointAttained()} * will be held in here until Thread B calls {@link #safePointAttained()}
@ -856,16 +862,12 @@ public class FSHLog extends AbstractFSWAL<Writer> {
* exception, then something is up w/ our syncing. * exception, then something is up w/ our syncing.
* @return The passed <code>syncFuture</code> * @return The passed <code>syncFuture</code>
*/ */
SyncFuture waitSafePoint(final SyncFuture syncFuture) throws InterruptedException, SyncFuture waitSafePoint(SyncFuture syncFuture) throws InterruptedException,
FailedSyncBeforeLogCloseException { FailedSyncBeforeLogCloseException {
while (true) { while (!this.safePointAttainedLatch.await(1, TimeUnit.MILLISECONDS)) {
if (this.safePointAttainedLatch.await(1, TimeUnit.MILLISECONDS)) { checkIfSyncFailed(syncFuture);
break;
}
if (syncFuture.isThrowable()) {
throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
}
} }
checkIfSyncFailed(syncFuture);
return syncFuture; return syncFuture;
} }