HDFS-7278. Add a command that allows sysadmins to manually trigger full block reports from a DN (cmccabe)

(cherry picked from commit baf794dc40)
Colin Patrick McCabe 2014-10-27 09:53:16 -07:00
parent 8d781aaad3
commit 5f3d967aae
11 changed files with 335 additions and 1 deletion

View File: hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -5,6 +5,9 @@ Release 2.7.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
HDFS-7278. Add a command that allows sysadmins to manually trigger full
block reports from a DN (cmccabe)
IMPROVEMENTS

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java

@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Options that can be specified when manually triggering a block report.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class BlockReportOptions {
private final boolean incremental;
private BlockReportOptions(boolean incremental) {
this.incremental = incremental;
}
public boolean isIncremental() {
return incremental;
}
public static class Factory {
private boolean incremental = false;
public Factory() {
}
public Factory setIncremental(boolean incremental) {
this.incremental = incremental;
return this;
}
public BlockReportOptions build() {
return new BlockReportOptions(incremental);
}
}
@Override
public String toString() {
return "BlockReportOptions{incremental=" + incremental + "}";
}
}
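
The nested Factory is a small builder; a caller constructs options like this (a minimal usage sketch):

BlockReportOptions options = new BlockReportOptions.Factory()
    .setIncremental(true)
    .build();
System.out.println(options);  // prints: BlockReportOptions{incremental=true}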

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java

@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
@@ -158,4 +159,10 @@ public interface ClientDatanodeProtocol {
* @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}.
*/
ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
/**
* Trigger a new block report.
*/
void triggerBlockReport(BlockReportOptions options)
throws IOException;
}
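
A minimal caller-side sketch of the new method (it assumes dnProxy, an already-constructed ClientDatanodeProtocol proxy to the target DataNode, such as the one DFSAdmin builds via getDataNodeProxy below; the call requires superuser privilege and returns before the report is actually sent):

// 'dnProxy' is an assumed, pre-built ClientDatanodeProtocol proxy.
dnProxy.triggerBlockReport(
    new BlockReportOptions.Factory().setIncremental(false).build());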

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java

@@ -26,6 +26,7 @@ import com.google.common.base.Optional;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
@@ -49,6 +50,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Shutdo
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
@@ -74,6 +77,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
ShutdownDatanodeResponseProto.newBuilder().build();
private final static StartReconfigurationResponseProto START_RECONFIG_RESP =
StartReconfigurationResponseProto.newBuilder().build();
private final static TriggerBlockReportResponseProto TRIGGER_BLOCK_REPORT_RESP =
TriggerBlockReportResponseProto.newBuilder().build();
private final ClientDatanodeProtocol impl;
@@ -237,4 +242,17 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
}
return builder.build();
}
@Override
public TriggerBlockReportResponseProto triggerBlockReport(
RpcController unused, TriggerBlockReportRequestProto request)
throws ServiceException {
try {
impl.triggerBlockReport(new BlockReportOptions.Factory().
setIncremental(request.getIncremental()).build());
} catch (IOException e) {
throw new ServiceException(e);
}
return TRIGGER_BLOCK_REPORT_RESP;
}
}

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -57,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRec
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -333,4 +336,17 @@ public class ClientDatanodeProtocolTranslatorPB implements
}
return new ReconfigurationTaskStatus(startTime, endTime, statusMap);
}
@Override
public void triggerBlockReport(BlockReportOptions options)
throws IOException {
try {
rpcProxy.triggerBlockReport(NULL_CONTROLLER,
TriggerBlockReportRequestProto.newBuilder().
setIncremental(options.isIncremental()).
build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -28,6 +28,7 @@ import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -986,4 +987,20 @@ class BPServiceActor implements Runnable {
return (pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId()) != null);
}
}
void triggerBlockReport(BlockReportOptions options) throws IOException {
if (options.isIncremental()) {
LOG.info(bpos.toString() + ": scheduling an incremental block report.");
synchronized(pendingIncrementalBRperStorage) {
sendImmediateIBR = true;
pendingIncrementalBRperStorage.notifyAll();
}
} else {
LOG.info(bpos.toString() + ": scheduling a full block report.");
synchronized(pendingIncrementalBRperStorage) {
lastBlockReport = 0;
pendingIncrementalBRperStorage.notifyAll();
}
}
}
}
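
For context: the actor's main loop sends a full report once the configured interval has elapsed since lastBlockReport, and between iterations it waits on pendingIncrementalBRperStorage. Zeroing lastBlockReport makes the elapsed-time check pass on the next iteration, and notifyAll() wakes the loop right away instead of letting it sleep out the wait. A simplified sketch of that loop (illustrative names, not the actual BPServiceActor code):

// Simplified sketch of the offer-service loop; helper and field names
// are illustrative, not the real implementation.
private void offerService() throws Exception {
  while (shouldRun()) {
    if (sendImmediateIBR) {
      reportReceivedDeletedBlocks();  // sends any pending incremental report
    }
    if (monotonicNow() - lastBlockReport >= blockReportIntervalMs) {
      blockReport();                  // full report; also resets lastBlockReport
    }
    synchronized (pendingIncrementalBRperStorage) {
      // Sleep until the next heartbeat is due, or until notifyAll() wakes us.
      pendingIncrementalBRperStorage.wait(heartbeatIntervalMs);
    }
  }
}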

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -96,6 +96,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
@@ -2908,6 +2909,19 @@ public class DataNode extends ReconfigurableBase
return getReconfigurationTaskStatus();
}
@Override // ClientDatanodeProtocol
public void triggerBlockReport(BlockReportOptions options)
throws IOException {
checkSuperuserPrivilege();
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
actor.triggerBlockReport(options);
}
}
}
}
/**
* @param addr rpc address of the namenode
* @return true if the datanode is connected to a NameNode at the

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java

@@ -31,10 +31,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,6 +51,7 @@ import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.Command;
import org.apache.hadoop.fs.shell.CommandFormat;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
@@ -396,6 +399,7 @@ public class DFSAdmin extends FsShell {
"\t[-metasave filename]\n" +
"\t[-setStoragePolicy path policyName]\n" +
"\t[-getStoragePolicy path]\n" +
"\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" +
"\t[-help [cmd]]\n";
/**
@@ -630,6 +634,38 @@ public class DFSAdmin extends FsShell {
throw new IOException("Cannot identify the storage policy for " + argv[1]);
}
public int triggerBlockReport(String[] argv) throws IOException {
List<String> args = new LinkedList<String>();
for (int j = 1; j < argv.length; j++) {
args.add(argv[j]);
}
boolean incremental = StringUtils.popOption("-incremental", args);
String hostPort = StringUtils.popFirstNonOption(args);
if (hostPort == null) {
System.err.println("You must specify a host:port pair.");
return 1;
}
if (!args.isEmpty()) {
System.err.print("Can't understand arguments: " +
Joiner.on(" ").join(args) + "\n");
return 1;
}
ClientDatanodeProtocol dnProxy = getDataNodeProxy(hostPort);
try {
dnProxy.triggerBlockReport(
new BlockReportOptions.Factory().
setIncremental(incremental).
build());
} catch (IOException e) {
System.err.println("triggerBlockReport error: " + e);
return 1;
}
System.out.println("Triggering " +
(incremental ? "an incremental " : "a full ") +
"block report on " + hostPort + ".");
return 0;
}
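
Example invocations and their output (the host is hypothetical; 50020 is the default DataNode IPC port in this release):

$ hdfs dfsadmin -triggerBlockReport dn1.example.com:50020
Triggering a full block report on dn1.example.com:50020.

$ hdfs dfsadmin -triggerBlockReport -incremental dn1.example.com:50020
Triggering an incremental block report on dn1.example.com:50020.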
/**
* Allow snapshot on a directory.
* Usage: java DFSAdmin -allowSnapshot snapshotDir
@@ -983,6 +1019,12 @@ public class DFSAdmin extends FsShell {
String getStoragePolicy = "-getStoragePolicy path\n"
+ "\tGet the storage policy for a file/directory.\n";
String triggerBlockReport =
"-triggerBlockReport [-incremental] <datanode_host:ipc_port>\n"
+ "\tTrigger a block report for the datanode.\n"
+ "\tIf 'incremental' is specified, it will be an incremental\n"
+ "\tblock report; otherwise, it will be a full block report.\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";
@@ -1080,6 +1122,7 @@ public class DFSAdmin extends FsShell {
System.out.println(getDatanodeInfo);
System.out.println(setStoragePolicy);
System.out.println(getStoragePolicy);
System.out.println(triggerBlockReport);
System.out.println(help);
System.out.println();
ToolRunner.printGenericCommandUsage(System.out);
@@ -1593,6 +1636,9 @@ public class DFSAdmin extends FsShell {
} else if ("-getDatanodeInfo".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"
+ " [-getDatanodeInfo <datanode_host:ipc_port>]");
} else if ("-triggerBlockReport".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]");
} else {
System.err.println("Usage: hdfs dfsadmin");
System.err.println("Note: Administrative commands can only be run as the HDFS superuser.");
@@ -1736,6 +1782,11 @@ public class DFSAdmin extends FsShell {
printUsage(cmd);
return exitCode;
}
} else if ("-triggerBlockReport".equals(cmd)) {
if (argv.length < 2) {
printUsage(cmd);
return exitCode;
}
} else if ("-getStoragePolicy".equals(cmd)) {
if (argv.length != 2) {
printUsage(cmd);
@@ -1818,6 +1869,8 @@ public class DFSAdmin extends FsShell {
exitCode = setStoragePolicy(argv);
} else if ("-getStoragePolicy".equals(cmd)) {
exitCode = getStoragePolicy(argv);
} else if ("-triggerBlockReport".equals(cmd)) {
exitCode = triggerBlockReport(argv);
} else if ("-help".equals(cmd)) {
if (i < argv.length) {
printHelp(argv[i]);

View File: hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto

@@ -156,6 +156,13 @@ message StartReconfigurationRequestProto {
message StartReconfigurationResponseProto {
}
message TriggerBlockReportRequestProto {
required bool incremental = 1;
}
message TriggerBlockReportResponseProto {
}
/** Query the running status of reconfiguration process */
message GetReconfigurationStatusRequestProto {
}
@@ -222,4 +229,7 @@ service ClientDatanodeProtocolService {
rpc startReconfiguration(StartReconfigurationRequestProto)
returns(StartReconfigurationResponseProto);
rpc triggerBlockReport(TriggerBlockReportRequestProto)
returns(TriggerBlockReportResponseProto);
}

View File: hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSCommands.apt.vm

@@ -201,6 +201,7 @@ HDFS Commands Guide
[-fetchImage <local directory>]
[-shutdownDatanode <datanode_host:ipc_port> [upgrade]]
[-getDatanodeInfo <datanode_host:ipc_port>]
[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]
[-help [cmd]]
+------------------------------------------+
@@ -285,7 +286,7 @@ HDFS Commands Guide
| of the resource specified by \<key\> on \<host:ipc_port\>.
| All other args after are sent to the host.
*-----------------+-----------------------------------------------------------+
| -reconfig <datanode|...> <host:ipc_port> <start|status> | Start
| -reconfig \<datanode\|...\> \<host:ipc_port\> \<start\|status\> | Start
| reconfiguration or get the status of an ongoing
| reconfiguration. The second parameter specifies the node
| type. Currently, only reloading DataNode's configuration is
@@ -334,6 +335,11 @@ HDFS Commands Guide
| {{{./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo}Rolling Upgrade document}}
| for the detail.
*-----------------+-----------------------------------------------------------+
| -triggerBlockReport [-incremental] \<datanode_host:ipc_port\> | Trigger a
| block report for the given datanode. If 'incremental' is
| specified, it will be an incremental block report;
| otherwise, it will be a full block report.
*-----------------+-----------------------------------------------------------+
| -help [cmd] | Displays help for the given command or all commands if none
| is specified.
*-----------------+-----------------------------------------------------------+

View File: hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java

@@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.timeout;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Test manually requesting that the DataNode send a block report.
*/
public final class TestTriggerBlockReport {
private void testTriggerBlockReport(boolean incremental) throws Exception {
Configuration conf = new HdfsConfiguration();
// Set a really long value for dfs.blockreport.intervalMsec and
// dfs.heartbeat.interval, so that incremental block reports and heartbeats
// won't be sent during this test unless they're triggered
// manually.
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10800000L);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L);
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
DatanodeProtocolClientSideTranslatorPB spy =
DataNodeTestUtils.spyOnBposToNN(
cluster.getDataNodes().get(0), cluster.getNameNode());
DFSTestUtil.createFile(fs, new Path("/abc"), 16, (short) 1, 1L);
// We should get 1 incremental block report.
Mockito.verify(spy, timeout(60000).times(1)).blockReceivedAndDeleted(
any(DatanodeRegistration.class),
anyString(),
any(StorageReceivedDeletedBlocks[].class));
// We should not receive any more incremental or full block reports,
// since the interval we configured is so long.
for (int i = 0; i < 3; i++) {
Thread.sleep(10);
Mockito.verify(spy, times(0)).blockReport(
any(DatanodeRegistration.class),
anyString(),
any(StorageBlockReport[].class));
Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
any(DatanodeRegistration.class),
anyString(),
any(StorageReceivedDeletedBlocks[].class));
}
// Create a fake block deletion notification on the DataNode.
// This will be sent with the next incremental block report.
ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
new Block(5678, 512, 1000), BlockStatus.DELETED_BLOCK, null);
DataNode datanode = cluster.getDataNodes().get(0);
BPServiceActor actor =
datanode.getAllBpOs()[0].getBPServiceActors().get(0);
String storageUuid =
datanode.getFSDataset().getVolumes().get(0).getStorageID();
actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);
// Manually trigger a block report.
datanode.triggerBlockReport(
new BlockReportOptions.Factory().
setIncremental(incremental).
build()
);
// triggerBlockReport returns before the block report is
// actually sent. Wait for it to be sent here.
if (incremental) {
Mockito.verify(spy, timeout(60000).times(2)).
blockReceivedAndDeleted(
any(DatanodeRegistration.class),
anyString(),
any(StorageReceivedDeletedBlocks[].class));
} else {
Mockito.verify(spy, timeout(60000)).blockReport(
any(DatanodeRegistration.class),
anyString(),
any(StorageBlockReport[].class));
}
cluster.shutdown();
}
@Test
public void testTriggerFullBlockReport() throws Exception {
testTriggerBlockReport(false);
}
@Test
public void testTriggerIncrementalBlockReport() throws Exception {
testTriggerBlockReport(true);
}
}
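
The test can be run on its own from the hadoop-hdfs module with Surefire's test filter (assuming a standard Maven build of the source tree):

mvn test -Dtest=TestTriggerBlockReport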