HBASE-19768 RegionServer startup failing when DN is dead

This commit is contained in:
zhangduo 2018-01-14 17:30:50 +08:00
parent d24fddf3ed
commit c554340a91
4 changed files with 129 additions and 163 deletions

View File

@ -21,23 +21,23 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
@ -85,6 +85,7 @@ import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
@ -121,6 +122,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private FanOutOneBlockAsyncDFSOutputHelper() {
public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";
// use pooled allocator for performance.
private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
@ -129,8 +133,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
// Timeouts for communicating with DataNode for streaming writes/reads
public static final int READ_TIMEOUT = 60 * 1000;
public static final int READ_TIMEOUT_EXTENSION = 5 * 1000;
public static final int WRITE_TIMEOUT = 8 * 60 * 1000;
private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
// helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
// getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
@ -744,58 +748,90 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
DFSClient client = dfs.getClient();
String clientName = client.getClientName();
ClientProtocol namenode = client.getNamenode();
HdfsFileStatus stat;
try {
stat = FILE_CREATOR.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
createParent, replication, blockSize, CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
} else {
throw new NameNodeException(e);
int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
for (int retry = 0;; retry++) {
HdfsFileStatus stat;
try {
stat = FILE_CREATOR.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
createParent, replication, blockSize, CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
} else {
throw new NameNodeException(e);
beginFileLease(client, stat.getFileId());
boolean succ = false;
LocatedBlock locatedBlock = null;
List<Future<Channel>> futureList = null;
try {
DataChecksum summer = createChecksum(client);
locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null, null,
stat.getFileId(), null);
List<Channel> datanodeList = new ArrayList<>();
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
for (Future<Channel> future : futureList) {
// fail the creation if there are connection failures since we are fail-fast. The upper
// layer should retry itself if needed.
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
succ = true;
return output;
} finally {
if (!succ) {
if (futureList != null) {
for (Future<Channel> f : futureList) {
f.addListener(new FutureListener<Channel>() {
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
beginFileLease(client, stat.getFileId());
boolean succ = false;
LocatedBlock locatedBlock = null;
List<Future<Channel>> futureList = null;
try {
DataChecksum summer = createChecksum(client);
locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
excludesNodes, stat.getFileId(), null);
List<Channel> datanodeList = new ArrayList<>();
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
for (int i = 0, n = futureList.size(); i < n; i++) {
try {
} catch (Exception e) {
// exclude the broken DN next time
excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
throw e;
endFileLease(client, stat.getFileId());
fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
succ = true;
return output;
} catch (RemoteException e) {
LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
if (shouldRetryCreate(e)) {
if (retry >= createMaxRetries) {
throw e.unwrapRemoteException();
} else {
throw e.unwrapRemoteException();
} catch (NameNodeException e) {
throw e;
} catch (IOException e) {
LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
if (retry >= createMaxRetries) {
throw e;
// overwrite the old broken file.
overwrite = true;
try {
Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
} catch (InterruptedException ie) {
throw new InterruptedIOException();
} finally {
if (!succ) {
if (futureList != null) {
for (Future<Channel> f : futureList) {
f.addListener(new FutureListener<Channel>() {
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
endFileLease(client, stat.getFileId());
fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));

View File

@ -17,14 +17,10 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Comparator;
@ -44,26 +40,23 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
@ -140,9 +133,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries";
public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10;
public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;
@ -189,8 +179,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private final long batchSize;
private final int createMaxRetries;
private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
@ -257,8 +245,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
createMaxRetries =
waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
@ -622,46 +608,19 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
protected AsyncWriter createWriterInstance(Path path) throws IOException {
boolean overwrite = false;
for (int retry = 0;; retry++) {
try {
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoopGroup,
} catch (RemoteException e) {
LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
if (shouldRetryCreate(e)) {
if (retry >= createMaxRetries) {
} else {
IOException ioe = e.unwrapRemoteException();
// this usually means master already think we are dead so let's fail all the pending
// syncs. The shutdown process of RS will wait for all regions to be closed before calling
// WAL.close so if we do not wake up the thread blocked by sync here it will cause dead
// lock.
if (e.getMessage().contains("Parent directory doesn't exist:")) {
syncFutures.forEach(f -> f.done(f.getTxid(), ioe));
throw ioe;
} catch (NameNodeException e) {
throw e;
} catch (IOException e) {
LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
if (retry >= createMaxRetries) {
// overwrite the old broken file.
overwrite = true;
try {
Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
} catch (InterruptedException ie) {
throw new InterruptedIOException();
try {
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, eventLoopGroup,
} catch (IOException e) {
// this usually means master already think we are dead so let's fail all the pending
// syncs. The shutdown process of RS will wait for all regions to be closed before calling
// WAL.close so if we do not wake up the thread blocked by sync here it will cause dead
// lock.
if (e.getMessage().contains("Parent directory doesn't exist:")) {
syncFutures.forEach(f -> f.done(f.getTxid(), e));
throw e;
throw new IOException("Failed to create wal log writer " + path + " after retrying " +
createMaxRetries + " time(s)");
private void waitForSafePoint() {

View File

@ -35,7 +35,6 @@ import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -43,9 +42,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Daemon;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@ -54,6 +53,7 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@ -95,23 +95,6 @@ public class TestFanOutOneBlockAsyncDFSOutput {
private void ensureAllDatanodeAlive() throws InterruptedException {
// FanOutOneBlockAsyncDFSOutputHelper.createOutput is fail-fast, so we need to make sure that we
// can create a FanOutOneBlockAsyncDFSOutput after a datanode restarting, otherwise some tests
// will fail.
for (;;) {
try {
FanOutOneBlockAsyncDFSOutput out =
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
} catch (IOException e) {
static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
throws IOException, InterruptedException, ExecutionException {
List<CompletableFuture<Long>> futures = new ArrayList<>();
@ -163,25 +146,21 @@ public class TestFanOutOneBlockAsyncDFSOutput {
// restart one datanode which causes one connection broken
out.write(b, 0, b.length);
try {
out.write(b, 0, b.length);
try {
fail("flush should fail");
} catch (ExecutionException e) {
// we restarted one datanode so the flush should fail
LOG.info("expected exception caught", e);
assertEquals(b.length, FS.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
try (FSDataInputStream in = FS.open(f)) {
assertArrayEquals(b, actual);
} finally {
fail("flush should fail");
} catch (ExecutionException e) {
// we restarted one datanode so the flush should fail
LOG.info("expected exception caught", e);
assertEquals(b.length, FS.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
try (FSDataInputStream in = FS.open(f)) {
assertArrayEquals(b, actual);
@ -219,28 +198,19 @@ public class TestFanOutOneBlockAsyncDFSOutput {
Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
Class<?> xceiverServerClass =
Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
// make one datanode broken
try {
Path f = new Path("/test");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try {
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
fail("should fail with connection error");
} catch (IOException e) {
LOG.info("expected exception caught", e);
for (DataNode dn : TEST_UTIL.getDFSCluster().getDataNodes()) {
Daemon daemon = (Daemon) xceiverServerDaemonField.get(dn);
assertEquals(0, numPeersMethod.invoke(daemon.getRunnable()));
DataNodeProperties dnProp = TEST_UTIL.getDFSCluster().stopDataNode(0);
Path f = new Path("/test");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
// should exclude the dead dn when retry so here we only have 2 DNs in pipeline
assertEquals(2, output.getPipeline().length);
} finally {

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
@ -45,7 +46,7 @@ public class TestAsyncLogRolling extends AbstractTestLogRolling {
public static void setUpBeforeClass() throws Exception {
Configuration conf = TestAsyncLogRolling.TEST_UTIL.getConfiguration();
conf.setInt(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, 100);
conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
conf.set(WALFactory.META_WAL_PROVIDER, "asyncfs");