HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length.(#2034)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
This commit is contained in:
chenglei 2020-07-11 16:35:10 +08:00 committed by GitHub
parent ff2951e672
commit deff4c88d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 136 additions and 39 deletions

View File

@ -89,4 +89,9 @@ public interface AsyncFSOutput extends Closeable {
*/
@Override
void close() throws IOException;
/**
* @return byteSize success synced to underlying filesystem.
*/
long getSyncedLength();
}

View File

@ -574,4 +574,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
public boolean isBroken() {
return state == State.BROKEN;
}
@Override
public long getSyncedLength() {
return this.ackedBlockLength;
}
}

View File

@ -45,6 +45,8 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
private final ExecutorService executor;
private volatile long syncedLength = 0;
public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
this.out = out;
this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
@ -91,7 +93,11 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
out.hflush();
}
}
future.complete(out.getPos());
long pos = out.getPos();
if(pos > this.syncedLength) {
this.syncedLength = pos;
}
future.complete(pos);
} catch (IOException e) {
future.completeExceptionally(e);
return;
@ -124,4 +130,9 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
public boolean isBroken() {
return false;
}
@Override
public long getSyncedLength() {
return this.syncedLength;
}
}

View File

@ -1061,7 +1061,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
Path currentPath = getOldPath();
if (path.equals(currentPath)) {
W writer = this.writer;
return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
} else {
return OptionalLong.empty();
}

View File

@ -671,13 +671,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
private long closeWriter() {
AsyncWriter oldWriter = this.writer;
if (oldWriter != null) {
long fileLength = oldWriter.getLength();
protected final long closeWriter(AsyncWriter writer) {
if (writer != null) {
long fileLength = writer.getLength();
closeExecutor.execute(() -> {
try {
oldWriter.close();
writer.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
}
@ -693,7 +692,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
throws IOException {
Preconditions.checkNotNull(nextWriter);
waitForSafePoint();
long oldFileLen = closeWriter();
long oldFileLen = closeWriter(this.writer);
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter;
if (nextWriter instanceof AsyncProtobufLogWriter) {
@ -719,7 +718,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
@Override
protected void doShutdown() throws IOException {
waitForSafePoint();
closeWriter();
closeWriter(this.writer);
this.writer = null;
closeExecutor.shutdown();
try {
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {

View File

@ -231,4 +231,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
protected OutputStream getOutputStreamForCellEncoder() {
return asyncOutputWrapper;
}
@Override
public long getSyncedLength() {
return this.output.getSyncedLength();
}
}

View File

@ -19,11 +19,14 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.util.AtomicUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
@ -46,6 +49,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
protected FSDataOutputStream output;
private final AtomicLong syncedLength = new AtomicLong(0);
@Override
public void append(Entry entry) throws IOException {
entry.getKey().getBuilder(compressor).
@ -85,6 +90,12 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
} else {
fsdos.hflush();
}
AtomicUtils.updateMax(this.syncedLength, fsdos.getPos());
}
@Override
public long getSyncedLength() {
return this.syncedLength.get();
}
public FSDataOutputStream getStream() {

View File

@ -25,6 +25,7 @@ import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.yetus.audience.InterfaceAudience;
@ -74,6 +75,22 @@ public interface WALProvider {
interface WriterBase extends Closeable {
long getLength();
/**
* NOTE: We add this method for {@link WALFileLengthProvider} used for replication,
* considering the case if we use {@link AsyncFSWAL},we write to 3 DNs concurrently,
* according to the visibility guarantee of HDFS, the data will be available immediately
* when arriving at DN since all the DNs will be considered as the last one in pipeline.
* This means replication may read uncommitted data and replicate it to the remote cluster
* and cause data inconsistency.
* The method {@link WriterBase#getLength} may return length which just in hdfs client
* buffer and not successfully synced to HDFS, so we use this method to return the length
* successfully synced to HDFS and replication thread could only read writing WAL file
* limited by this length.
* see also HBASE-14004 and this document for more details:
* https://docs.google.com/document/d/11AyWtGhItQs6vsLRIx32PwTxmBY3libXwGXI25obVEY/edit#
* @return byteSize successfully synced to underlying filesystem.
*/
long getSyncedLength();
}
// Writers are used internally. Users outside of the WAL should be relying on the

View File

@ -130,35 +130,40 @@ public class TestFailedAppendAndSync {
@Override
protected Writer createWriterInstance(Path path) throws IOException {
final Writer w = super.createWriterInstance(path);
return new Writer() {
@Override
public void close() throws IOException {
w.close();
}
@Override
public void sync(boolean forceSync) throws IOException {
if (throwSyncException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.sync(forceSync);
}
@Override
public void append(Entry entry) throws IOException {
if (throwAppendException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.append(entry);
}
@Override
public long getLength() {
return w.getLength();
}
};
return new Writer() {
@Override
public void close() throws IOException {
w.close();
}
@Override
public void sync(boolean forceSync) throws IOException {
if (throwSyncException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.sync(forceSync);
}
@Override
public void append(Entry entry) throws IOException {
if (throwAppendException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.append(entry);
}
@Override
public long getLength() {
return w.getLength();
}
@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}
// Make up mocked server and services.
RegionServerServices services = mock(RegionServerServices.class);

View File

@ -1251,6 +1251,11 @@ public class TestHRegion {
public long getLength() {
return w.getLength();
}
@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}

View File

@ -190,6 +190,11 @@ public class TestWALLockup {
public long getLength() {
return w.getLength();
}
@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}
@ -374,6 +379,11 @@ public class TestWALLockup {
public long getLength() {
return w.getLength();
}
@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}

View File

@ -155,6 +155,11 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
return writer.getLength();
}
@Override
public long getSyncedLength() {
return writer.getSyncedLength();
}
@Override
public CompletableFuture<Long> sync(boolean forceSync) {
CompletableFuture<Long> result = writer.sync(forceSync);

View File

@ -109,6 +109,11 @@ class CustomAsyncFSWAL extends AsyncFSWAL {
return writer.getLength();
}
@Override
public long getSyncedLength() {
return writer.getSyncedLength();
}
@Override
public CompletableFuture<Long> sync(boolean forceSync) {
writerSyncFlag = forceSync;

View File

@ -84,6 +84,11 @@ class CustomFSHLog extends FSHLog {
return writer.getLength();
}
@Override
public long getSyncedLength() {
return writer.getSyncedLength();
}
@Override
public void sync(boolean forceSync) throws IOException {
writerSyncFlag = forceSync;

View File

@ -174,6 +174,11 @@ public class TestLogRolling extends AbstractTestLogRolling {
public long getLength() {
return oldWriter1.getLength();
}
@Override
public long getSyncedLength() {
return oldWriter1.getSyncedLength();
}
};
log.setWriter(newWriter1);
@ -231,6 +236,11 @@ public class TestLogRolling extends AbstractTestLogRolling {
public long getLength() {
return oldWriter2.getLength();
}
@Override
public long getSyncedLength() {
return oldWriter2.getSyncedLength();
}
};
log.setWriter(newWriter2);

View File

@ -448,9 +448,7 @@ public class TestWALEntryStream {
batch = reader.take();
assertEquals(walPath, batch.getLastWalPath());
assertEquals(5, batch.getNbEntries());
// Actually this should be true but we haven't handled this yet since for a normal queue the
// last one is always open... Not a big deal for now.
assertFalse(batch.isEndOfFile());
assertTrue(batch.isEndOfFile());
assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
}