HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length(addendum) (#2055)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
206d0a02f2
commit
7b099eaa75
|
@ -94,9 +94,11 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
long pos = out.getPos();
|
long pos = out.getPos();
|
||||||
if(pos > this.syncedLength) {
|
/**
|
||||||
this.syncedLength = pos;
|
* This flush0 method could only be called by single thread, so here we could
|
||||||
}
|
* safely overwrite without any synchronization.
|
||||||
|
*/
|
||||||
|
this.syncedLength = pos;
|
||||||
future.complete(pos);
|
future.complete(pos);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
|
|
|
@ -59,7 +59,11 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
||||||
|
|
||||||
private final Class<? extends Channel> channelClass;
|
private final Class<? extends Channel> channelClass;
|
||||||
|
|
||||||
private AsyncFSOutput output;
|
private volatile AsyncFSOutput output;
|
||||||
|
/**
|
||||||
|
* Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed.
|
||||||
|
*/
|
||||||
|
private volatile long finalSyncedLength = -1;
|
||||||
|
|
||||||
private static final class OutputStreamWrapper extends OutputStream
|
private static final class OutputStreamWrapper extends OutputStream
|
||||||
implements ByteBufferWriter {
|
implements ByteBufferWriter {
|
||||||
|
@ -156,6 +160,13 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
||||||
LOG.warn("normal close failed, try recover", e);
|
LOG.warn("normal close failed, try recover", e);
|
||||||
output.recoverAndClose(null);
|
output.recoverAndClose(null);
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* We have to call {@link AsyncFSOutput#getSyncedLength()}
|
||||||
|
* after {@link AsyncFSOutput#close()} to get the final length
|
||||||
|
* synced to underlying filesystem because {@link AsyncFSOutput#close()}
|
||||||
|
* may also flush some data to underlying filesystem.
|
||||||
|
*/
|
||||||
|
this.finalSyncedLength = this.output.getSyncedLength();
|
||||||
this.output = null;
|
this.output = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -234,6 +245,17 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getSyncedLength() {
|
public long getSyncedLength() {
|
||||||
return this.output.getSyncedLength();
|
/**
|
||||||
|
* The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close}
|
||||||
|
* is a sync point, if output is null, then finalSyncedLength must set,
|
||||||
|
* so we can return finalSyncedLength, else we return output.getSyncedLength
|
||||||
|
*/
|
||||||
|
AsyncFSOutput outputToUse = this.output;
|
||||||
|
if(outputToUse == null) {
|
||||||
|
long finalSyncedLengthToUse = this.finalSyncedLength;
|
||||||
|
assert finalSyncedLengthToUse >= 0;
|
||||||
|
return finalSyncedLengthToUse;
|
||||||
|
}
|
||||||
|
return outputToUse.getSyncedLength();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue