HBASE-26715 Blocked on SyncFuture in AsyncProtobufLogWriter#write (#4184)
Co-authored-by: Bryan Beaudreault <bbeaudreault@gmail.com> Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Viraj Jasani<virajjasani@apache.org> Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
parent
fac011ed87
commit
49d3a00652
|
@ -25,15 +25,19 @@ import java.io.OutputStream;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Consumer;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.io.ByteBufferWriter;
|
||||
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
|
||||
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
|
||||
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||
import org.apache.hadoop.hbase.wal.AbstractWALRoller;
|
||||
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -108,11 +112,20 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
|||
}
|
||||
|
||||
private OutputStream asyncOutputWrapper;
|
||||
private long waitTimeout;
|
||||
|
||||
public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
|
||||
Class<? extends Channel> channelClass) {
|
||||
this.eventLoopGroup = eventLoopGroup;
|
||||
this.channelClass = channelClass;
|
||||
// Reuse WAL_ROLL_WAIT_TIMEOUT here to avoid an infinite wait if somehow a wait on a future
|
||||
// never completes. The objective is the same. We want to propagate an exception to trigger
|
||||
// an abort if we seem to be hung.
|
||||
if (this.conf == null) {
|
||||
this.conf = HBaseConfiguration.create();
|
||||
}
|
||||
this.waitTimeout = this.conf.getLong(AbstractWALRoller.WAL_ROLL_WAIT_TIMEOUT,
|
||||
AbstractWALRoller.DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -184,16 +197,16 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
|||
this.asyncOutputWrapper = new OutputStreamWrapper(output);
|
||||
}
|
||||
|
||||
private long write(Consumer<CompletableFuture<Long>> action) throws IOException {
|
||||
private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
|
||||
CompletableFuture<Long> future = new CompletableFuture<>();
|
||||
action.accept(future);
|
||||
try {
|
||||
return future.get().longValue();
|
||||
return future.get(waitTimeout, TimeUnit.MILLISECONDS).longValue();
|
||||
} catch (InterruptedException e) {
|
||||
InterruptedIOException ioe = new InterruptedIOException();
|
||||
ioe.initCause(e);
|
||||
throw ioe;
|
||||
} catch (ExecutionException e) {
|
||||
} catch (ExecutionException | TimeoutException e) {
|
||||
Throwables.propagateIfPossible(e.getCause(), IOException.class);
|
||||
throw new RuntimeException(e.getCause());
|
||||
}
|
||||
|
@ -201,7 +214,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
|||
|
||||
@Override
|
||||
protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
|
||||
return write(future -> {
|
||||
return writeWALMetadata(future -> {
|
||||
output.write(magic);
|
||||
try {
|
||||
header.writeDelimitedTo(asyncOutputWrapper);
|
||||
|
@ -221,7 +234,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
|||
|
||||
@Override
|
||||
protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
|
||||
return write(future -> {
|
||||
return writeWALMetadata(future -> {
|
||||
try {
|
||||
trailer.writeTo(asyncOutputWrapper);
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -62,8 +62,8 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
|||
/**
|
||||
* Configure for the timeout of log rolling retry.
|
||||
*/
|
||||
protected static final String WAL_ROLL_WAIT_TIMEOUT =
|
||||
"hbase.regionserver.logroll.wait.timeout.ms";
|
||||
public static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms";
|
||||
public static final long DEFAULT_WAL_ROLL_WAIT_TIMEOUT = 30000;
|
||||
|
||||
/**
|
||||
* Configure for the max count of log rolling retry.
|
||||
|
@ -130,7 +130,7 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
|||
this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
|
||||
this.checkLowReplicationInterval =
|
||||
conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
|
||||
this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, 30000);
|
||||
this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
|
||||
// retry rolling does not have to be the default behavior, so the default value is 0 here
|
||||
this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue