HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length. (#1970)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
e614b89c33
commit
a7a0e1a596
|
@ -89,4 +89,9 @@ public interface AsyncFSOutput extends Closeable {
|
|||
*/
|
||||
@Override
|
||||
void close() throws IOException;
|
||||
|
||||
/**
|
||||
* @return byteSize success synced to underlying filesystem.
|
||||
*/
|
||||
long getSyncedLength();
|
||||
}
|
||||
|
|
|
@ -575,4 +575,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
|||
public boolean isBroken() {
|
||||
return state == State.BROKEN;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return this.ackedBlockLength;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,8 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
|
|||
|
||||
private final ExecutorService executor;
|
||||
|
||||
private volatile long syncedLength = 0;
|
||||
|
||||
public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
|
||||
this.out = out;
|
||||
this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
|
||||
|
@ -91,7 +93,11 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
|
|||
out.hflush();
|
||||
}
|
||||
}
|
||||
future.complete(out.getPos());
|
||||
long pos = out.getPos();
|
||||
if(pos > this.syncedLength) {
|
||||
this.syncedLength = pos;
|
||||
}
|
||||
future.complete(pos);
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
return;
|
||||
|
@ -124,4 +130,9 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
|
|||
public boolean isBroken() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return this.syncedLength;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1109,7 +1109,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
Path currentPath = getOldPath();
|
||||
if (path.equals(currentPath)) {
|
||||
W writer = this.writer;
|
||||
return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
|
||||
return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
|
||||
} else {
|
||||
return OptionalLong.empty();
|
||||
}
|
||||
|
|
|
@ -231,4 +231,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
|||
protected OutputStream getOutputStreamForCellEncoder() {
|
||||
return asyncOutputWrapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return this.output.getSyncedLength();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,6 +49,11 @@ public final class CombinedAsyncWriter implements AsyncWriter {
|
|||
return writers.get(0).getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return writers.get(0).getSyncedLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
Exception error = null;
|
||||
|
|
|
@ -19,11 +19,14 @@ package org.apache.hadoop.hbase.regionserver.wal;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.util.AtomicUtils;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||
import org.apache.hadoop.hbase.wal.FSHLogProvider;
|
||||
|
@ -46,6 +49,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
|
|||
|
||||
protected FSDataOutputStream output;
|
||||
|
||||
private final AtomicLong syncedLength = new AtomicLong(0);
|
||||
|
||||
@Override
|
||||
public void append(Entry entry) throws IOException {
|
||||
entry.getKey().getBuilder(compressor).
|
||||
|
@ -85,6 +90,12 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
|
|||
} else {
|
||||
fsdos.hflush();
|
||||
}
|
||||
AtomicUtils.updateMax(this.syncedLength, fsdos.getPos());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return this.syncedLength.get();
|
||||
}
|
||||
|
||||
public FSDataOutputStream getStream() {
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.OptionalLong;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -74,6 +75,22 @@ public interface WALProvider {
|
|||
|
||||
interface WriterBase extends Closeable {
|
||||
long getLength();
|
||||
/**
|
||||
* NOTE: We add this method for {@link WALFileLengthProvider} used for replication,
|
||||
* considering the case if we use {@link AsyncFSWAL},we write to 3 DNs concurrently,
|
||||
* according to the visibility guarantee of HDFS, the data will be available immediately
|
||||
* when arriving at DN since all the DNs will be considered as the last one in pipeline.
|
||||
* This means replication may read uncommitted data and replicate it to the remote cluster
|
||||
* and cause data inconsistency.
|
||||
* The method {@link WriterBase#getLength} may return length which just in hdfs client
|
||||
* buffer and not successfully synced to HDFS, so we use this method to return the length
|
||||
* successfully synced to HDFS and replication thread could only read writing WAL file
|
||||
* limited by this length.
|
||||
* see also HBASE-14004 and this document for more details:
|
||||
* https://docs.google.com/document/d/11AyWtGhItQs6vsLRIx32PwTxmBY3libXwGXI25obVEY/edit#
|
||||
* @return byteSize successfully synced to underlying filesystem.
|
||||
*/
|
||||
long getSyncedLength();
|
||||
}
|
||||
|
||||
// Writers are used internally. Users outside of the WAL should be relying on the
|
||||
|
|
|
@ -151,6 +151,11 @@ public class TestFailedAppendAndSync {
|
|||
public long getLength() {
|
||||
return w.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return w.getSyncedLength();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1246,6 +1246,11 @@ public class TestHRegion {
|
|||
public long getLength() {
|
||||
return w.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return w.getSyncedLength();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -190,6 +190,11 @@ public class TestWALLockup {
|
|||
public long getLength() {
|
||||
return w.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return w.getSyncedLength();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -374,6 +379,11 @@ public class TestWALLockup {
|
|||
public long getLength() {
|
||||
return w.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return w.getSyncedLength();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -155,6 +155,11 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
|
|||
return writer.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return writer.getSyncedLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Long> sync(boolean forceSync) {
|
||||
CompletableFuture<Long> result = writer.sync(forceSync);
|
||||
|
|
|
@ -109,6 +109,11 @@ class CustomAsyncFSWAL extends AsyncFSWAL {
|
|||
return writer.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return writer.getSyncedLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Long> sync(boolean forceSync) {
|
||||
writerSyncFlag = forceSync;
|
||||
|
|
|
@ -84,6 +84,11 @@ class CustomFSHLog extends FSHLog {
|
|||
return writer.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return writer.getSyncedLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
writerSyncFlag = forceSync;
|
||||
|
|
|
@ -174,6 +174,11 @@ public class TestLogRolling extends AbstractTestLogRolling {
|
|||
public long getLength() {
|
||||
return oldWriter1.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return oldWriter1.getSyncedLength();
|
||||
}
|
||||
};
|
||||
log.setWriter(newWriter1);
|
||||
|
||||
|
@ -231,6 +236,11 @@ public class TestLogRolling extends AbstractTestLogRolling {
|
|||
public long getLength() {
|
||||
return oldWriter2.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return oldWriter2.getSyncedLength();
|
||||
}
|
||||
};
|
||||
log.setWriter(newWriter2);
|
||||
|
||||
|
|
|
@ -44,6 +44,11 @@ class WriterOverAsyncWriter implements WALProvider.Writer {
|
|||
return asyncWriter.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return asyncWriter.getSyncedLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void append(Entry entry) throws IOException {
|
||||
asyncWriter.append(entry);
|
||||
|
|
|
@ -59,6 +59,11 @@ class DualAsyncFSWALForTest extends DualAsyncFSWAL {
|
|||
return localWriter.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSyncedLength() {
|
||||
return this.localWriter.getSyncedLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
Closeables.close(localWriter, true);
|
||||
|
|
Loading…
Reference in New Issue