diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 4fa06a4de4d..d7aa897d56b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -21,23 +21,23 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
-import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
-import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
+import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
 
 import com.google.protobuf.CodedOutputStream;
-
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.Encryptor;
@@ -85,6 +85,7 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
@@ -121,6 +122,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   private FanOutOneBlockAsyncDFSOutputHelper() {
   }
 
+  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";
+
+  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
   // use pooled allocator for performance.
   private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
 
@@ -129,8 +133,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   // Timeouts for communicating with DataNode for streaming writes/reads
   public static final int READ_TIMEOUT = 60 * 1000;
-  public static final int READ_TIMEOUT_EXTENSION = 5 * 1000;
-  public static final int WRITE_TIMEOUT = 8 * 60 * 1000;
+
+  private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
 
   // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
   // getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
@@ -744,58 +748,90 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     DFSClient client = dfs.getClient();
     String clientName = client.getClientName();
     ClientProtocol namenode = client.getNamenode();
-    HdfsFileStatus stat;
-    try {
-      stat = FILE_CREATOR.create(namenode, src,
-        FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
-        new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
-        createParent, replication, blockSize, CryptoProtocolVersion.supported());
-    } catch (Exception e) {
-      if (e instanceof RemoteException) {
-        throw (RemoteException) e;
-      } else {
-        throw new NameNodeException(e);
+    int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
+      DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
+    DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
+    for (int retry = 0;; retry++) {
+      HdfsFileStatus stat;
+      try {
+        stat = FILE_CREATOR.create(namenode, src,
+          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
+          new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
+          createParent, replication, blockSize, CryptoProtocolVersion.supported());
+      } catch (Exception e) {
+        if (e instanceof RemoteException) {
+          throw (RemoteException) e;
+        } else {
+          throw new NameNodeException(e);
+        }
       }
-    }
-    beginFileLease(client, stat.getFileId());
-    boolean succ = false;
-    LocatedBlock locatedBlock = null;
-    List<Future<Channel>> futureList = null;
-    try {
-      DataChecksum summer = createChecksum(client);
-      locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null, null,
-        stat.getFileId(), null);
-      List<Channel> datanodeList = new ArrayList<>();
-      futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
-        PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
-      for (Future<Channel> future : futureList) {
-        // fail the creation if there are connection failures since we are fail-fast. The upper
-        // layer should retry itself if needed.
-        datanodeList.add(future.syncUninterruptibly().getNow());
-      }
-      Encryptor encryptor = createEncryptor(conf, stat, client);
-      FanOutOneBlockAsyncDFSOutput output =
-        new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
-          stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
-      succ = true;
-      return output;
-    } finally {
-      if (!succ) {
-        if (futureList != null) {
-          for (Future<Channel> f : futureList) {
-            f.addListener(new FutureListener<Channel>() {
-
-              @Override
-              public void operationComplete(Future<Channel> future) throws Exception {
-                if (future.isSuccess()) {
-                  future.getNow().close();
-                }
-              }
-            });
+      beginFileLease(client, stat.getFileId());
+      boolean succ = false;
+      LocatedBlock locatedBlock = null;
+      List<Future<Channel>> futureList = null;
+      try {
+        DataChecksum summer = createChecksum(client);
+        locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
+          excludesNodes, stat.getFileId(), null);
+        List<Channel> datanodeList = new ArrayList<>();
+        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
+          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
+        for (int i = 0, n = futureList.size(); i < n; i++) {
+          try {
+            datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
+          } catch (Exception e) {
+            // exclude the broken DN next time
+            excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
+            throw e;
+          }
         }
-      }
-      endFileLease(client, stat.getFileId());
-      fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));
+        Encryptor encryptor = createEncryptor(conf, stat, client);
+        FanOutOneBlockAsyncDFSOutput output =
+          new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
+            stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
+        succ = true;
+        return output;
+      } catch (RemoteException e) {
+        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+        if (shouldRetryCreate(e)) {
+          if (retry >= createMaxRetries) {
+            throw e.unwrapRemoteException();
+          }
+        } else {
+          throw e.unwrapRemoteException();
+        }
+      } catch (NameNodeException e) {
+        throw e;
+      } catch (IOException e) {
+        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+        if (retry >= createMaxRetries) {
+          throw e;
+        }
+        // overwrite the old broken file.
+        overwrite = true;
+        try {
+          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+        } catch (InterruptedException ie) {
+          throw new InterruptedIOException();
+        }
+      } finally {
+        if (!succ) {
+          if (futureList != null) {
+            for (Future<Channel> f : futureList) {
+              f.addListener(new FutureListener<Channel>() {
+
+                @Override
+                public void operationComplete(Future<Channel> future) throws Exception {
+                  if (future.isSuccess()) {
+                    future.getNow().close();
+                  }
+                }
+              });
+            }
+          }
+          endFileLease(client, stat.getFileId());
+          fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));
+        }
       }
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index be8665b55df..0ace78280e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -17,14 +17,10 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
-
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.Sequence;
 import com.lmax.disruptor.Sequencer;
-
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.lang.reflect.Field;
 import java.util.ArrayDeque;
 import java.util.Comparator;
@@ -44,26 +40,23 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
-import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
@@ -140,9 +133,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
   public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
 
-  public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries";
-  public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10;
-
   public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
     "hbase.wal.async.use-shared-event-loop";
   public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;
@@ -189,8 +179,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final long batchSize;
 
-  private final int createMaxRetries;
-
   private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
     new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
@@ -257,8 +245,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());
 
     batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
-    createMaxRetries =
-      conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
     waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
       DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
     rollWriter();
@@ -622,46 +608,19 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   @Override
   protected AsyncWriter createWriterInstance(Path path) throws IOException {
-    boolean overwrite = false;
-    for (int retry = 0;; retry++) {
-      try {
-        return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoopGroup,
-          channelClass);
-      } catch (RemoteException e) {
-        LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
-        if (shouldRetryCreate(e)) {
-          if (retry >= createMaxRetries) {
-            break;
-          }
-        } else {
-          IOException ioe = e.unwrapRemoteException();
-          // this usually means master already think we are dead so let's fail all the pending
-          // syncs. The shutdown process of RS will wait for all regions to be closed before calling
-          // WAL.close so if we do not wake up the thread blocked by sync here it will cause dead
-          // lock.
-          if (e.getMessage().contains("Parent directory doesn't exist:")) {
-            syncFutures.forEach(f -> f.done(f.getTxid(), ioe));
-          }
-          throw ioe;
-        }
-      } catch (NameNodeException e) {
-        throw e;
-      } catch (IOException e) {
-        LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
-        if (retry >= createMaxRetries) {
-          break;
-        }
-        // overwrite the old broken file.
-        overwrite = true;
-        try {
-          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
-        } catch (InterruptedException ie) {
-          throw new InterruptedIOException();
-        }
+    try {
+      return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, eventLoopGroup,
+        channelClass);
+    } catch (IOException e) {
+      // this usually means the master already thinks we are dead, so let's fail all the pending
+      // syncs. The shutdown process of the RS will wait for all regions to be closed before
+      // calling WAL.close, so if we do not wake up the threads blocked by sync here it will
+      // cause a deadlock.
+      if (e.getMessage().contains("Parent directory doesn't exist:")) {
+        syncFutures.forEach(f -> f.done(f.getTxid(), e));
       }
+      throw e;
     }
-    throw new IOException("Failed to create wal log writer " + path + " after retrying " +
-      createMaxRetries + " time(s)");
   }
 
   private void waitForSafePoint() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 3a8da21cadd..42539c22666 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -35,7 +35,6 @@ import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
-
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,9 +42,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.Daemon;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -54,6 +53,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -95,23 +95,6 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     TEST_UTIL.shutdownMiniDFSCluster();
   }
 
-  private void ensureAllDatanodeAlive() throws InterruptedException {
-    // FanOutOneBlockAsyncDFSOutputHelper.createOutput is fail-fast, so we need to make sure that we
-    // can create a FanOutOneBlockAsyncDFSOutput after a datanode restarting, otherwise some tests
-    // will fail.
-    for (;;) {
-      try {
-        FanOutOneBlockAsyncDFSOutput out =
-          FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
-            true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
-        out.close();
-        break;
-      } catch (IOException e) {
-        Thread.sleep(100);
-      }
-    }
-  }
-
   static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
       throws IOException, InterruptedException, ExecutionException {
     List<CompletableFuture<Long>> futures = new ArrayList<>();
@@ -163,25 +146,21 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     out.flush(false).get();
     // restart one datanode which causes one connection broken
     TEST_UTIL.getDFSCluster().restartDataNode(0);
+    out.write(b, 0, b.length);
     try {
-      out.write(b, 0, b.length);
-      try {
-        out.flush(false).get();
-        fail("flush should fail");
-      } catch (ExecutionException e) {
-        // we restarted one datanode so the flush should fail
-        LOG.info("expected exception caught", e);
-      }
-      out.recoverAndClose(null);
-      assertEquals(b.length, FS.getFileStatus(f).getLen());
-      byte[] actual = new byte[b.length];
-      try (FSDataInputStream in = FS.open(f)) {
-        in.readFully(actual);
-      }
-      assertArrayEquals(b, actual);
-    } finally {
-      ensureAllDatanodeAlive();
+      out.flush(false).get();
+      fail("flush should fail");
+    } catch (ExecutionException e) {
+      // we restarted one datanode so the flush should fail
+      LOG.info("expected exception caught", e);
     }
+    out.recoverAndClose(null);
+    assertEquals(b.length, FS.getFileStatus(f).getLen());
+    byte[] actual = new byte[b.length];
+    try (FSDataInputStream in = FS.open(f)) {
+      in.readFully(actual);
+    }
+    assertArrayEquals(b, actual);
   }
 
   @Test
@@ -219,28 +198,19 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
     xceiverServerDaemonField.setAccessible(true);
     Class<?> xceiverServerClass =
-        Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
     Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
     numPeersMethod.setAccessible(true);
     // make one datanode broken
-    TEST_UTIL.getDFSCluster().getDataNodes().get(0).shutdownDatanode(true);
-    try {
-      Path f = new Path("/test");
-      EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-      try {
-        FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-          FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
-        fail("should fail with connection error");
-      } catch (IOException e) {
-        LOG.info("expected exception caught", e);
-      }
-      for (DataNode dn : TEST_UTIL.getDFSCluster().getDataNodes()) {
-        Daemon daemon = (Daemon) xceiverServerDaemonField.get(dn);
-        assertEquals(0, numPeersMethod.invoke(daemon.getRunnable()));
-      }
+    DataNodeProperties dnProp = TEST_UTIL.getDFSCluster().stopDataNode(0);
+    Path f = new Path("/test");
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
+      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
+      // the dead DN should be excluded on retry, so here we only have 2 DNs in the pipeline
+      assertEquals(2, output.getPipeline().length);
     } finally {
-      TEST_UTIL.getDFSCluster().restartDataNode(0);
-      ensureAllDatanodeAlive();
+      TEST_UTIL.getDFSCluster().restartDataNode(dnProp);
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
index 325da942b2a..01e44fb275e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CategoryBasedTimeout;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
@@ -45,7 +46,7 @@ public class TestAsyncLogRolling extends AbstractTestLogRolling {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     Configuration conf = TestAsyncLogRolling.TEST_UTIL.getConfiguration();
-    conf.setInt(AsyncFSWAL.ASYNC_WAL_CREATE_MAX_RETRIES, 100);
+    conf.setInt(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, 100);
     conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
     conf.set(WALFactory.META_WAL_PROVIDER, "asyncfs");
     AbstractTestLogRolling.setUpBeforeClass();
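Editor's note (not part of the patch): a minimal sketch of how a deployment could raise the create-retry limit that this change moves from AsyncFSWAL into FanOutOneBlockAsyncDFSOutputHelper; the TestAsyncLogRolling hunk above does the same thing in test code. The constant, its "hbase.fs.async.create.retries" key, and the default of 10 are taken from the patch; the class name RetryConfigExample is hypothetical. Each failed attempt excludes the broken datanode from the next block allocation and backs off via ConnectionUtils.getPauseTime(100, retry), so a higher limit mainly buys tolerance for slow datanode restarts.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;

public final class RetryConfigExample {
  public static void main(String[] args) {
    // Start from the stock HBase configuration.
    Configuration conf = HBaseConfiguration.create();
    // Allow up to 100 attempts when creating the fan-out DFS output
    // (default is 10, see DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES above).
    conf.setInt(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, 100);
  }
}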