diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java index 39f1f71e247..c7cc1fcfcb4 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java @@ -94,9 +94,11 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput { } } long pos = out.getPos(); - if(pos > this.syncedLength) { - this.syncedLength = pos; - } + /** + * This flush0 method could only be called by single thread, so here we could + * safely overwrite without any synchronization. + */ + this.syncedLength = pos; future.complete(pos); } catch (IOException e) { future.completeExceptionally(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 8c944b1bdf5..e834d654310 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -59,7 +59,11 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter private final Class channelClass; - private AsyncFSOutput output; + private volatile AsyncFSOutput output; + /** + * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed. + */ + private volatile long finalSyncedLength = -1; private static final class OutputStreamWrapper extends OutputStream implements ByteBufferWriter { @@ -156,6 +160,13 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter LOG.warn("normal close failed, try recover", e); output.recoverAndClose(null); } + /** + * We have to call {@link AsyncFSOutput#getSyncedLength()} + * after {@link AsyncFSOutput#close()} to get the final length + * synced to underlying filesystem because {@link AsyncFSOutput#close()} + * may also flush some data to underlying filesystem. + */ + this.finalSyncedLength = this.output.getSyncedLength(); this.output = null; } @@ -234,6 +245,17 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter @Override public long getSyncedLength() { - return this.output.getSyncedLength(); + /** + * The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close} + * is a sync point, if output is null, then finalSyncedLength must set, + * so we can return finalSyncedLength, else we return output.getSyncedLength + */ + AsyncFSOutput outputToUse = this.output; + if(outputToUse == null) { + long finalSyncedLengthToUse = this.finalSyncedLength; + assert finalSyncedLengthToUse >= 0; + return finalSyncedLengthToUse; + } + return outputToUse.getSyncedLength(); } }