HDFS-14678. Allow triggerBlockReport to a specific namenode. (#1252). Contributed by Leon Gao.
(cherry picked from commit 9a1d8cfaf5
)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
(cherry picked from commit 55e40227d1d20bf88448e213937f6e3166452dc9)
This commit is contained in:
parent
460ef89210
commit
a989779f2d
|
@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.client;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* Options that can be specified when manually triggering a block report.
|
||||
*/
|
||||
|
@ -27,17 +29,24 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
@InterfaceStability.Evolving
|
||||
public final class BlockReportOptions {
|
||||
private final boolean incremental;
|
||||
private final InetSocketAddress namenodeAddr;
|
||||
|
||||
private BlockReportOptions(boolean incremental) {
|
||||
private BlockReportOptions(boolean incremental, InetSocketAddress namenodeAddr) {
|
||||
this.incremental = incremental;
|
||||
this.namenodeAddr = namenodeAddr;
|
||||
}
|
||||
|
||||
public boolean isIncremental() {
|
||||
return incremental;
|
||||
}
|
||||
|
||||
public InetSocketAddress getNamenodeAddr() {
|
||||
return namenodeAddr;
|
||||
}
|
||||
|
||||
public static class Factory {
|
||||
private boolean incremental = false;
|
||||
private InetSocketAddress namenodeAddr;
|
||||
|
||||
public Factory() {
|
||||
}
|
||||
|
@ -47,13 +56,18 @@ public final class BlockReportOptions {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Factory setNamenodeAddr(InetSocketAddress namenodeAddr) {
|
||||
this.namenodeAddr = namenodeAddr;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BlockReportOptions build() {
|
||||
return new BlockReportOptions(incremental);
|
||||
return new BlockReportOptions(incremental, namenodeAddr);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "BlockReportOptions{incremental=" + incremental + "}";
|
||||
return "BlockReportOptions{incremental=" + incremental + ", namenodeAddr=" + namenodeAddr + "}";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -324,10 +324,12 @@ public class ClientDatanodeProtocolTranslatorPB implements
|
|||
public void triggerBlockReport(BlockReportOptions options)
|
||||
throws IOException {
|
||||
try {
|
||||
rpcProxy.triggerBlockReport(NULL_CONTROLLER,
|
||||
TriggerBlockReportRequestProto.newBuilder().
|
||||
setIncremental(options.isIncremental()).
|
||||
build());
|
||||
TriggerBlockReportRequestProto.Builder builder = TriggerBlockReportRequestProto.newBuilder().
|
||||
setIncremental(options.isIncremental());
|
||||
if (options.getNamenodeAddr() != null) {
|
||||
builder.setNnAddress(NetUtils.getHostPortString(options.getNamenodeAddr()));
|
||||
}
|
||||
rpcProxy.triggerBlockReport(NULL_CONTROLLER, builder.build());
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
|
|
|
@ -140,6 +140,7 @@ message GetVolumeReportResponseProto {
|
|||
|
||||
message TriggerBlockReportRequestProto {
|
||||
required bool incremental = 1;
|
||||
optional string nnAddress = 2;
|
||||
}
|
||||
|
||||
message TriggerBlockReportResponseProto {
|
||||
|
|
|
@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBa
|
|||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
||||
/**
|
||||
* Implementation for protobuf service that forwards requests
|
||||
|
@ -225,8 +226,12 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
|
|||
RpcController unused, TriggerBlockReportRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
impl.triggerBlockReport(new BlockReportOptions.Factory().
|
||||
setIncremental(request.getIncremental()).build());
|
||||
BlockReportOptions.Factory factory = new BlockReportOptions.Factory().
|
||||
setIncremental(request.getIncremental());
|
||||
if (request.hasNnAddress()) {
|
||||
factory.setNamenodeAddr(NetUtils.createSocketAddr(request.getNnAddress()));
|
||||
}
|
||||
impl.triggerBlockReport(factory.build());
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
|
|
|
@ -3277,10 +3277,14 @@ public class DataNode extends ReconfigurableBase
|
|||
public void triggerBlockReport(BlockReportOptions options)
|
||||
throws IOException {
|
||||
checkSuperuserPrivilege();
|
||||
InetSocketAddress namenodeAddr = options.getNamenodeAddr();
|
||||
boolean shouldTriggerToAllNn = (namenodeAddr == null);
|
||||
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
|
||||
if (bpos != null) {
|
||||
for (BPServiceActor actor : bpos.getBPServiceActors()) {
|
||||
actor.triggerBlockReport(options);
|
||||
if (shouldTriggerToAllNn || namenodeAddr.equals(actor.nnAddr)) {
|
||||
actor.triggerBlockReport(options);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -466,7 +466,7 @@ public class DFSAdmin extends FsShell {
|
|||
"\t[-evictWriters <datanode_host:ipc_port>]\n" +
|
||||
"\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
|
||||
"\t[-metasave filename]\n" +
|
||||
"\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" +
|
||||
"\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port> [-namenode <namenode_host:ipc_port>]]\n" +
|
||||
"\t[-listOpenFiles [-blockingDecommission] [-path <path>]]\n" +
|
||||
"\t[-help [cmd]]\n";
|
||||
|
||||
|
@ -727,6 +727,13 @@ public class DFSAdmin extends FsShell {
|
|||
for (int j = 1; j < argv.length; j++) {
|
||||
args.add(argv[j]);
|
||||
}
|
||||
// Block report to a specific namenode
|
||||
InetSocketAddress namenodeAddr = null;
|
||||
String nnHostPort = StringUtils.popOptionWithArgument("-namenode", args);
|
||||
if (nnHostPort != null) {
|
||||
namenodeAddr = NetUtils.createSocketAddr(nnHostPort);
|
||||
}
|
||||
|
||||
boolean incremental = StringUtils.popOption("-incremental", args);
|
||||
String hostPort = StringUtils.popFirstNonOption(args);
|
||||
if (hostPort == null) {
|
||||
|
@ -742,6 +749,7 @@ public class DFSAdmin extends FsShell {
|
|||
try {
|
||||
dnProxy.triggerBlockReport(
|
||||
new BlockReportOptions.Factory().
|
||||
setNamenodeAddr(namenodeAddr).
|
||||
setIncremental(incremental).
|
||||
build());
|
||||
} catch (IOException e) {
|
||||
|
@ -750,7 +758,9 @@ public class DFSAdmin extends FsShell {
|
|||
}
|
||||
System.out.println("Triggering " +
|
||||
(incremental ? "an incremental " : "a full ") +
|
||||
"block report on " + hostPort + ".");
|
||||
"block report on " + hostPort +
|
||||
(namenodeAddr == null ? "" : " to namenode " + nnHostPort) +
|
||||
".");
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1266,7 +1276,7 @@ public class DFSAdmin extends FsShell {
|
|||
+ "\tbe used for checking if a datanode is alive.\n";
|
||||
|
||||
String triggerBlockReport =
|
||||
"-triggerBlockReport [-incremental] <datanode_host:ipc_port>\n"
|
||||
"-triggerBlockReport [-incremental] <datanode_host:ipc_port> [-namenode <namenode_host:ipc_port>]\n"
|
||||
+ "\tTrigger a block report for the datanode.\n"
|
||||
+ "\tIf 'incremental' is specified, it will be an incremental\n"
|
||||
+ "\tblock report; otherwise, it will be a full block report.\n";
|
||||
|
@ -2174,7 +2184,7 @@ public class DFSAdmin extends FsShell {
|
|||
+ " [-getDatanodeInfo <datanode_host:ipc_port>]");
|
||||
} else if ("-triggerBlockReport".equals(cmd)) {
|
||||
System.err.println("Usage: hdfs dfsadmin"
|
||||
+ " [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]");
|
||||
+ " [-triggerBlockReport [-incremental] <datanode_host:ipc_port> [-namenode <namenode_host:ipc_port>]]");
|
||||
} else if ("-listOpenFiles".equals(cmd)) {
|
||||
System.err.println("Usage: hdfs dfsadmin"
|
||||
+ " [-listOpenFiles [-blockingDecommission] [-path <path>]]");
|
||||
|
@ -2332,7 +2342,7 @@ public class DFSAdmin extends FsShell {
|
|||
return exitCode;
|
||||
}
|
||||
} else if ("-triggerBlockReport".equals(cmd)) {
|
||||
if ((argv.length != 2) && (argv.length != 3)) {
|
||||
if ((argv.length < 2) || (argv.length > 5)) {
|
||||
printUsage(cmd);
|
||||
return exitCode;
|
||||
}
|
||||
|
|
|
@ -372,7 +372,7 @@ Usage:
|
|||
hdfs dfsadmin [-evictWriters <datanode_host:ipc_port>]
|
||||
hdfs dfsadmin [-getDatanodeInfo <datanode_host:ipc_port>]
|
||||
hdfs dfsadmin [-metasave filename]
|
||||
hdfs dfsadmin [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]
|
||||
hdfs dfsadmin [-triggerBlockReport [-incremental] <datanode_host:ipc_port> [-namenode] <namenode_host:ipc_port>]
|
||||
hdfs dfsadmin [-listOpenFiles [-blockingDecommission] [-path <path>]]
|
||||
hdfs dfsadmin [-help [cmd]]
|
||||
|
||||
|
@ -410,7 +410,7 @@ Usage:
|
|||
| `-evictWriters` \<datanode\_host:ipc\_port\> | Make the datanode evict all clients that are writing a block. This is useful if decommissioning is hung due to slow writers. |
|
||||
| `-getDatanodeInfo` \<datanode\_host:ipc\_port\> | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. |
|
||||
| `-metasave` filename | Save Namenode's primary data structures to *filename* in the directory specified by hadoop.log.dir property. *filename* is overwritten if it exists. *filename* will contain one line for each of the following<br/>1. Datanodes heart beating with Namenode<br/>2. Blocks waiting to be replicated<br/>3. Blocks currently being replicated<br/>4. Blocks waiting to be deleted |
|
||||
| `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. |
|
||||
| `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> `[-namenode]` \<namenode\_host:ipc\_port\> | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. If '-namenode \<host\>:\<port\>' is given, it only sends block report to a specified namenode. |
|
||||
| `-listOpenFiles` `[-blockingDecommission]` `[-path <path>]` | List all open files currently managed by the NameNode along with client name and client machine accessing them. Open files list will be filtered by given type and path. Add -blockingDecommission option if you only want to list open files that are blocking the DataNode decommissioning. |
|
||||
| `-help` [cmd] | Displays help for the given command or all commands if none is specified. |
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
|
@ -44,11 +45,13 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
|||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* Test manually requesting that the DataNode send a block report.
|
||||
*/
|
||||
public final class TestTriggerBlockReport {
|
||||
private void testTriggerBlockReport(boolean incremental) throws Exception {
|
||||
private void testTriggerBlockReport(boolean incremental, boolean withSpecificNN) throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
|
||||
// Set a really long value for dfs.blockreport.intervalMsec and
|
||||
|
@ -59,16 +62,24 @@ public final class TestTriggerBlockReport {
|
|||
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L);
|
||||
|
||||
final MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
new MiniDFSCluster.Builder(conf).nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DatanodeProtocolClientSideTranslatorPB spy =
|
||||
cluster.transitionToActive(0);
|
||||
FileSystem fs = cluster.getFileSystem(0);
|
||||
DatanodeProtocolClientSideTranslatorPB spyOnNn0 =
|
||||
InternalDataNodeTestUtils.spyOnBposToNN(
|
||||
cluster.getDataNodes().get(0), cluster.getNameNode());
|
||||
cluster.getDataNodes().get(0), cluster.getNameNode(0));
|
||||
DatanodeProtocolClientSideTranslatorPB spyOnNn1 =
|
||||
InternalDataNodeTestUtils.spyOnBposToNN(
|
||||
cluster.getDataNodes().get(0), cluster.getNameNode(1));
|
||||
DFSTestUtil.createFile(fs, new Path("/abc"), 16, (short) 1, 1L);
|
||||
|
||||
// We should get 1 incremental block report.
|
||||
Mockito.verify(spy, timeout(60000).times(1)).blockReceivedAndDeleted(
|
||||
// We should get 1 incremental block report on both NNs.
|
||||
Mockito.verify(spyOnNn0, timeout(60000).times(1)).blockReceivedAndDeleted(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageReceivedDeletedBlocks[].class));
|
||||
Mockito.verify(spyOnNn1, timeout(60000).times(1)).blockReceivedAndDeleted(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageReceivedDeletedBlocks[].class));
|
||||
|
@ -77,12 +88,21 @@ public final class TestTriggerBlockReport {
|
|||
// since the interval we configured is so long.
|
||||
for (int i = 0; i < 3; i++) {
|
||||
Thread.sleep(10);
|
||||
Mockito.verify(spy, times(0)).blockReport(
|
||||
Mockito.verify(spyOnNn0, times(0)).blockReport(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageBlockReport[].class),
|
||||
Mockito.<BlockReportContext>anyObject());
|
||||
Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
|
||||
Mockito.verify(spyOnNn0, times(1)).blockReceivedAndDeleted(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageReceivedDeletedBlocks[].class));
|
||||
Mockito.verify(spyOnNn1, times(0)).blockReport(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageBlockReport[].class),
|
||||
any());
|
||||
Mockito.verify(spyOnNn1, times(1)).blockReceivedAndDeleted(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageReceivedDeletedBlocks[].class));
|
||||
|
@ -93,20 +113,21 @@ public final class TestTriggerBlockReport {
|
|||
ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
|
||||
new Block(5678, 512, 1000), BlockStatus.DELETED_BLOCK, null);
|
||||
DataNode datanode = cluster.getDataNodes().get(0);
|
||||
BPServiceActor actor =
|
||||
datanode.getAllBpOs().get(0).getBPServiceActors().get(0);
|
||||
final FsDatasetSpi<?> dataset = datanode.getFSDataset();
|
||||
final DatanodeStorage storage;
|
||||
try (FsDatasetSpi.FsVolumeReferences volumes =
|
||||
dataset.getFsVolumeReferences()) {
|
||||
storage = dataset.getStorage(volumes.get(0).getStorageID());
|
||||
for (BPServiceActor actor : datanode.getAllBpOs().get(0).getBPServiceActors()) {
|
||||
final FsDatasetSpi<?> dataset = datanode.getFSDataset();
|
||||
final DatanodeStorage storage;
|
||||
try (FsDatasetSpi.FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
|
||||
storage = dataset.getStorage(volumes.get(0).getStorageID());
|
||||
}
|
||||
actor.getIbrManager().addRDBI(rdbi, storage);
|
||||
}
|
||||
|
||||
actor.getIbrManager().addRDBI(rdbi, storage);
|
||||
|
||||
// Manually trigger a block report.
|
||||
// Only trigger block report to NN1 when testing triggering block report on specific namenode.
|
||||
InetSocketAddress nnAddr = withSpecificNN ? cluster.getNameNode(1).getServiceRpcAddress() : null;
|
||||
datanode.triggerBlockReport(
|
||||
new BlockReportOptions.Factory().
|
||||
setNamenodeAddr(nnAddr).
|
||||
setIncremental(incremental).
|
||||
build()
|
||||
);
|
||||
|
@ -114,13 +135,25 @@ public final class TestTriggerBlockReport {
|
|||
// triggerBlockReport returns before the block report is
|
||||
// actually sent. Wait for it to be sent here.
|
||||
if (incremental) {
|
||||
Mockito.verify(spy, timeout(60000).times(2)).
|
||||
Mockito.verify(spyOnNn1, timeout(60000).times(2)).
|
||||
blockReceivedAndDeleted(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageReceivedDeletedBlocks[].class));
|
||||
int nn0IncrBlockReport = withSpecificNN ? 1 : 2;
|
||||
Mockito.verify(spyOnNn0, timeout(60000).times(nn0IncrBlockReport)).
|
||||
blockReceivedAndDeleted(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageReceivedDeletedBlocks[].class));
|
||||
} else {
|
||||
Mockito.verify(spy, timeout(60000)).blockReport(
|
||||
Mockito.verify(spyOnNn1, timeout(60000).times(1)).blockReport(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageBlockReport[].class),
|
||||
any());
|
||||
int nn0BlockReport = withSpecificNN ? 0 : 1;
|
||||
Mockito.verify(spyOnNn0, timeout(60000).times(nn0BlockReport)).blockReport(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageBlockReport[].class),
|
||||
|
@ -132,11 +165,13 @@ public final class TestTriggerBlockReport {
|
|||
|
||||
@Test
|
||||
public void testTriggerFullBlockReport() throws Exception {
|
||||
testTriggerBlockReport(false);
|
||||
testTriggerBlockReport(false, false);
|
||||
testTriggerBlockReport(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTriggerIncrementalBlockReport() throws Exception {
|
||||
testTriggerBlockReport(true);
|
||||
testTriggerBlockReport(true, false);
|
||||
testTriggerBlockReport(true, true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -245,6 +245,31 @@ public class TestDFSAdmin {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testTriggerBlockReport() throws Exception {
|
||||
redirectStream();
|
||||
final DFSAdmin dfsAdmin = new DFSAdmin(conf);
|
||||
final DataNode dn = cluster.getDataNodes().get(0);
|
||||
final NameNode nn = cluster.getNameNode();
|
||||
|
||||
final String dnAddr = String.format(
|
||||
"%s:%d",
|
||||
dn.getXferAddress().getHostString(),
|
||||
dn.getIpcPort());
|
||||
final String nnAddr = nn.getHostAndPort();
|
||||
resetStream();
|
||||
final List<String> outs = Lists.newArrayList();
|
||||
final int ret = ToolRunner.run(dfsAdmin,
|
||||
new String[]{"-triggerBlockReport", dnAddr, "-incremental", "-namenode", nnAddr});
|
||||
assertEquals(0, ret);
|
||||
|
||||
scanIntoList(out, outs);
|
||||
assertEquals(1, outs.size());
|
||||
assertThat(outs.get(0),
|
||||
is(allOf(containsString("Triggering an incremental block report on "),
|
||||
containsString(" to namenode "))));
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testGetVolumeReport() throws Exception {
|
||||
redirectStream();
|
||||
|
|
Loading…
Reference in New Issue