diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 0ab04076e78..bc2998b9fd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -25,15 +25,19 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.io.ByteBufferWriter; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; +import org.apache.hadoop.hbase.wal.AbstractWALRoller; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; @@ -108,11 +112,20 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter } private OutputStream asyncOutputWrapper; + private long waitTimeout; public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup, Class channelClass) { this.eventLoopGroup = eventLoopGroup; this.channelClass = channelClass; + // Reuse WAL_ROLL_WAIT_TIMEOUT here to avoid an infinite wait if somehow a wait on a future + // never completes. The objective is the same. We want to propagate an exception to trigger + // an abort if we seem to be hung. + if (this.conf == null) { + this.conf = HBaseConfiguration.create(); + } + this.waitTimeout = this.conf.getLong(AbstractWALRoller.WAL_ROLL_WAIT_TIMEOUT, + AbstractWALRoller.DEFAULT_WAL_ROLL_WAIT_TIMEOUT); } /* @@ -184,16 +197,16 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter this.asyncOutputWrapper = new OutputStreamWrapper(output); } - private long write(Consumer> action) throws IOException { + private long writeWALMetadata(Consumer> action) throws IOException { CompletableFuture future = new CompletableFuture<>(); action.accept(future); try { - return future.get().longValue(); + return future.get(waitTimeout, TimeUnit.MILLISECONDS).longValue(); } catch (InterruptedException e) { InterruptedIOException ioe = new InterruptedIOException(); ioe.initCause(e); throw ioe; - } catch (ExecutionException e) { + } catch (ExecutionException | TimeoutException e) { Throwables.propagateIfPossible(e.getCause(), IOException.class); throw new RuntimeException(e.getCause()); } @@ -201,7 +214,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter @Override protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException { - return write(future -> { + return writeWALMetadata(future -> { output.write(magic); try { header.writeDelimitedTo(asyncOutputWrapper); @@ -221,7 +234,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter @Override protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException { - return write(future -> { + return writeWALMetadata(future -> { try { trailer.writeTo(asyncOutputWrapper); } catch (IOException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index fb9f3de91a3..b7a71c70196 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -63,7 +63,8 @@ public abstract class AbstractWALRoller extends Thread /** * Configure for the timeout of log rolling retry. */ - protected static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms"; + public static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms"; + public static final long DEFAULT_WAL_ROLL_WAIT_TIMEOUT = 30000; /** * Configure for the max count of log rolling retry. @@ -130,7 +131,7 @@ public abstract class AbstractWALRoller extends Thread this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); this.checkLowReplicationInterval = conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000); - this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, 30000); + this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, DEFAULT_WAL_ROLL_WAIT_TIMEOUT); // retry rolling does not have to be the default behavior, so the default value is 0 here this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0); }