HDFS-15940. Fixing and refactoring tests specific to Block recovery.(#2844). Contributed by Viraj Jasani

Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
Viraj Jasani 2021-04-06 00:19:21 +05:30 committed by Ayush Saxena
parent 7a7e194228
commit 2b5fd341b9
3 changed files with 538 additions and 342 deletions

View File

@ -19,21 +19,14 @@
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@ -44,16 +37,13 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@ -78,13 +68,11 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -99,11 +87,9 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.slf4j.event.Level;
@ -298,12 +284,12 @@ public class TestBlockRecovery {
}
}
/** Sync two replicas */
/**
* Sync two replicas.
*/
private void testSyncReplicas(ReplicaRecoveryInfo replica1,
ReplicaRecoveryInfo replica2,
InterDatanodeProtocol dn1,
InterDatanodeProtocol dn2,
long expectLen) throws IOException {
ReplicaRecoveryInfo replica2, InterDatanodeProtocol dn1,
InterDatanodeProtocol dn2) throws IOException {
DatanodeInfo[] locs = new DatanodeInfo[]{
mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
@ -344,7 +330,7 @@ public class TestBlockRecovery {
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
@ -357,7 +343,7 @@ public class TestBlockRecovery {
REPLICA_LEN2, GEN_STAMP - 2, ReplicaState.FINALIZED);
try {
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
testSyncReplicas(replica1, replica2, dn1, dn2);
Assert.fail("Two finalized replicas should not have different lengthes!");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().startsWith(
@ -385,7 +371,7 @@ public class TestBlockRecovery {
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
@ -400,7 +386,7 @@ public class TestBlockRecovery {
dn1 = mock(InterDatanodeProtocol.class);
dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery(
@ -410,6 +396,7 @@ public class TestBlockRecovery {
/**
* BlockRecovery_02.10.
* One replica is Finalized and another is RWR.
*
* @throws IOException in case of an error
*/
@Test(timeout = 60000)
@ -427,7 +414,7 @@ public class TestBlockRecovery {
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery(
@ -442,7 +429,7 @@ public class TestBlockRecovery {
dn1 = mock(InterDatanodeProtocol.class);
dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery(
@ -468,7 +455,7 @@ public class TestBlockRecovery {
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
testSyncReplicas(replica1, replica2, dn1, dn2, minLen);
testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
}
@ -476,6 +463,7 @@ public class TestBlockRecovery {
/**
* BlockRecovery_02.12.
* One replica is RBW and another is RWR.
*
* @throws IOException in case of an error
*/
@Test(timeout=60000)
@ -491,7 +479,7 @@ public class TestBlockRecovery {
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
@ -500,6 +488,7 @@ public class TestBlockRecovery {
/**
* BlockRecovery_02.13.
* Two replicas are RWR.
*
* @throws IOException in case of an error
*/
@Test(timeout=60000)
@ -516,7 +505,7 @@ public class TestBlockRecovery {
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
testSyncReplicas(replica1, replica2, dn1, dn2, minLen);
testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
@ -708,132 +697,6 @@ public class TestBlockRecovery {
}
}
@Test(timeout = 60000)
public void testEcRecoverBlocks() throws Throwable {
// Stop the Mocked DN started in startup()
tearDown();
ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(8).build();
try {
cluster.waitActive();
NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
NamenodeProtocols spyNN = spy(preSpyNN);
// Delay completeFile
GenericTestUtils.DelayAnswer delayer =
new GenericTestUtils.DelayAnswer(LOG);
doAnswer(delayer).when(spyNN).complete(anyString(), anyString(), any(),
anyLong());
String topDir = "/myDir";
DFSClient client = new DFSClient(null, spyNN, conf, null);
Path file = new Path(topDir + "/testECLeaseRecover");
client.mkdirs(topDir, null, false);
client.enableErasureCodingPolicy(ecPolicy.getName());
client.setErasureCodingPolicy(topDir, ecPolicy.getName());
OutputStream stm = client.create(file.toString(), true);
// write 5MB File
AppendTestUtil.write(stm, 0, 1024 * 1024 * 5);
final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
Thread t = new Thread() {
@Override
public void run() {
try {
stm.close();
} catch (Throwable t) {
err.set(t);
}
}
};
t.start();
// Waiting for close to get to latch
delayer.waitForCall();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
return client.getNamenode().recoverLease(file.toString(),
client.getClientName());
} catch (IOException e) {
return false;
}
}
}, 5000, 24000);
delayer.proceed();
} finally {
cluster.shutdown();
}
}
/**
* Test to verify the race between finalizeBlock and Lease recovery
*
* @throws Exception
*/
@Test(timeout = 20000)
public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() throws Exception {
tearDown();// Stop the Mocked DN started in startup()
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, "1000");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
try {
cluster.waitClusterUp();
DistributedFileSystem fs = cluster.getFileSystem();
Path path = new Path("/test");
FSDataOutputStream out = fs.create(path);
out.writeBytes("data");
out.hsync();
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path));
final LocatedBlock block = blocks.get(0);
final DataNode dataNode = cluster.getDataNodes().get(0);
final AtomicBoolean recoveryInitResult = new AtomicBoolean(true);
Thread recoveryThread = new Thread() {
@Override
public void run() {
try {
DatanodeInfo[] locations = block.getLocations();
final RecoveringBlock recoveringBlock = new RecoveringBlock(
block.getBlock(), locations, block.getBlock()
.getGenerationStamp() + 1);
try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) {
Thread.sleep(2000);
dataNode.initReplicaRecovery(recoveringBlock);
}
} catch (Exception e) {
recoveryInitResult.set(false);
}
}
};
recoveryThread.start();
try {
out.close();
} catch (IOException e) {
Assert.assertTrue("Writing should fail",
e.getMessage().contains("are bad. Aborting..."));
} finally {
recoveryThread.join();
}
Assert.assertTrue("Recovery should be initiated successfully",
recoveryInitResult.get());
dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
.getGenerationStamp() + 1, block.getBlock().getBlockId(),
block.getBlockSize());
} finally {
if (null != cluster) {
cluster.shutdown();
cluster = null;
}
}
}
/**
* DNs report RUR instead of RBW, RWR or FINALIZED. Primary DN expected to
* throw an exception.
@ -1107,57 +970,7 @@ public class TestBlockRecovery {
}
}
/**
* Test for block recovery taking longer than the heartbeat interval.
*/
@Test(timeout = 300000L)
public void testRecoverySlowerThanHeartbeat() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
SleepAnswer delayer = new SleepAnswer(3000, 6000);
testRecoveryWithDatanodeDelayed(delayer);
}
/**
* Test for block recovery timeout. All recovery attempts will be delayed
* and the first attempt will be lost to trigger recovery timeout and retry.
*/
@Test(timeout = 300000L)
public void testRecoveryTimeout() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
final Random r = new Random();
// Make sure first commitBlockSynchronization call from the DN gets lost
// for the recovery timeout to expire and new recovery attempt
// to be started.
SleepAnswer delayer = new SleepAnswer(3000) {
private final AtomicBoolean callRealMethod = new AtomicBoolean();
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
Thread.sleep(r.nextInt(3000) + 6000);
} catch (InterruptedException ie) {
interrupted = true;
}
try {
if (callRealMethod.get()) {
return invocation.callRealMethod();
}
callRealMethod.set(true);
return null;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
};
testRecoveryWithDatanodeDelayed(delayer);
}
private void testRecoveryWithDatanodeDelayed(
static void testRecoveryWithDatanodeDelayed(
GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
Configuration configuration = new HdfsConfiguration();
configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
@ -1209,80 +1022,4 @@ public class TestBlockRecovery {
}
}
/**
* Test that block will be recovered even if there are less than the
* specified minReplication datanodes involved in its recovery.
*
* Check that, after recovering, the block will be successfully replicated.
*/
@Test(timeout = 300000L)
public void testRecoveryWillIgnoreMinReplication() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
final int blockSize = 4096;
final int numReplicas = 3;
final String filename = "/testIgnoreMinReplication";
final Path filePath = new Path(filename);
Configuration configuration = new HdfsConfiguration();
configuration.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
configuration.setInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
configuration.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(5)
.build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final FSNamesystem fsn = cluster.getNamesystem();
// Create a file and never close the output stream to trigger recovery
FSDataOutputStream out = dfs.create(filePath, (short) numReplicas);
out.write(AppendTestUtil.randomBytes(0, blockSize));
out.hsync();
DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), configuration);
LocatedBlock blk = dfsClient.getNamenode().
getBlockLocations(filename, 0, blockSize).
getLastLocatedBlock();
// Kill 2 out of 3 datanodes so that only 1 alive, thus < minReplication
List<DatanodeInfo> dataNodes = Arrays.asList(blk.getLocations());
assertEquals(dataNodes.size(), numReplicas);
for (DatanodeInfo dataNode : dataNodes.subList(0, numReplicas - 1)) {
cluster.stopDataNode(dataNode.getName());
}
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return fsn.getNumDeadDataNodes() == 2;
}
}, 300, 300000);
// Make sure hard lease expires to trigger replica recovery
cluster.setLeasePeriod(100L, 100L);
// Wait for recovery to succeed
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
return dfs.isFileClosed(filePath);
} catch (IOException e) {}
return false;
}
}, 300, 300000);
// Wait for the block to be replicated
DFSTestUtil.waitForReplication(cluster, DFSTestUtil.getFirstBlock(
dfs, filePath), 1, numReplicas, 0);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -0,0 +1,463 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
* Test part 2 for sync all replicas in block recovery.
*/
public class TestBlockRecovery2 {
private static final Logger LOG =
LoggerFactory.getLogger(TestBlockRecovery2.class);
private static final String DATA_DIR =
MiniDFSCluster.getBaseDirectory() + "data";
private DataNode dn;
private Configuration conf;
private boolean tearDownDone;
private final static String CLUSTER_ID = "testClusterID";
private final static String POOL_ID = "BP-TEST";
private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
"localhost", 5020);
@Rule
public TestName currentTestName = new TestName();
static {
GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.TRACE);
GenericTestUtils.setLogLevel(LOG, Level.TRACE);
}
/**
* Starts an instance of DataNode.
* @throws IOException
*/
@Before
public void startUp() throws IOException {
tearDownDone = false;
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
FileSystem.setDefaultUri(conf,
"hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
List<StorageLocation> locations = new ArrayList<>();
File dataDir = new File(DATA_DIR);
FileUtil.fullyDelete(dataDir);
dataDir.mkdirs();
StorageLocation location = StorageLocation.parse(dataDir.getPath());
locations.add(location);
final DatanodeProtocolClientSideTranslatorPB namenode =
mock(DatanodeProtocolClientSideTranslatorPB.class);
Mockito.doAnswer(
(Answer<DatanodeRegistration>) invocation ->
(DatanodeRegistration) invocation.getArguments()[0])
.when(namenode)
.registerDatanode(Mockito.any(DatanodeRegistration.class));
when(namenode.versionRequest())
.thenReturn(new NamespaceInfo(1, CLUSTER_ID, POOL_ID, 1L));
when(namenode.sendHeartbeat(
Mockito.any(),
Mockito.any(),
Mockito.anyLong(),
Mockito.anyLong(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(),
Mockito.anyBoolean(),
Mockito.any(),
Mockito.any()))
.thenReturn(new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, 1),
null, ThreadLocalRandom.current().nextLong() | 1L));
dn = new DataNode(conf, locations, null, null) {
@Override
DatanodeProtocolClientSideTranslatorPB connectToNN(
InetSocketAddress nnAddr) throws IOException {
Assert.assertEquals(NN_ADDR, nnAddr);
return namenode;
}
};
// Trigger a heartbeat so that it acknowledges the NN as active.
dn.getAllBpOs().get(0).triggerHeartbeatForTests();
waitForActiveNN();
}
/**
* Wait for active NN up to 15 seconds.
*/
private void waitForActiveNN() {
try {
GenericTestUtils.waitFor(() ->
dn.getAllBpOs().get(0).getActiveNN() != null, 1000, 15 * 1000);
} catch (TimeoutException e) {
// Here its not failing, will again do the assertions for activeNN after
// this waiting period and fails there if BPOS has not acknowledged
// any NN as active.
LOG.warn("Failed to get active NN", e);
} catch (InterruptedException e) {
LOG.warn("InterruptedException while waiting to see active NN", e);
}
Assert.assertNotNull("Failed to get ActiveNN",
dn.getAllBpOs().get(0).getActiveNN());
}
/**
* Cleans the resources and closes the instance of datanode.
* @throws IOException if an error occurred
*/
@After
public void tearDown() throws IOException {
if (!tearDownDone && dn != null) {
try {
dn.shutdown();
} catch(Exception e) {
LOG.error("Cannot close: ", e);
} finally {
File dir = new File(DATA_DIR);
if (dir.exists()) {
Assert.assertTrue(
"Cannot delete data-node dirs", FileUtil.fullyDelete(dir));
}
}
tearDownDone = true;
}
}
/**
* Test to verify the race between finalizeBlock and Lease recovery.
*
* @throws Exception
*/
@Test(timeout = 20000)
public void testRaceBetweenReplicaRecoveryAndFinalizeBlock()
throws Exception {
// Stop the Mocked DN started in startup()
tearDown();
Configuration configuration = new HdfsConfiguration();
configuration.set(
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, "1000");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(configuration)
.numDataNodes(1).build();
try {
cluster.waitClusterUp();
DistributedFileSystem fs = cluster.getFileSystem();
Path path = new Path("/test");
FSDataOutputStream out = fs.create(path);
out.writeBytes("data");
out.hsync();
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path));
final LocatedBlock block = blocks.get(0);
final DataNode dataNode = cluster.getDataNodes().get(0);
final AtomicBoolean recoveryInitResult = new AtomicBoolean(true);
Thread recoveryThread = new Thread(() -> {
try {
DatanodeInfo[] locations = block.getLocations();
final BlockRecoveryCommand.RecoveringBlock recoveringBlock =
new BlockRecoveryCommand.RecoveringBlock(block.getBlock(),
locations, block.getBlock().getGenerationStamp() + 1);
try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) {
Thread.sleep(2000);
dataNode.initReplicaRecovery(recoveringBlock);
}
} catch (Exception e) {
recoveryInitResult.set(false);
}
});
recoveryThread.start();
try {
out.close();
} catch (IOException e) {
Assert.assertTrue("Writing should fail",
e.getMessage().contains("are bad. Aborting..."));
} finally {
recoveryThread.join();
}
Assert.assertTrue("Recovery should be initiated successfully",
recoveryInitResult.get());
dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
.getGenerationStamp() + 1, block.getBlock().getBlockId(),
block.getBlockSize());
} finally {
if (null != cluster) {
cluster.shutdown();
}
}
}
/**
* Test for block recovery timeout. All recovery attempts will be delayed
* and the first attempt will be lost to trigger recovery timeout and retry.
*/
@Test(timeout = 300000L)
public void testRecoveryTimeout() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
final Random r = new Random();
// Make sure first commitBlockSynchronization call from the DN gets lost
// for the recovery timeout to expire and new recovery attempt
// to be started.
GenericTestUtils.SleepAnswer delayer =
new GenericTestUtils.SleepAnswer(3000) {
private final AtomicBoolean callRealMethod = new AtomicBoolean();
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
Thread.sleep(r.nextInt(3000) + 6000);
} catch (InterruptedException ie) {
interrupted = true;
}
try {
if (callRealMethod.get()) {
return invocation.callRealMethod();
}
callRealMethod.set(true);
return null;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
};
TestBlockRecovery.testRecoveryWithDatanodeDelayed(delayer);
}
/**
* Test for block recovery taking longer than the heartbeat interval.
*/
@Test(timeout = 300000L)
public void testRecoverySlowerThanHeartbeat() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
GenericTestUtils.SleepAnswer delayer =
new GenericTestUtils.SleepAnswer(3000, 6000);
TestBlockRecovery.testRecoveryWithDatanodeDelayed(delayer);
}
@Test(timeout = 60000)
public void testEcRecoverBlocks() throws Throwable {
// Stop the Mocked DN started in startup()
tearDown();
ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(8).build();
try {
cluster.waitActive();
NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
NamenodeProtocols spyNN = spy(preSpyNN);
// Delay completeFile
GenericTestUtils.DelayAnswer delayer =
new GenericTestUtils.DelayAnswer(LOG);
doAnswer(delayer).when(spyNN).complete(anyString(), anyString(), any(),
anyLong());
String topDir = "/myDir";
DFSClient client = new DFSClient(null, spyNN, conf, null);
Path file = new Path(topDir + "/testECLeaseRecover");
client.mkdirs(topDir, null, false);
client.enableErasureCodingPolicy(ecPolicy.getName());
client.setErasureCodingPolicy(topDir, ecPolicy.getName());
OutputStream stm = client.create(file.toString(), true);
// write 5MB File
AppendTestUtil.write(stm, 0, 1024 * 1024 * 5);
final AtomicReference<Throwable> err = new AtomicReference<>();
Thread t = new Thread(() -> {
try {
stm.close();
} catch (Throwable t1) {
err.set(t1);
}
});
t.start();
// Waiting for close to get to latch
delayer.waitForCall();
GenericTestUtils.waitFor(() -> {
try {
return client.getNamenode().recoverLease(file.toString(),
client.getClientName());
} catch (IOException e) {
return false;
}
}, 5000, 24000);
delayer.proceed();
} finally {
cluster.shutdown();
}
}
/**
* Test that block will be recovered even if there are less than the
* specified minReplication datanodes involved in its recovery.
*
* Check that, after recovering, the block will be successfully replicated.
*/
@Test(timeout = 300000L)
public void testRecoveryWillIgnoreMinReplication() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
final int blockSize = 4096;
final int numReplicas = 3;
final String filename = "/testIgnoreMinReplication";
final Path filePath = new Path(filename);
Configuration configuration = new HdfsConfiguration();
configuration.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
configuration.setInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
configuration.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(5)
.build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final FSNamesystem fsn = cluster.getNamesystem();
// Create a file and never close the output stream to trigger recovery
FSDataOutputStream out = dfs.create(filePath, (short) numReplicas);
out.write(AppendTestUtil.randomBytes(0, blockSize));
out.hsync();
DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), configuration);
LocatedBlock blk = dfsClient.getNamenode().
getBlockLocations(filename, 0, blockSize).
getLastLocatedBlock();
// Kill 2 out of 3 datanodes so that only 1 alive, thus < minReplication
List<DatanodeInfo> dataNodes = Arrays.asList(blk.getLocations());
assertEquals(dataNodes.size(), numReplicas);
for (DatanodeInfo dataNode : dataNodes.subList(0, numReplicas - 1)) {
cluster.stopDataNode(dataNode.getName());
}
GenericTestUtils.waitFor(() -> fsn.getNumDeadDataNodes() == 2,
300, 300000);
// Make sure hard lease expires to trigger replica recovery
cluster.setLeasePeriod(100L, 100L);
// Wait for recovery to succeed
GenericTestUtils.waitFor(() -> {
try {
return dfs.isFileClosed(filePath);
} catch (IOException e) {
LOG.info("Something went wrong.", e);
}
return false;
}, 300, 300000);
// Wait for the block to be replicated
DFSTestUtil.waitForReplication(cluster, DFSTestUtil.getFirstBlock(
dfs, filePath), 1, numReplicas, 0);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -28,8 +28,6 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -48,8 +46,6 @@ import org.junit.Test;
*/
public class TestDataNodeReconfiguration {
private static final Logger LOG =
LoggerFactory.getLogger(TestBlockRecovery.class);
private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory()
+ "data";
private final static InetSocketAddress NN_ADDR = new InetSocketAddress(