HDFS-11032: [SPS]: Handling of block movement failure at the coordinator datanode. Contributed by Rakesh R

This commit is contained in:
Uma Maheswara Rao G 2016-12-22 17:07:49 -08:00 committed by Uma Maheswara Rao Gangumalla
parent 5179d99b7e
commit d81611fe55
2 changed files with 143 additions and 34 deletions

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@ -251,6 +252,12 @@ public class StoragePolicySatisfyWorker {
+ " satisfying storageType:{}", + " satisfying storageType:{}",
block, source, target, targetStorageType); block, source, target, targetStorageType);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS; return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
} catch (BlockPinningException e) {
// Pinned block won't be able to move to a different node. So, its not
// required to do retries, just marked as SUCCESS.
LOG.debug("Pinned block can't be moved, so skipping block:{}", block,
e);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
} catch (IOException e) { } catch (IOException e) {
// TODO: handle failure retries // TODO: handle failure retries
LOG.warn( LOG.warn(
@ -282,7 +289,7 @@ public class StoragePolicySatisfyWorker {
response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
} }
String logInfo = "reportedBlock move is failed"; String logInfo = "reportedBlock move is failed";
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo); DataTransferProtoUtil.checkBlockOpStatus(response, logInfo, true);
} }
} }

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -36,12 +37,15 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -66,10 +70,16 @@ public class TestStoragePolicySatisfier {
final private long capacity = 2 * 256 * 1024 * 1024; final private long capacity = 2 * 256 * 1024 * 1024;
final private String file = "/testMoveWhenStoragePolicyNotSatisfying"; final private String file = "/testMoveWhenStoragePolicyNotSatisfying";
private DistributedFileSystem dfs = null; private DistributedFileSystem dfs = null;
private static final int DEFAULT_BLOCK_SIZE = 1024;
@Before private void shutdownCluster() {
public void setUp() throws IOException { if (hdfsCluster != null) {
config.setLong("dfs.block.size", 1024); hdfsCluster.shutdown();
}
}
private void createCluster() throws IOException {
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes, hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes,
storagesPerDatanode, capacity); storagesPerDatanode, capacity);
dfs = hdfsCluster.getFileSystem(); dfs = hdfsCluster.getFileSystem();
@ -81,6 +91,7 @@ public class TestStoragePolicySatisfier {
throws Exception { throws Exception {
try { try {
createCluster();
// Change policy to COLD // Change policy to COLD
dfs.setStoragePolicy(new Path(file), "COLD"); dfs.setStoragePolicy(new Path(file), "COLD");
FSNamesystem namesystem = hdfsCluster.getNamesystem(); FSNamesystem namesystem = hdfsCluster.getNamesystem();
@ -99,7 +110,7 @@ public class TestStoragePolicySatisfier {
// Wait till namenode notified about the block location details // Wait till namenode notified about the block location details
waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000); waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
} finally { } finally {
hdfsCluster.shutdown(); shutdownCluster();
} }
} }
@ -107,6 +118,7 @@ public class TestStoragePolicySatisfier {
public void testWhenStoragePolicySetToALLSSD() public void testWhenStoragePolicySetToALLSSD()
throws Exception { throws Exception {
try { try {
createCluster();
// Change policy to ALL_SSD // Change policy to ALL_SSD
dfs.setStoragePolicy(new Path(file), "ALL_SSD"); dfs.setStoragePolicy(new Path(file), "ALL_SSD");
FSNamesystem namesystem = hdfsCluster.getNamesystem(); FSNamesystem namesystem = hdfsCluster.getNamesystem();
@ -127,7 +139,7 @@ public class TestStoragePolicySatisfier {
// areas // areas
waitExpectedStorageType(file, StorageType.SSD, 3, 30000); waitExpectedStorageType(file, StorageType.SSD, 3, 30000);
} finally { } finally {
hdfsCluster.shutdown(); shutdownCluster();
} }
} }
@ -135,6 +147,7 @@ public class TestStoragePolicySatisfier {
public void testWhenStoragePolicySetToONESSD() public void testWhenStoragePolicySetToONESSD()
throws Exception { throws Exception {
try { try {
createCluster();
// Change policy to ONE_SSD // Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), "ONE_SSD"); dfs.setStoragePolicy(new Path(file), "ONE_SSD");
FSNamesystem namesystem = hdfsCluster.getNamesystem(); FSNamesystem namesystem = hdfsCluster.getNamesystem();
@ -154,7 +167,7 @@ public class TestStoragePolicySatisfier {
waitExpectedStorageType(file, StorageType.SSD, 1, 30000); waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
waitExpectedStorageType(file, StorageType.DISK, 2, 30000); waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
} finally { } finally {
hdfsCluster.shutdown(); shutdownCluster();
} }
} }
@ -165,6 +178,7 @@ public class TestStoragePolicySatisfier {
@Test(timeout = 300000) @Test(timeout = 300000)
public void testPerTrackIdBlocksStorageMovementResults() throws Exception { public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
try { try {
createCluster();
// Change policy to ONE_SSD // Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), "ONE_SSD"); dfs.setStoragePolicy(new Path(file), "ONE_SSD");
FSNamesystem namesystem = hdfsCluster.getNamesystem(); FSNamesystem namesystem = hdfsCluster.getNamesystem();
@ -186,7 +200,7 @@ public class TestStoragePolicySatisfier {
waitForBlocksMovementResult(1, 30000); waitForBlocksMovementResult(1, 30000);
} finally { } finally {
hdfsCluster.shutdown(); shutdownCluster();
} }
} }
@ -196,18 +210,18 @@ public class TestStoragePolicySatisfier {
*/ */
@Test(timeout = 300000) @Test(timeout = 300000)
public void testMultipleFilesForSatisfyStoragePolicy() throws Exception { public void testMultipleFilesForSatisfyStoragePolicy() throws Exception {
try {
createCluster();
List<String> files = new ArrayList<>(); List<String> files = new ArrayList<>();
files.add(file); files.add(file);
// Creates 4 more files. Send all of them for satisfying the storage policy // Creates 4 more files. Send all of them for satisfying the storage
// together. // policy together.
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
String file1 = "/testMoveWhenStoragePolicyNotSatisfying_" + i; String file1 = "/testMoveWhenStoragePolicyNotSatisfying_" + i;
files.add(file1); files.add(file1);
writeContent(file1); writeContent(file1);
} }
try {
FSNamesystem namesystem = hdfsCluster.getNamesystem(); FSNamesystem namesystem = hdfsCluster.getNamesystem();
List<Long> blockCollectionIds = new ArrayList<>(); List<Long> blockCollectionIds = new ArrayList<>();
// Change policy to ONE_SSD // Change policy to ONE_SSD
@ -237,7 +251,7 @@ public class TestStoragePolicySatisfier {
waitForBlocksMovementResult(blockCollectionIds.size(), 30000); waitForBlocksMovementResult(blockCollectionIds.size(), 30000);
} finally { } finally {
hdfsCluster.shutdown(); shutdownCluster();
} }
} }
@ -247,10 +261,10 @@ public class TestStoragePolicySatisfier {
*/ */
@Test(timeout = 300000) @Test(timeout = 300000)
public void testSatisfyFileWithHdfsAdmin() throws Exception { public void testSatisfyFileWithHdfsAdmin() throws Exception {
try {
createCluster();
HdfsAdmin hdfsAdmin = HdfsAdmin hdfsAdmin =
new HdfsAdmin(FileSystem.getDefaultUri(config), config); new HdfsAdmin(FileSystem.getDefaultUri(config), config);
try {
// Change policy to COLD // Change policy to COLD
dfs.setStoragePolicy(new Path(file), "COLD"); dfs.setStoragePolicy(new Path(file), "COLD");
@ -267,7 +281,7 @@ public class TestStoragePolicySatisfier {
// Wait till namenode notified about the block location details // Wait till namenode notified about the block location details
waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000); waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
} finally { } finally {
hdfsCluster.shutdown(); shutdownCluster();
} }
} }
@ -277,11 +291,10 @@ public class TestStoragePolicySatisfier {
*/ */
@Test(timeout = 300000) @Test(timeout = 300000)
public void testSatisfyDirWithHdfsAdmin() throws Exception { public void testSatisfyDirWithHdfsAdmin() throws Exception {
try {
createCluster();
HdfsAdmin hdfsAdmin = HdfsAdmin hdfsAdmin =
new HdfsAdmin(FileSystem.getDefaultUri(config), config); new HdfsAdmin(FileSystem.getDefaultUri(config), config);
try {
final String subDir = "/subDir"; final String subDir = "/subDir";
final String subFile1 = subDir + "/subFile1"; final String subFile1 = subDir + "/subFile1";
final String subDir2 = subDir + "/subDir2"; final String subDir2 = subDir + "/subDir2";
@ -310,7 +323,7 @@ public class TestStoragePolicySatisfier {
// take no effect for the sub-dir's file in the directory. // take no effect for the sub-dir's file in the directory.
waitExpectedStorageType(subFile2, StorageType.DEFAULT, 3, 30000); waitExpectedStorageType(subFile2, StorageType.DEFAULT, 3, 30000);
} finally { } finally {
hdfsCluster.shutdown(); shutdownCluster();
} }
} }
@ -321,6 +334,7 @@ public class TestStoragePolicySatisfier {
@Test(timeout = 300000) @Test(timeout = 300000)
public void testSatisfyWithExceptions() throws Exception { public void testSatisfyWithExceptions() throws Exception {
try { try {
createCluster();
final String nonExistingFile = "/noneExistingFile"; final String nonExistingFile = "/noneExistingFile";
hdfsCluster.getConfiguration(0). hdfsCluster.getConfiguration(0).
setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false); setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
@ -354,7 +368,7 @@ public class TestStoragePolicySatisfier {
} }
} finally { } finally {
hdfsCluster.shutdown(); shutdownCluster();
} }
} }
@ -376,6 +390,7 @@ public class TestStoragePolicySatisfier {
public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy() public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy()
throws Exception { throws Exception {
try { try {
createCluster();
// Change policy to COLD // Change policy to COLD
dfs.setStoragePolicy(new Path(file), "COLD"); dfs.setStoragePolicy(new Path(file), "COLD");
FSNamesystem namesystem = hdfsCluster.getNamesystem(); FSNamesystem namesystem = hdfsCluster.getNamesystem();
@ -397,7 +412,7 @@ public class TestStoragePolicySatisfier {
waitForBlocksMovementResult(1, 30000); waitForBlocksMovementResult(1, 30000);
} finally { } finally {
hdfsCluster.shutdown(); shutdownCluster();
} }
} }
@ -418,6 +433,7 @@ public class TestStoragePolicySatisfier {
public void testWhenNoTargetDatanodeToSatisfyStoragePolicy() public void testWhenNoTargetDatanodeToSatisfyStoragePolicy()
throws Exception { throws Exception {
try { try {
createCluster();
// Change policy to COLD // Change policy to COLD
dfs.setStoragePolicy(new Path(file), "COLD"); dfs.setStoragePolicy(new Path(file), "COLD");
FSNamesystem namesystem = hdfsCluster.getNamesystem(); FSNamesystem namesystem = hdfsCluster.getNamesystem();
@ -440,7 +456,7 @@ public class TestStoragePolicySatisfier {
// re-attempted. // re-attempted.
waitForAttemptedItems(1, 30000); waitForAttemptedItems(1, 30000);
} finally { } finally {
hdfsCluster.shutdown(); shutdownCluster();
} }
} }
@ -452,6 +468,7 @@ public class TestStoragePolicySatisfier {
public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier() public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier()
throws IOException { throws IOException {
try { try {
createCluster();
// Simulate Mover by creating MOVER_ID file // Simulate Mover by creating MOVER_ID file
DFSTestUtil.createFile(hdfsCluster.getFileSystem(), DFSTestUtil.createFile(hdfsCluster.getFileSystem(),
HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0); HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0);
@ -461,10 +478,95 @@ public class TestStoragePolicySatisfier {
Assert.assertFalse("SPS should not start " Assert.assertFalse("SPS should not start "
+ "when a Mover instance is running", running); + "when a Mover instance is running", running);
} finally { } finally {
hdfsCluster.shutdown(); shutdownCluster();
} }
} }
/**
* Test to verify that satisfy worker can't move blocks. If the given block is
* pinned it shouldn't be considered for retries.
*/
@Test(timeout = 120000)
public void testMoveWithBlockPinning() throws Exception {
config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(3)
.storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK}})
.build();
hdfsCluster.waitActive();
dfs = hdfsCluster.getFileSystem();
// create a file with replication factor 3 and mark 2 pinned block
// locations.
final String file1 = createFileAndSimulateFavoredNodes(2);
// Change policy to COLD
dfs.setStoragePolicy(new Path(file1), "COLD");
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file1);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}};
// Adding DISK based datanodes
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
hdfsCluster.triggerHeartbeats();
// No block movement will be scheduled as there is no target node available
// with the required storage type.
waitForAttemptedItems(1, 30000);
waitForBlocksMovementResult(1, 30000);
waitExpectedStorageType(file1, StorageType.ARCHIVE, 1, 30000);
waitExpectedStorageType(file1, StorageType.DISK, 2, 30000);
}
private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
throws IOException {
ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
final String file1 = "/testMoveWithBlockPinning";
// replication factor 3
InetSocketAddress[] favoredNodes = new InetSocketAddress[favoredNodesCount];
for (int i = 0; i < favoredNodesCount; i++) {
favoredNodes[i] = dns.get(i).getXferAddress();
}
DFSTestUtil.createFile(dfs, new Path(file1), false, 1024, 100,
DEFAULT_BLOCK_SIZE, (short) 3, 0, false, favoredNodes);
LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file1, 0);
Assert.assertEquals("Wrong block count", 1,
locatedBlocks.locatedBlockCount());
// verify storage type before movement
LocatedBlock lb = locatedBlocks.get(0);
StorageType[] storageTypes = lb.getStorageTypes();
for (StorageType storageType : storageTypes) {
Assert.assertTrue(StorageType.DISK == storageType);
}
// Mock FsDatasetSpi#getPinning to show that the block is pinned.
DatanodeInfo[] locations = lb.getLocations();
Assert.assertEquals(3, locations.length);
Assert.assertTrue(favoredNodesCount < locations.length);
for(DatanodeInfo dnInfo: locations){
LOG.info("Simulate block pinning in datanode {}",
locations[favoredNodesCount]);
DataNode dn = hdfsCluster.getDataNode(dnInfo.getIpcPort());
DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
favoredNodesCount--;
if (favoredNodesCount <= 0) {
break;// marked favoredNodesCount number of pinned block location
}
}
return file1;
}
private void waitForAttemptedItems(long expectedBlkMovAttemptedCount, private void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
int timeout) throws TimeoutException, InterruptedException { int timeout) throws TimeoutException, InterruptedException {
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();