HBASE-19768 RegionServer startup failing when DN is dead
parent 842f794a62
commit ffa28502c4
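In short: before this change, FanOutOneBlockAsyncDFSOutputHelper.createOutput failed fast on the first DataNode connection error while AsyncFSWAL retried writer creation itself, so a dead DataNode handed out by the NameNode could defeat every retry and keep a RegionServer from starting. The patch moves the retry loop into the helper and, on each failed connection, adds the broken DataNode to an exclude list passed back to the NameNode when the block is allocated again. The sketch below is a condensed illustration of that pattern, not the patch itself; the helper names allocateBlockExcluding, connectPipelineAndWrap and brokenDataNode are hypothetical placeholders, while the configuration key, backoff call and ArrayUtils usage mirror the diff that follows.

// Illustrative sketch only (hypothetical helper names), showing the retry-with-exclusion idea.
int maxRetries = conf.getInt("hbase.fs.async.create.retries", 10);
DatanodeInfo[] excluded = new DatanodeInfo[0];
for (int retry = 0;; retry++) {
  // ask the NameNode for a block, skipping DataNodes that failed before
  LocatedBlock block = allocateBlockExcluding(excluded); // hypothetical helper
  try {
    return connectPipelineAndWrap(block); // hypothetical helper: connect to every DN, wrap the channels
  } catch (IOException e) {
    // remember the DataNode we could not reach so the next allocation avoids it
    excluded = ArrayUtils.add(excluded, brokenDataNode(block, e)); // hypothetical helper
    if (retry >= maxRetries) {
      throw e;
    }
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry)); // backoff between attempts, as in the patch
    } catch (InterruptedException ie) {
      throw new InterruptedIOException();
    }
  }
}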
@@ -21,23 +21,23 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import com.google.protobuf.CodedOutputStream;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
@@ -85,6 +85,7 @@ import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
@@ -121,6 +122,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private FanOutOneBlockAsyncDFSOutputHelper() {
}

public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
// use pooled allocator for performance.
private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
@@ -129,8 +133,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {

// Timeouts for communicating with DataNode for streaming writes/reads
public static final int READ_TIMEOUT = 60 * 1000;
public static final int READ_TIMEOUT_EXTENSION = 5 * 1000;
public static final int WRITE_TIMEOUT = 8 * 60 * 1000;

private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];

// helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
// getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
@@ -744,58 +748,90 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
DFSClient client = dfs.getClient();
String clientName = client.getClientName();
ClientProtocol namenode = client.getNamenode();
HdfsFileStatus stat;
try {
stat = FILE_CREATOR.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
createParent, replication, blockSize, CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
} else {
throw new NameNodeException(e);
int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
for (int retry = 0;; retry++) {
HdfsFileStatus stat;
try {
stat = FILE_CREATOR.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
createParent, replication, blockSize, CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
} else {
throw new NameNodeException(e);
}
}
}
beginFileLease(client, stat.getFileId());
boolean succ = false;
LocatedBlock locatedBlock = null;
List<Future<Channel>> futureList = null;
try {
DataChecksum summer = createChecksum(client);
locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null, null,
stat.getFileId(), null);
List<Channel> datanodeList = new ArrayList<>();
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
for (Future<Channel> future : futureList) {
// fail the creation if there are connection failures since we are fail-fast. The upper
// layer should retry itself if needed.
datanodeList.add(future.syncUninterruptibly().getNow());
}
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
succ = true;
return output;
} finally {
if (!succ) {
if (futureList != null) {
for (Future<Channel> f : futureList) {
f.addListener(new FutureListener<Channel>() {

@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
future.getNow().close();
}
}
});
beginFileLease(client, stat.getFileId());
boolean succ = false;
LocatedBlock locatedBlock = null;
List<Future<Channel>> futureList = null;
try {
DataChecksum summer = createChecksum(client);
locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
excludesNodes, stat.getFileId(), null);
List<Channel> datanodeList = new ArrayList<>();
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
for (int i = 0, n = futureList.size(); i < n; i++) {
try {
datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
} catch (Exception e) {
// exclude the broken DN next time
excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
throw e;
}
}
endFileLease(client, stat.getFileId());
fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
succ = true;
return output;
} catch (RemoteException e) {
LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
if (shouldRetryCreate(e)) {
if (retry >= createMaxRetries) {
throw e.unwrapRemoteException();
}
} else {
throw e.unwrapRemoteException();
}
} catch (NameNodeException e) {
throw e;
} catch (IOException e) {
LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
if (retry >= createMaxRetries) {
throw e;
}
// overwrite the old broken file.
overwrite = true;
try {
Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
} catch (InterruptedException ie) {
throw new InterruptedIOException();
}
} finally {
if (!succ) {
if (futureList != null) {
for (Future<Channel> f : futureList) {
f.addListener(new FutureListener<Channel>() {

@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
future.getNow().close();
}
}
});
}
}
endFileLease(client, stat.getFileId());
fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));
}
}
}
}

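Since createOutput now owns the retry loop and the dead-DataNode exclusion, callers no longer need a create-retry loop of their own; the AsyncFSWAL hunks below delete exactly that logic. A minimal caller sketch, assuming FS, f, eventLoop, CHANNEL_CLASS and a LOG are set up as in the tests later in this commit:

// Caller-side sketch: one call, retries and exclusion of broken DataNodes happen inside the helper.
try (FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(
    FS, f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
  // the pipeline only contains DataNodes that actually accepted a connection
  LOG.info("pipeline length: " + out.getPipeline().length);
}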
@@ -17,14 +17,10 @@
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Comparator;
@@ -44,26 +40,23 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
@@ -140,9 +133,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries";
public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10;

public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
"hbase.wal.async.use-shared-event-loop";
public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;
@@ -189,8 +179,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {

private final long batchSize;

private final int createMaxRetries;

private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
@@ -257,8 +245,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());

batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
createMaxRetries =
conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
rollWriter();
@@ -622,46 +608,19 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {

@Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
boolean overwrite = false;
for (int retry = 0;; retry++) {
try {
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoopGroup,
channelClass);
} catch (RemoteException e) {
LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
if (shouldRetryCreate(e)) {
if (retry >= createMaxRetries) {
break;
}
} else {
IOException ioe = e.unwrapRemoteException();
// this usually means master already think we are dead so let's fail all the pending
// syncs. The shutdown process of RS will wait for all regions to be closed before calling
// WAL.close so if we do not wake up the thread blocked by sync here it will cause dead
// lock.
if (e.getMessage().contains("Parent directory doesn't exist:")) {
syncFutures.forEach(f -> f.done(f.getTxid(), ioe));
}
throw ioe;
}
} catch (NameNodeException e) {
throw e;
} catch (IOException e) {
LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
if (retry >= createMaxRetries) {
break;
}
// overwrite the old broken file.
overwrite = true;
try {
Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
} catch (InterruptedException ie) {
throw new InterruptedIOException();
}
try {
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, eventLoopGroup,
channelClass);
} catch (IOException e) {
// this usually means master already think we are dead so let's fail all the pending
// syncs. The shutdown process of RS will wait for all regions to be closed before calling
// WAL.close so if we do not wake up the thread blocked by sync here it will cause dead
// lock.
if (e.getMessage().contains("Parent directory doesn't exist:")) {
syncFutures.forEach(f -> f.done(f.getTxid(), e));
}
throw e;
}
throw new IOException("Failed to create wal log writer " + path + " after retrying " +
createMaxRetries + " time(s)");
}

private void waitForSafePoint() {

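With the retry loop removed from AsyncFSWAL, the WAL-level knob ASYNC_WAL_CREATE_MAX_RETRIES ("hbase.wal.async.create.retries") and its default go away; the number of create attempts is now governed by the helper's ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES ("hbase.fs.async.create.retries", default 10). A configuration that previously raised the WAL setting would switch to the helper key, as the TestAsyncLogRolling hunk at the end of this commit does; a small sketch:

// Sketch of the configuration change; the value 100 mirrors what TestAsyncLogRolling sets below.
Configuration conf = HBaseConfiguration.create();
// before this commit:
// conf.setInt(AsyncFSWAL.ASYNC_WAL_CREATE_MAX_RETRIES, 100);
// after this commit:
conf.setInt(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, 100);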
@@ -35,7 +35,6 @@ import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -43,9 +42,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Daemon;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -54,6 +53,7 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -95,23 +95,6 @@ public class TestFanOutOneBlockAsyncDFSOutput {
TEST_UTIL.shutdownMiniDFSCluster();
}

private void ensureAllDatanodeAlive() throws InterruptedException {
// FanOutOneBlockAsyncDFSOutputHelper.createOutput is fail-fast, so we need to make sure that we
// can create a FanOutOneBlockAsyncDFSOutput after a datanode restarting, otherwise some tests
// will fail.
for (;;) {
try {
FanOutOneBlockAsyncDFSOutput out =
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
out.close();
break;
} catch (IOException e) {
Thread.sleep(100);
}
}
}

static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
throws IOException, InterruptedException, ExecutionException {
List<CompletableFuture<Long>> futures = new ArrayList<>();
@@ -163,25 +146,21 @@ public class TestFanOutOneBlockAsyncDFSOutput {
out.flush(false).get();
// restart one datanode which causes one connection broken
TEST_UTIL.getDFSCluster().restartDataNode(0);
out.write(b, 0, b.length);
try {
out.write(b, 0, b.length);
try {
out.flush(false).get();
fail("flush should fail");
} catch (ExecutionException e) {
// we restarted one datanode so the flush should fail
LOG.info("expected exception caught", e);
}
out.recoverAndClose(null);
assertEquals(b.length, FS.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
try (FSDataInputStream in = FS.open(f)) {
in.readFully(actual);
}
assertArrayEquals(b, actual);
} finally {
ensureAllDatanodeAlive();
out.flush(false).get();
fail("flush should fail");
} catch (ExecutionException e) {
// we restarted one datanode so the flush should fail
LOG.info("expected exception caught", e);
}
out.recoverAndClose(null);
assertEquals(b.length, FS.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
try (FSDataInputStream in = FS.open(f)) {
in.readFully(actual);
}
assertArrayEquals(b, actual);
}

@Test
@@ -219,28 +198,19 @@ public class TestFanOutOneBlockAsyncDFSOutput {
Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
xceiverServerDaemonField.setAccessible(true);
Class<?> xceiverServerClass =
Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
numPeersMethod.setAccessible(true);
// make one datanode broken
TEST_UTIL.getDFSCluster().getDataNodes().get(0).shutdownDatanode(true);
try {
Path f = new Path("/test");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try {
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
fail("should fail with connection error");
} catch (IOException e) {
LOG.info("expected exception caught", e);
}
for (DataNode dn : TEST_UTIL.getDFSCluster().getDataNodes()) {
Daemon daemon = (Daemon) xceiverServerDaemonField.get(dn);
assertEquals(0, numPeersMethod.invoke(daemon.getRunnable()));
}
DataNodeProperties dnProp = TEST_UTIL.getDFSCluster().stopDataNode(0);
Path f = new Path("/test");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
// should exclude the dead dn when retry so here we only have 2 DNs in pipeline
assertEquals(2, output.getPipeline().length);
} finally {
TEST_UTIL.getDFSCluster().restartDataNode(0);
ensureAllDatanodeAlive();
TEST_UTIL.getDFSCluster().restartDataNode(dnProp);
}
}

@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
@@ -45,7 +46,7 @@ public class TestAsyncLogRolling extends AbstractTestLogRolling {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TestAsyncLogRolling.TEST_UTIL.getConfiguration();
conf.setInt(AsyncFSWAL.ASYNC_WAL_CREATE_MAX_RETRIES, 100);
conf.setInt(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, 100);
conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
conf.set(WALFactory.META_WAL_PROVIDER, "asyncfs");
AbstractTestLogRolling.setUpBeforeClass();