HBASE-26715 Blocked on SyncFuture in AsyncProtobufLogWriter#write (#4184)

Co-authored-by: Bryan Beaudreault <bbeaudreault@gmail.com>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Viraj Jasani<virajjasani@apache.org>
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
Andrew Purtell 2022-03-10 11:55:38 -08:00 committed by GitHub
parent 4da53efcf9
commit e5dbbd20ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 7 deletions

View File

@ -25,15 +25,19 @@ import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ByteBufferWriter; import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AbstractWALRoller;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -108,11 +112,20 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
} }
private OutputStream asyncOutputWrapper; private OutputStream asyncOutputWrapper;
private long waitTimeout;
public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup, public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) { Class<? extends Channel> channelClass) {
this.eventLoopGroup = eventLoopGroup; this.eventLoopGroup = eventLoopGroup;
this.channelClass = channelClass; this.channelClass = channelClass;
// Reuse WAL_ROLL_WAIT_TIMEOUT here to avoid an infinite wait if somehow a wait on a future
// never completes. The objective is the same. We want to propagate an exception to trigger
// an abort if we seem to be hung.
if (this.conf == null) {
this.conf = HBaseConfiguration.create();
}
this.waitTimeout = this.conf.getLong(AbstractWALRoller.WAL_ROLL_WAIT_TIMEOUT,
AbstractWALRoller.DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
} }
/* /*
@ -184,16 +197,16 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
this.asyncOutputWrapper = new OutputStreamWrapper(output); this.asyncOutputWrapper = new OutputStreamWrapper(output);
} }
private long write(Consumer<CompletableFuture<Long>> action) throws IOException { private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
CompletableFuture<Long> future = new CompletableFuture<>(); CompletableFuture<Long> future = new CompletableFuture<>();
action.accept(future); action.accept(future);
try { try {
return future.get().longValue(); return future.get(waitTimeout, TimeUnit.MILLISECONDS).longValue();
} catch (InterruptedException e) { } catch (InterruptedException e) {
InterruptedIOException ioe = new InterruptedIOException(); InterruptedIOException ioe = new InterruptedIOException();
ioe.initCause(e); ioe.initCause(e);
throw ioe; throw ioe;
} catch (ExecutionException e) { } catch (ExecutionException | TimeoutException e) {
Throwables.propagateIfPossible(e.getCause(), IOException.class); Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new RuntimeException(e.getCause()); throw new RuntimeException(e.getCause());
} }
@ -201,7 +214,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
@Override @Override
protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException { protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
return write(future -> { return writeWALMetadata(future -> {
output.write(magic); output.write(magic);
try { try {
header.writeDelimitedTo(asyncOutputWrapper); header.writeDelimitedTo(asyncOutputWrapper);
@ -221,7 +234,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
@Override @Override
protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException { protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
return write(future -> { return writeWALMetadata(future -> {
try { try {
trailer.writeTo(asyncOutputWrapper); trailer.writeTo(asyncOutputWrapper);
} catch (IOException e) { } catch (IOException e) {

View File

@ -63,7 +63,8 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
/** /**
* Configure for the timeout of log rolling retry. * Configure for the timeout of log rolling retry.
*/ */
protected static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms"; public static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms";
public static final long DEFAULT_WAL_ROLL_WAIT_TIMEOUT = 30000;
/** /**
* Configure for the max count of log rolling retry. * Configure for the max count of log rolling retry.
@ -130,7 +131,7 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.checkLowReplicationInterval = this.checkLowReplicationInterval =
conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000); conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, 30000); this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
// retry rolling does not have to be the default behavior, so the default value is 0 here // retry rolling does not have to be the default behavior, so the default value is 0 here
this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0); this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0);
} }