HBASE-26715 Blocked on SyncFuture in AsyncProtobufLogWriter#write (#4184)
Co-authored-by: Bryan Beaudreault <bbeaudreault@gmail.com> Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Viraj Jasani<virajjasani@apache.org> Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
parent
1fca21722b
commit
bfae8539bf
|
@ -25,14 +25,18 @@ import java.io.OutputStream;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.io.ByteBufferWriter;
|
import org.apache.hadoop.hbase.io.ByteBufferWriter;
|
||||||
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
|
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
|
||||||
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
|
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
|
||||||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||||
|
import org.apache.hadoop.hbase.wal.AbstractWALRoller;
|
||||||
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
|
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
|
||||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -107,11 +111,20 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
private OutputStream asyncOutputWrapper;
|
private OutputStream asyncOutputWrapper;
|
||||||
|
private long waitTimeout;
|
||||||
|
|
||||||
public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
|
public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
|
||||||
Class<? extends Channel> channelClass) {
|
Class<? extends Channel> channelClass) {
|
||||||
this.eventLoopGroup = eventLoopGroup;
|
this.eventLoopGroup = eventLoopGroup;
|
||||||
this.channelClass = channelClass;
|
this.channelClass = channelClass;
|
||||||
|
// Reuse WAL_ROLL_WAIT_TIMEOUT here to avoid an infinite wait if somehow a wait on a future
|
||||||
|
// never completes. The objective is the same. We want to propagate an exception to trigger
|
||||||
|
// an abort if we seem to be hung.
|
||||||
|
if (this.conf == null) {
|
||||||
|
this.conf = HBaseConfiguration.create();
|
||||||
|
}
|
||||||
|
this.waitTimeout = this.conf.getLong(AbstractWALRoller.WAL_ROLL_WAIT_TIMEOUT,
|
||||||
|
AbstractWALRoller.DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -182,16 +195,16 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
||||||
this.asyncOutputWrapper = new OutputStreamWrapper(output);
|
this.asyncOutputWrapper = new OutputStreamWrapper(output);
|
||||||
}
|
}
|
||||||
|
|
||||||
private long write(Consumer<CompletableFuture<Long>> action) throws IOException {
|
private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
|
||||||
CompletableFuture<Long> future = new CompletableFuture<>();
|
CompletableFuture<Long> future = new CompletableFuture<>();
|
||||||
action.accept(future);
|
action.accept(future);
|
||||||
try {
|
try {
|
||||||
return future.get().longValue();
|
return future.get(waitTimeout, TimeUnit.MILLISECONDS).longValue();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
InterruptedIOException ioe = new InterruptedIOException();
|
InterruptedIOException ioe = new InterruptedIOException();
|
||||||
ioe.initCause(e);
|
ioe.initCause(e);
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException | TimeoutException e) {
|
||||||
Throwables.propagateIfPossible(e.getCause(), IOException.class);
|
Throwables.propagateIfPossible(e.getCause(), IOException.class);
|
||||||
throw new RuntimeException(e.getCause());
|
throw new RuntimeException(e.getCause());
|
||||||
}
|
}
|
||||||
|
@ -199,7 +212,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
|
protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
|
||||||
return write(future -> {
|
return writeWALMetadata(future -> {
|
||||||
output.write(magic);
|
output.write(magic);
|
||||||
try {
|
try {
|
||||||
header.writeDelimitedTo(asyncOutputWrapper);
|
header.writeDelimitedTo(asyncOutputWrapper);
|
||||||
|
@ -219,7 +232,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
|
protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
|
||||||
return write(future -> {
|
return writeWALMetadata(future -> {
|
||||||
try {
|
try {
|
||||||
trailer.writeTo(asyncOutputWrapper);
|
trailer.writeTo(asyncOutputWrapper);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|
|
@ -62,8 +62,8 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
||||||
/**
|
/**
|
||||||
* Configure for the timeout of log rolling retry.
|
* Configure for the timeout of log rolling retry.
|
||||||
*/
|
*/
|
||||||
protected static final String WAL_ROLL_WAIT_TIMEOUT =
|
public static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms";
|
||||||
"hbase.regionserver.logroll.wait.timeout.ms";
|
public static final long DEFAULT_WAL_ROLL_WAIT_TIMEOUT = 30000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configure for the max count of log rolling retry.
|
* Configure for the max count of log rolling retry.
|
||||||
|
@ -130,7 +130,7 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
||||||
this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
|
this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
|
||||||
this.checkLowReplicationInterval =
|
this.checkLowReplicationInterval =
|
||||||
conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
|
conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
|
||||||
this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, 30000);
|
this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
|
||||||
// retry rolling does not have to be the default behavior, so the default value is 0 here
|
// retry rolling does not have to be the default behavior, so the default value is 0 here
|
||||||
this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0);
|
this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue