HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length(addendum) (#2055)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
chenglei 2020-08-10 09:57:26 +08:00 committed by Duo Zhang
parent cee7431d0a
commit 457234c695
2 changed files with 29 additions and 5 deletions

View File

@ -94,9 +94,11 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
} }
} }
long pos = out.getPos(); long pos = out.getPos();
if(pos > this.syncedLength) { /**
this.syncedLength = pos; * This flush0 method could only be called by single thread, so here we could
} * safely overwrite without any synchronization.
*/
this.syncedLength = pos;
future.complete(pos); future.complete(pos);
} catch (IOException e) { } catch (IOException e) {
future.completeExceptionally(e); future.completeExceptionally(e);

View File

@ -59,7 +59,11 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
private final Class<? extends Channel> channelClass; private final Class<? extends Channel> channelClass;
private AsyncFSOutput output; private volatile AsyncFSOutput output;
/**
* Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed.
*/
private volatile long finalSyncedLength = -1;
private static final class OutputStreamWrapper extends OutputStream private static final class OutputStreamWrapper extends OutputStream
implements ByteBufferWriter { implements ByteBufferWriter {
@ -156,6 +160,13 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
LOG.warn("normal close failed, try recover", e); LOG.warn("normal close failed, try recover", e);
output.recoverAndClose(null); output.recoverAndClose(null);
} }
/**
* We have to call {@link AsyncFSOutput#getSyncedLength()}
* after {@link AsyncFSOutput#close()} to get the final length
* synced to underlying filesystem because {@link AsyncFSOutput#close()}
* may also flush some data to underlying filesystem.
*/
this.finalSyncedLength = this.output.getSyncedLength();
this.output = null; this.output = null;
} }
@ -234,6 +245,17 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
@Override @Override
public long getSyncedLength() { public long getSyncedLength() {
return this.output.getSyncedLength(); /**
* The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close}
* is a sync point, if output is null, then finalSyncedLength must set,
* so we can return finalSyncedLength, else we return output.getSyncedLength
*/
AsyncFSOutput outputToUse = this.output;
if(outputToUse == null) {
long finalSyncedLengthToUse = this.finalSyncedLength;
assert finalSyncedLengthToUse >= 0;
return finalSyncedLengthToUse;
}
return outputToUse.getSyncedLength();
} }
} }