diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ad8bebf8c0d..5ed6cbfec1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -6,6 +6,9 @@ Release 2.6.1 - UNRELEASED NEW FEATURES + HDFS-7278. Add a command that allows sysadmins to manually trigger full + block reports from a DN (cmccabe) + IMPROVEMENTS HDFS-7035. Make adding a new data directory to the DataNode an atomic diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java new file mode 100644 index 00000000000..07f483692c8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Options that can be specified when manually triggering a block report. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class BlockReportOptions { + private final boolean incremental; + + private BlockReportOptions(boolean incremental) { + this.incremental = incremental; + } + + public boolean isIncremental() { + return incremental; + } + + public static class Factory { + private boolean incremental = false; + + public Factory() { + } + + public Factory setIncremental(boolean incremental) { + this.incremental = incremental; + return this; + } + + public BlockReportOptions build() { + return new BlockReportOptions(incremental); + } + } + + @Override + public String toString() { + return "BlockReportOptions{incremental=" + incremental + "}"; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index 9cd5ccd8af4..1dcc196d65b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.ReconfigurationTaskStatus; +import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; @@ -158,4 +159,10 @@ public interface ClientDatanodeProtocol { * @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}. */ ReconfigurationTaskStatus getReconfigurationStatus() throws IOException; + + /** + * Trigger a new block report. + */ + void triggerBlockReport(BlockReportOptions options) + throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index ed7f0ae160c..5c2c4a77690 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -26,6 +26,7 @@ import com.google.common.base.Optional; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.ReconfigurationTaskStatus; import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange; +import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; @@ -49,6 +50,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Shutdo import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; @@ -74,6 +77,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements ShutdownDatanodeResponseProto.newBuilder().build(); private final static StartReconfigurationResponseProto START_RECONFIG_RESP = StartReconfigurationResponseProto.newBuilder().build(); + private final static TriggerBlockReportResponseProto TRIGGER_BLOCK_REPORT_RESP = + TriggerBlockReportResponseProto.newBuilder().build(); private final ClientDatanodeProtocol impl; @@ -237,4 +242,17 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements } return builder.build(); } + + @Override + public TriggerBlockReportResponseProto triggerBlockReport( + RpcController unused, TriggerBlockReportRequestProto request) + throws ServiceException { + try { + impl.triggerBlockReport(new BlockReportOptions.Factory(). + setIncremental(request.getIncremental()).build()); + } catch (IOException e) { + throw new ServiceException(e); + } + return TRIGGER_BLOCK_REPORT_RESP; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index 00b6ad745ff..f1a1b24a959 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.ReconfigurationTaskStatus; import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; @@ -57,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRec import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -333,4 +336,17 @@ public class ClientDatanodeProtocolTranslatorPB implements } return new ReconfigurationTaskStatus(startTime, endTime, statusMap); } + + @Override + public void triggerBlockReport(BlockReportOptions options) + throws IOException { + try { + rpcProxy.triggerBlockReport(NULL_CONTROLLER, + TriggerBlockReportRequestProto.newBuilder(). + setIncremental(options.isIncremental()). + build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 3d116ea4ca1..e3a3cae3d23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -32,6 +32,7 @@ import com.google.common.base.Joiner; import org.apache.commons.logging.Log; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @@ -1023,4 +1024,20 @@ class BPServiceActor implements Runnable { return (pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId()) != null); } } + + void triggerBlockReport(BlockReportOptions options) throws IOException { + if (options.isIncremental()) { + LOG.info(bpos.toString() + ": scheduling an incremental block report."); + synchronized(pendingIncrementalBRperStorage) { + sendImmediateIBR = true; + pendingIncrementalBRperStorage.notifyAll(); + } + } else { + LOG.info(bpos.toString() + ": scheduling a full block report."); + synchronized(pendingIncrementalBRperStorage) { + lastBlockReport = 0; + pendingIncrementalBRperStorage.notifyAll(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 5acdede9235..c46cd8b43be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -98,6 +98,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress; @@ -2967,6 +2968,19 @@ public class DataNode extends ReconfigurableBase return getReconfigurationTaskStatus(); } + @Override // ClientDatanodeProtocol + public void triggerBlockReport(BlockReportOptions options) + throws IOException { + checkSuperuserPrivilege(); + for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { + if (bpos != null) { + for (BPServiceActor actor : bpos.getBPServiceActors()) { + actor.triggerBlockReport(options); + } + } + } + } + /** * @param addr rpc address of the namenode * @return true if the datanode is connected to a NameNode at the diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index 0d040f3e14a..f0d5add9dbc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -31,10 +31,12 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeSet; +import com.google.common.base.Joiner; import com.google.common.base.Optional; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,6 +51,7 @@ import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.shell.Command; import org.apache.hadoop.fs.shell.CommandFormat; +import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; @@ -397,6 +400,7 @@ public class DFSAdmin extends FsShell { "\t[-metasave filename]\n" + "\t[-setStoragePolicy path policyName]\n" + "\t[-getStoragePolicy path]\n" + + "\t[-triggerBlockReport [-incremental] ]\n" + "\t[-help [cmd]]\n"; /** @@ -631,6 +635,38 @@ public class DFSAdmin extends FsShell { throw new IOException("Cannot identify the storage policy for " + argv[1]); } + public int triggerBlockReport(String[] argv) throws IOException { + List args = new LinkedList(); + for (int j = 1; j < argv.length; j++) { + args.add(argv[j]); + } + boolean incremental = StringUtils.popOption("-incremental", args); + String hostPort = StringUtils.popFirstNonOption(args); + if (hostPort == null) { + System.err.println("You must specify a host:port pair."); + return 1; + } + if (!args.isEmpty()) { + System.err.print("Can't understand arguments: " + + Joiner.on(" ").join(args) + "\n"); + return 1; + } + ClientDatanodeProtocol dnProxy = getDataNodeProxy(hostPort); + try { + dnProxy.triggerBlockReport( + new BlockReportOptions.Factory(). + setIncremental(incremental). + build()); + } catch (IOException e) { + System.err.println("triggerBlockReport error: " + e); + return 1; + } + System.out.println("Triggering " + + (incremental ? "an incremental " : "a full ") + + "block report on " + hostPort + "."); + return 0; + } + /** * Allow snapshot on a directory. * Usage: java DFSAdmin -allowSnapshot snapshotDir @@ -984,6 +1020,12 @@ public class DFSAdmin extends FsShell { String getStoragePolicy = "-getStoragePolicy path\n" + "\tGet the storage policy for a file/directory.\n"; + String triggerBlockReport = + "-triggerBlockReport [-incremental] \n" + + "\tTrigger a block report for the datanode.\n" + + "\tIf 'incremental' is specified, it will be an incremental\n" + + "\tblock report; otherwise, it will be a full block report.\n"; + String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" + "\t\tis specified.\n"; @@ -1081,6 +1123,7 @@ public class DFSAdmin extends FsShell { System.out.println(getDatanodeInfo); System.out.println(setStoragePolicy); System.out.println(getStoragePolicy); + System.out.println(triggerBlockReport); System.out.println(help); System.out.println(); ToolRunner.printGenericCommandUsage(System.out); @@ -1594,6 +1637,9 @@ public class DFSAdmin extends FsShell { } else if ("-getDatanodeInfo".equals(cmd)) { System.err.println("Usage: hdfs dfsadmin" + " [-getDatanodeInfo ]"); + } else if ("-triggerBlockReport".equals(cmd)) { + System.err.println("Usage: java DFSAdmin" + + " [-triggerBlockReport [-incremental] ]"); } else { System.err.println("Usage: hdfs dfsadmin"); System.err.println("Note: Administrative commands can only be run as the HDFS superuser."); @@ -1737,6 +1783,11 @@ public class DFSAdmin extends FsShell { printUsage(cmd); return exitCode; } + } else if ("-triggerBlockReport".equals(cmd)) { + if (argv.length < 1) { + printUsage(cmd); + return exitCode; + } } else if ("-getStoragePolicy".equals(cmd)) { if (argv.length != 2) { printUsage(cmd); @@ -1819,6 +1870,8 @@ public class DFSAdmin extends FsShell { exitCode = setStoragePolicy(argv); } else if ("-getStoragePolicy".equals(cmd)) { exitCode = getStoragePolicy(argv); + } else if ("-triggerBlockReport".equals(cmd)) { + exitCode = triggerBlockReport(argv); } else if ("-help".equals(cmd)) { if (i < argv.length) { printHelp(argv[i]); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto index 61f787bec92..48f6dd13d62 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto @@ -156,6 +156,13 @@ message StartReconfigurationRequestProto { message StartReconfigurationResponseProto { } +message TriggerBlockReportRequestProto { + required bool incremental = 1; +} + +message TriggerBlockReportResponseProto { +} + /** Query the running status of reconfiguration process */ message GetReconfigurationStatusRequestProto { } @@ -222,4 +229,7 @@ service ClientDatanodeProtocolService { rpc startReconfiguration(StartReconfigurationRequestProto) returns(StartReconfigurationResponseProto); + + rpc triggerBlockReport(TriggerBlockReportRequestProto) + returns(TriggerBlockReportResponseProto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSCommands.apt.vm b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSCommands.apt.vm index 70851105c52..bc872ba1ed4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSCommands.apt.vm +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSCommands.apt.vm @@ -200,6 +200,7 @@ HDFS Commands Guide [-fetchImage ] [-shutdownDatanode [upgrade]] [-getDatanodeInfo ] + [-triggerBlockReport [-incremental] ] [-help [cmd]] +------------------------------------------+ @@ -327,6 +328,11 @@ HDFS Commands Guide | {{{./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo}Rolling Upgrade document}} | for the detail. *-----------------+-----------------------------------------------------------+ +| -triggerBlockReport [-incremental] \ | Trigger a + | block report for the given datanode. If 'incremental' is + | specified, it will be | an incremental block report; + | otherwise, it will be a full block report. +*-----------------+-----------------------------------------------------------+ | -help [cmd] | Displays help for the given command or all commands if none | is specified. *-----------------+-----------------------------------------------------------+ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java new file mode 100644 index 00000000000..3195d7d8e87 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.timeout; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.BlockReportOptions; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Test manually requesting that the DataNode send a block report. + */ +public final class TestTriggerBlockReport { + private void testTriggerBlockReport(boolean incremental) throws Exception { + Configuration conf = new HdfsConfiguration(); + + // Set a really long value for dfs.blockreport.intervalMsec and + // dfs.heartbeat.interval, so that incremental block reports and heartbeats + // won't be sent during this test unless they're triggered + // manually. + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10800000L); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L); + + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + DatanodeProtocolClientSideTranslatorPB spy = + DataNodeTestUtils.spyOnBposToNN( + cluster.getDataNodes().get(0), cluster.getNameNode()); + DFSTestUtil.createFile(fs, new Path("/abc"), 16, (short) 1, 1L); + + // We should get 1 incremental block report. + Mockito.verify(spy, timeout(60000).times(1)).blockReceivedAndDeleted( + any(DatanodeRegistration.class), + anyString(), + any(StorageReceivedDeletedBlocks[].class)); + + // We should not receive any more incremental or incremental block reports, + // since the interval we configured is so long. + for (int i = 0; i < 3; i++) { + Thread.sleep(10); + Mockito.verify(spy, times(0)).blockReport( + any(DatanodeRegistration.class), + anyString(), + any(StorageBlockReport[].class), + Mockito.anyObject()); + Mockito.verify(spy, times(1)).blockReceivedAndDeleted( + any(DatanodeRegistration.class), + anyString(), + any(StorageReceivedDeletedBlocks[].class)); + } + + // Create a fake block deletion notification on the DataNode. + // This will be sent with the next incremental block report. + ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo( + new Block(5678, 512, 1000), BlockStatus.DELETED_BLOCK, null); + DataNode datanode = cluster.getDataNodes().get(0); + BPServiceActor actor = + datanode.getAllBpOs()[0].getBPServiceActors().get(0); + String storageUuid = + datanode.getFSDataset().getVolumes().get(0).getStorageID(); + actor.notifyNamenodeDeletedBlock(rdbi, storageUuid); + + // Manually trigger a block report. + datanode.triggerBlockReport( + new BlockReportOptions.Factory(). + setIncremental(incremental). + build() + ); + + // triggerBlockReport returns before the block report is + // actually sent. Wait for it to be sent here. + if (incremental) { + Mockito.verify(spy, timeout(60000).times(2)). + blockReceivedAndDeleted( + any(DatanodeRegistration.class), + anyString(), + any(StorageReceivedDeletedBlocks[].class)); + } else { + Mockito.verify(spy, timeout(60000)).blockReport( + any(DatanodeRegistration.class), + anyString(), + any(StorageBlockReport[].class), + Mockito.anyObject()); + } + + cluster.shutdown(); + } + + @Test + public void testTriggerFullBlockReport() throws Exception { + testTriggerBlockReport(false); + } + + @Test + public void testTriggerIncrementalBlockReport() throws Exception { + testTriggerBlockReport(true); + } +}