HDFS-15695. NN should not let the balancer run in safemode (#2489). Contributed by Daryn Sharp and Ahmed Hussein

Author: Ahmed Hussein (committed by GitHub)
Date: 2020-12-02 13:59:00 -06:00
Commit: 2b5b556dd7
Parent: fa773a8326
2 changed files with 106 additions and 97 deletions

NameNodeRpcServer.java

@@ -663,6 +663,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     }
     checkNNStartup();
     namesystem.checkSuperuserPrivilege();
+    namesystem.checkNameNodeSafeMode("Cannot execute getBlocks");
     return namesystem.getBlocks(datanode, size, minBlockSize);
   }
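With this guard in place, a balancer-style caller sees the rejection as a RemoteException whose message carries the "Cannot execute getBlocks" text passed to checkNameNodeSafeMode (the test below asserts exactly that message). The following is a minimal, hypothetical sketch of how such a caller could detect the refusal and back off; the helper class and its name are illustrative and not part of this patch:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
    import org.apache.hadoop.ipc.RemoteException;

    // Hypothetical caller-side helper (illustration only, not part of this commit).
    final class SafeModeAwareBlockFetcher {
      private SafeModeAwareBlockFetcher() {
      }

      /**
       * Ask the NameNode for blocks to move; return null if it refuses because
       * it is in safe mode, so the caller (e.g. a balancer iteration) can back off.
       */
      static BlocksWithLocations getBlocksOrBackOff(NamenodeProtocol namenode,
          DatanodeInfo datanode, long size, long minBlockSize) throws IOException {
        try {
          return namenode.getBlocks(datanode, size, minBlockSize);
        } catch (RemoteException re) {
          // After this change the NameNode rejects getBlocks while in safe mode;
          // the remote message contains the text passed to checkNameNodeSafeMode.
          if (re.getMessage() != null
              && re.getMessage().contains("Cannot execute getBlocks")) {
            return null;
          }
          throw re;
        }
      }
    }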

TestGetBlocks.java

@@ -19,8 +19,8 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.*;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -28,37 +28,50 @@ import java.util.Map;
 import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.TestBlockManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class tests if getblocks request works correctly.
  */
 public class TestGetBlocks {
-  private static final int blockSize = 8192;
-  private static final String racks[] = new String[] { "/d1/r1", "/d1/r1",
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestBlockManager.class);
+  private static final int BLOCK_SIZE = 8192;
+  private static final String[] RACKS = new String[]{"/d1/r1", "/d1/r1",
       "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3"};
-  private static final int numDatanodes = racks.length;
+  private static final int NUM_DATA_NODES = RACKS.length;
 
   /**
    * Stop the heartbeat of a datanode in the MiniDFSCluster
@@ -96,7 +109,7 @@ public class TestGetBlocks {
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
         staleInterval);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(numDatanodes).racks(racks).build();
+        .numDataNodes(NUM_DATA_NODES).racks(RACKS).build();
     cluster.waitActive();
     InetSocketAddress addr = new InetSocketAddress("localhost",
@@ -105,7 +118,7 @@ public class TestGetBlocks {
     List<DatanodeDescriptor> nodeInfoList = cluster.getNameNode()
         .getNamesystem().getBlockManager().getDatanodeManager()
         .getDatanodeListForReport(DatanodeReportType.LIVE);
-    assertEquals("Unexpected number of datanodes", numDatanodes,
+    assertEquals("Unexpected number of datanodes", NUM_DATA_NODES,
         nodeInfoList.size());
     FileSystem fileSys = cluster.getFileSystem();
     FSDataOutputStream stm = null;
@@ -116,14 +129,14 @@ public class TestGetBlocks {
       stm = fileSys.create(fileName, true,
           fileSys.getConf().getInt(
               CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
-          (short) 3, blockSize);
-      stm.write(new byte[(blockSize * 3) / 2]);
+          (short) 3, BLOCK_SIZE);
+      stm.write(new byte[(BLOCK_SIZE * 3) / 2]);
       // We do not close the stream so that
       // the writing seems to be still ongoing
       stm.hflush();
       LocatedBlocks blocks = client.getNamenode().getBlockLocations(
-          fileName.toString(), 0, blockSize);
+          fileName.toString(), 0, BLOCK_SIZE);
       DatanodeInfo[] nodes = blocks.get(0).getLocations();
       assertEquals(nodes.length, 3);
       DataNode staleNode = null;
@@ -139,7 +152,7 @@ public class TestGetBlocks {
           -(staleInterval + 1));
       LocatedBlocks blocksAfterStale = client.getNamenode().getBlockLocations(
-          fileName.toString(), 0, blockSize);
+          fileName.toString(), 0, BLOCK_SIZE);
       DatanodeInfo[] nodesAfterStale = blocksAfterStale.get(0).getLocations();
       assertEquals(nodesAfterStale.length, 3);
       assertEquals(nodesAfterStale[2].getHostName(), nodes[0].getHostName());
@@ -175,133 +188,128 @@ public class TestGetBlocks {
     }
   }
 
-  /** test getBlocks */
+  /**
+   * Test getBlocks.
+   */
   @Test
   public void testGetBlocks() throws Exception {
-    final Configuration CONF = new HdfsConfiguration();
-    final short REPLICATION_FACTOR = (short) 2;
-    final int DEFAULT_BLOCK_SIZE = 1024;
-
-    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
-    CONF.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
-        DEFAULT_BLOCK_SIZE);
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF)
-        .numDataNodes(REPLICATION_FACTOR)
+    DistributedFileSystem fs = null;
+    Path testFile = null;
+    BlockWithLocations[] locs;
+    final int blkSize = 1024;
+    final String filePath = "/tmp.txt";
+    final int blkLocsSize = 13;
+    long fileLen = 12 * blkSize + 1;
+    final short replicationFactor = (short) 2;
+    final Configuration config = new HdfsConfiguration();
+    // set configurations
+    config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blkSize);
+    config.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
+        blkSize);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
+        .numDataNodes(replicationFactor)
         .storagesPerDatanode(4)
         .build();
     try {
       cluster.waitActive();
       // the third block will not be visible to getBlocks
-      long fileLen = 12 * DEFAULT_BLOCK_SIZE + 1;
-      DFSTestUtil.createFile(cluster.getFileSystem(), new Path("/tmp.txt"),
-          fileLen, REPLICATION_FACTOR, 0L);
+      testFile = new Path(filePath);
+      DFSTestUtil.createFile(cluster.getFileSystem(), testFile,
+          fileLen, replicationFactor, 0L);
 
       // get blocks & data nodes
-      List<LocatedBlock> locatedBlocks;
-      DatanodeInfo[] dataNodes = null;
-      boolean notWritten;
-      final DFSClient dfsclient = new DFSClient(
-          DFSUtilClient.getNNAddress(CONF), CONF);
-      do {
-        locatedBlocks = dfsclient.getNamenode()
-            .getBlockLocations("/tmp.txt", 0, fileLen).getLocatedBlocks();
-        assertEquals(13, locatedBlocks.size());
-        notWritten = false;
-        for (int i = 0; i < 2; i++) {
-          dataNodes = locatedBlocks.get(i).getLocations();
-          if (dataNodes.length != REPLICATION_FACTOR) {
-            notWritten = true;
-            try {
-              Thread.sleep(10);
-            } catch (InterruptedException e) {
-            }
-            break;
-          }
-        }
-      } while (notWritten);
-      dfsclient.close();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.waitForReplication(fs, testFile, replicationFactor, 60000);
+      RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(testFile);
+      LocatedFileStatus stat = it.next();
+      BlockLocation[] blockLocations = stat.getBlockLocations();
+      assertEquals(blkLocsSize, blockLocations.length);
+      HdfsDataInputStream dis = (HdfsDataInputStream) fs.open(testFile);
+      Collection<LocatedBlock> dinfo = dis.getAllBlocks();
+      dis.close();
+      DatanodeInfo[] dataNodes = dinfo.iterator().next().getLocations();
 
       // get RPC client to namenode
       InetSocketAddress addr = new InetSocketAddress("localhost",
           cluster.getNameNodePort());
-      NamenodeProtocol namenode = NameNodeProxies.createProxy(CONF,
+      NamenodeProtocol namenode = NameNodeProxies.createProxy(config,
           DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
 
-      // get blocks of size fileLen from dataNodes[0], with minBlockSize as
-      // fileLen
-      BlockWithLocations[] locs;
-
       // Should return all 13 blocks, as minBlockSize is not passed
-      locs = namenode.getBlocks(dataNodes[0], fileLen, 0)
-          .getBlocks();
-      assertEquals(13, locs.length);
-      assertEquals(locs[0].getStorageIDs().length, 2);
-      assertEquals(locs[1].getStorageIDs().length, 2);
+      locs = namenode.getBlocks(dataNodes[0], fileLen, 0).getBlocks();
+      assertEquals(blkLocsSize, locs.length);
+      assertEquals(locs[0].getStorageIDs().length, replicationFactor);
+      assertEquals(locs[1].getStorageIDs().length, replicationFactor);
 
-      // Should return 12 blocks, as minBlockSize is DEFAULT_BLOCK_SIZE
-      locs = namenode.getBlocks(dataNodes[0], fileLen, DEFAULT_BLOCK_SIZE)
-          .getBlocks();
-      assertEquals(12, locs.length);
-      assertEquals(locs[0].getStorageIDs().length, 2);
-      assertEquals(locs[1].getStorageIDs().length, 2);
+      // Should return 12 blocks, as minBlockSize is blkSize
+      locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize).getBlocks();
+      assertEquals(blkLocsSize - 1, locs.length);
+      assertEquals(locs[0].getStorageIDs().length, replicationFactor);
+      assertEquals(locs[1].getStorageIDs().length, replicationFactor);
 
       // get blocks of size BlockSize from dataNodes[0]
-      locs = namenode.getBlocks(dataNodes[0], DEFAULT_BLOCK_SIZE,
-          DEFAULT_BLOCK_SIZE).getBlocks();
+      locs = namenode.getBlocks(dataNodes[0], blkSize,
+          blkSize).getBlocks();
       assertEquals(locs.length, 1);
-      assertEquals(locs[0].getStorageIDs().length, 2);
+      assertEquals(locs[0].getStorageIDs().length, replicationFactor);
 
       // get blocks of size 1 from dataNodes[0]
       locs = namenode.getBlocks(dataNodes[0], 1, 1).getBlocks();
       assertEquals(locs.length, 1);
-      assertEquals(locs[0].getStorageIDs().length, 2);
+      assertEquals(locs[0].getStorageIDs().length, replicationFactor);
 
       // get blocks of size 0 from dataNodes[0]
-      getBlocksWithException(namenode, dataNodes[0], 0, 0);
+      getBlocksWithException(namenode, dataNodes[0], 0, 0,
+          RemoteException.class, "IllegalArgumentException");
 
       // get blocks of size -1 from dataNodes[0]
-      getBlocksWithException(namenode, dataNodes[0], -1, 0);
+      getBlocksWithException(namenode, dataNodes[0], -1, 0,
+          RemoteException.class, "IllegalArgumentException");
 
       // minBlockSize is -1
-      getBlocksWithException(namenode, dataNodes[0], DEFAULT_BLOCK_SIZE, -1);
+      getBlocksWithException(namenode, dataNodes[0], blkSize, -1,
+          RemoteException.class, "IllegalArgumentException");
 
       // get blocks of size BlockSize from a non-existent datanode
       DatanodeInfo info = DFSTestUtil.getDatanodeInfo("1.2.3.4");
-      getBlocksWithIncorrectDatanodeException(namenode, info, 2, 0);
+      getBlocksWithException(namenode, info, replicationFactor, 0,
+          RemoteException.class, "HadoopIllegalArgumentException");
 
       testBlockIterator(cluster);
+
+      // Namenode should refuse to provide block locations to the balancer
+      // while in safemode.
+      locs = namenode.getBlocks(dataNodes[0], fileLen, 0).getBlocks();
+      assertEquals(blkLocsSize, locs.length);
+      assertFalse(fs.isInSafeMode());
+      LOG.info("Entering safe mode");
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      LOG.info("Entered safe mode");
+      assertTrue(fs.isInSafeMode());
+      getBlocksWithException(namenode, info, replicationFactor, 0,
+          RemoteException.class,
+          "Cannot execute getBlocks. Name node is in safe mode.");
+      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+      assertFalse(fs.isInSafeMode());
     } finally {
+      if (fs != null) {
+        fs.delete(testFile, true);
+        fs.close();
+      }
       cluster.shutdown();
     }
   }
 
   private void getBlocksWithException(NamenodeProtocol namenode,
-      DatanodeInfo datanode, long size, long minBlockSize) throws IOException {
-    boolean getException = false;
-    try {
-      namenode.getBlocks(datanode, size, minBlockSize);
-    } catch (RemoteException e) {
-      getException = true;
-      assertTrue(e.getClassName().contains("IllegalArgumentException"));
-    }
-    assertTrue(getException);
-  }
-
-  private void getBlocksWithIncorrectDatanodeException(
-      NamenodeProtocol namenode, DatanodeInfo datanode, long size,
-      long minBlockSize)
-      throws IOException {
-    boolean getException = false;
-    try {
-      namenode.getBlocks(datanode, size, minBlockSize);
-    } catch (RemoteException e) {
-      getException = true;
-      assertTrue(e.getClassName().contains("HadoopIllegalArgumentException"));
-    }
-    assertTrue(getException);
+      DatanodeInfo datanode, long size, long minBlkSize, Class exClass,
+      String msg) throws Exception {
+    // Namenode should refuse should fail
+    LambdaTestUtils.intercept(exClass,
+        msg, () -> namenode.getBlocks(datanode, size, minBlkSize));
   }
 
   /**
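For quick reference, the new safe-mode assertion in testGetBlocks boils down to the fragment below, with the helper call expanded inline. This is only a sketch; fs, namenode, info, and replicationFactor come from the testGetBlocks setup shown in the diff above, and the surrounding MiniDFSCluster setup is omitted:

    // Enter safe mode and verify the NameNode now refuses getBlocks.
    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    assertTrue(fs.isInSafeMode());
    LambdaTestUtils.intercept(RemoteException.class,
        "Cannot execute getBlocks. Name node is in safe mode.",
        () -> namenode.getBlocks(info, replicationFactor, 0));
    // Leave safe mode; getBlocks is served again.
    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
    assertFalse(fs.isInSafeMode());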