HDFS-7278. Add a command that allows sysadmins to manually trigger full block reports from a DN (cmccabe)
(cherry picked from commitbaf794dc40
) (cherry picked from commit5f3d967aae
) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSCommands.apt.vm (cherry picked from commit a776ef5ad2876b9acf6cf89824c306783f7759f1)
This commit is contained in:
parent
bbed4e67a9
commit
51d5fc62c2
|
@ -6,6 +6,9 @@ Release 2.6.1 - UNRELEASED
|
|||
|
||||
NEW FEATURES
|
||||
|
||||
HDFS-7278. Add a command that allows sysadmins to manually trigger full
|
||||
block reports from a DN (cmccabe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HDFS-7035. Make adding a new data directory to the DataNode an atomic
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.client;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Options that can be specified when manually triggering a block report.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public final class BlockReportOptions {
|
||||
private final boolean incremental;
|
||||
|
||||
private BlockReportOptions(boolean incremental) {
|
||||
this.incremental = incremental;
|
||||
}
|
||||
|
||||
public boolean isIncremental() {
|
||||
return incremental;
|
||||
}
|
||||
|
||||
public static class Factory {
|
||||
private boolean incremental = false;
|
||||
|
||||
public Factory() {
|
||||
}
|
||||
|
||||
public Factory setIncremental(boolean incremental) {
|
||||
this.incremental = incremental;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BlockReportOptions build() {
|
||||
return new BlockReportOptions(incremental);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "BlockReportOptions{incremental=" + incremental + "}";
|
||||
}
|
||||
}
|
|
@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
|
||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
|
||||
|
@ -158,4 +159,10 @@ public interface ClientDatanodeProtocol {
|
|||
* @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}.
|
||||
*/
|
||||
ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
|
||||
|
||||
/**
|
||||
* Trigger a new block report.
|
||||
*/
|
||||
void triggerBlockReport(BlockReportOptions options)
|
||||
throws IOException;
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import com.google.common.base.Optional;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
|
||||
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
|
||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
|
||||
|
@ -49,6 +50,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Shutdo
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
@ -74,6 +77,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
|
|||
ShutdownDatanodeResponseProto.newBuilder().build();
|
||||
private final static StartReconfigurationResponseProto START_RECONFIG_RESP =
|
||||
StartReconfigurationResponseProto.newBuilder().build();
|
||||
private final static TriggerBlockReportResponseProto TRIGGER_BLOCK_REPORT_RESP =
|
||||
TriggerBlockReportResponseProto.newBuilder().build();
|
||||
|
||||
private final ClientDatanodeProtocol impl;
|
||||
|
||||
|
@ -237,4 +242,17 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
|
|||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TriggerBlockReportResponseProto triggerBlockReport(
|
||||
RpcController unused, TriggerBlockReportRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
impl.triggerBlockReport(new BlockReportOptions.Factory().
|
||||
setIncremental(request.getIncremental()).build());
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
return TRIGGER_BLOCK_REPORT_RESP;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
|
||||
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
|
@ -57,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRec
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
|
@ -333,4 +336,17 @@ public class ClientDatanodeProtocolTranslatorPB implements
|
|||
}
|
||||
return new ReconfigurationTaskStatus(startTime, endTime, statusMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggerBlockReport(BlockReportOptions options)
|
||||
throws IOException {
|
||||
try {
|
||||
rpcProxy.triggerBlockReport(NULL_CONTROLLER,
|
||||
TriggerBlockReportRequestProto.newBuilder().
|
||||
setIncremental(options.isIncremental()).
|
||||
build());
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import com.google.common.base.Joiner;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
|
@ -1023,4 +1024,20 @@ class BPServiceActor implements Runnable {
|
|||
return (pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId()) != null);
|
||||
}
|
||||
}
|
||||
|
||||
void triggerBlockReport(BlockReportOptions options) throws IOException {
|
||||
if (options.isIncremental()) {
|
||||
LOG.info(bpos.toString() + ": scheduling an incremental block report.");
|
||||
synchronized(pendingIncrementalBRperStorage) {
|
||||
sendImmediateIBR = true;
|
||||
pendingIncrementalBRperStorage.notifyAll();
|
||||
}
|
||||
} else {
|
||||
LOG.info(bpos.toString() + ": scheduling a full block report.");
|
||||
synchronized(pendingIncrementalBRperStorage) {
|
||||
lastBlockReport = 0;
|
||||
pendingIncrementalBRperStorage.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -98,6 +98,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
|
||||
|
@ -2967,6 +2968,19 @@ public class DataNode extends ReconfigurableBase
|
|||
return getReconfigurationTaskStatus();
|
||||
}
|
||||
|
||||
@Override // ClientDatanodeProtocol
|
||||
public void triggerBlockReport(BlockReportOptions options)
|
||||
throws IOException {
|
||||
checkSuperuserPrivilege();
|
||||
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
|
||||
if (bpos != null) {
|
||||
for (BPServiceActor actor : bpos.getBPServiceActors()) {
|
||||
actor.triggerBlockReport(options);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param addr rpc address of the namenode
|
||||
* @return true if the datanode is connected to a NameNode at the
|
||||
|
|
|
@ -31,10 +31,12 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Optional;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -49,6 +51,7 @@ import org.apache.hadoop.fs.FsStatus;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.shell.Command;
|
||||
import org.apache.hadoop.fs.shell.CommandFormat;
|
||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
|
@ -397,6 +400,7 @@ public class DFSAdmin extends FsShell {
|
|||
"\t[-metasave filename]\n" +
|
||||
"\t[-setStoragePolicy path policyName]\n" +
|
||||
"\t[-getStoragePolicy path]\n" +
|
||||
"\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" +
|
||||
"\t[-help [cmd]]\n";
|
||||
|
||||
/**
|
||||
|
@ -631,6 +635,38 @@ public class DFSAdmin extends FsShell {
|
|||
throw new IOException("Cannot identify the storage policy for " + argv[1]);
|
||||
}
|
||||
|
||||
public int triggerBlockReport(String[] argv) throws IOException {
|
||||
List<String> args = new LinkedList<String>();
|
||||
for (int j = 1; j < argv.length; j++) {
|
||||
args.add(argv[j]);
|
||||
}
|
||||
boolean incremental = StringUtils.popOption("-incremental", args);
|
||||
String hostPort = StringUtils.popFirstNonOption(args);
|
||||
if (hostPort == null) {
|
||||
System.err.println("You must specify a host:port pair.");
|
||||
return 1;
|
||||
}
|
||||
if (!args.isEmpty()) {
|
||||
System.err.print("Can't understand arguments: " +
|
||||
Joiner.on(" ").join(args) + "\n");
|
||||
return 1;
|
||||
}
|
||||
ClientDatanodeProtocol dnProxy = getDataNodeProxy(hostPort);
|
||||
try {
|
||||
dnProxy.triggerBlockReport(
|
||||
new BlockReportOptions.Factory().
|
||||
setIncremental(incremental).
|
||||
build());
|
||||
} catch (IOException e) {
|
||||
System.err.println("triggerBlockReport error: " + e);
|
||||
return 1;
|
||||
}
|
||||
System.out.println("Triggering " +
|
||||
(incremental ? "an incremental " : "a full ") +
|
||||
"block report on " + hostPort + ".");
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allow snapshot on a directory.
|
||||
* Usage: java DFSAdmin -allowSnapshot snapshotDir
|
||||
|
@ -984,6 +1020,12 @@ public class DFSAdmin extends FsShell {
|
|||
String getStoragePolicy = "-getStoragePolicy path\n"
|
||||
+ "\tGet the storage policy for a file/directory.\n";
|
||||
|
||||
String triggerBlockReport =
|
||||
"-triggerBlockReport [-incremental] <datanode_host:ipc_port>\n"
|
||||
+ "\tTrigger a block report for the datanode.\n"
|
||||
+ "\tIf 'incremental' is specified, it will be an incremental\n"
|
||||
+ "\tblock report; otherwise, it will be a full block report.\n";
|
||||
|
||||
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
|
||||
"\t\tis specified.\n";
|
||||
|
||||
|
@ -1081,6 +1123,7 @@ public class DFSAdmin extends FsShell {
|
|||
System.out.println(getDatanodeInfo);
|
||||
System.out.println(setStoragePolicy);
|
||||
System.out.println(getStoragePolicy);
|
||||
System.out.println(triggerBlockReport);
|
||||
System.out.println(help);
|
||||
System.out.println();
|
||||
ToolRunner.printGenericCommandUsage(System.out);
|
||||
|
@ -1594,6 +1637,9 @@ public class DFSAdmin extends FsShell {
|
|||
} else if ("-getDatanodeInfo".equals(cmd)) {
|
||||
System.err.println("Usage: hdfs dfsadmin"
|
||||
+ " [-getDatanodeInfo <datanode_host:ipc_port>]");
|
||||
} else if ("-triggerBlockReport".equals(cmd)) {
|
||||
System.err.println("Usage: java DFSAdmin"
|
||||
+ " [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]");
|
||||
} else {
|
||||
System.err.println("Usage: hdfs dfsadmin");
|
||||
System.err.println("Note: Administrative commands can only be run as the HDFS superuser.");
|
||||
|
@ -1737,6 +1783,11 @@ public class DFSAdmin extends FsShell {
|
|||
printUsage(cmd);
|
||||
return exitCode;
|
||||
}
|
||||
} else if ("-triggerBlockReport".equals(cmd)) {
|
||||
if (argv.length < 1) {
|
||||
printUsage(cmd);
|
||||
return exitCode;
|
||||
}
|
||||
} else if ("-getStoragePolicy".equals(cmd)) {
|
||||
if (argv.length != 2) {
|
||||
printUsage(cmd);
|
||||
|
@ -1819,6 +1870,8 @@ public class DFSAdmin extends FsShell {
|
|||
exitCode = setStoragePolicy(argv);
|
||||
} else if ("-getStoragePolicy".equals(cmd)) {
|
||||
exitCode = getStoragePolicy(argv);
|
||||
} else if ("-triggerBlockReport".equals(cmd)) {
|
||||
exitCode = triggerBlockReport(argv);
|
||||
} else if ("-help".equals(cmd)) {
|
||||
if (i < argv.length) {
|
||||
printHelp(argv[i]);
|
||||
|
|
|
@ -156,6 +156,13 @@ message StartReconfigurationRequestProto {
|
|||
message StartReconfigurationResponseProto {
|
||||
}
|
||||
|
||||
message TriggerBlockReportRequestProto {
|
||||
required bool incremental = 1;
|
||||
}
|
||||
|
||||
message TriggerBlockReportResponseProto {
|
||||
}
|
||||
|
||||
/** Query the running status of reconfiguration process */
|
||||
message GetReconfigurationStatusRequestProto {
|
||||
}
|
||||
|
@ -222,4 +229,7 @@ service ClientDatanodeProtocolService {
|
|||
|
||||
rpc startReconfiguration(StartReconfigurationRequestProto)
|
||||
returns(StartReconfigurationResponseProto);
|
||||
|
||||
rpc triggerBlockReport(TriggerBlockReportRequestProto)
|
||||
returns(TriggerBlockReportResponseProto);
|
||||
}
|
||||
|
|
|
@ -200,6 +200,7 @@ HDFS Commands Guide
|
|||
[-fetchImage <local directory>]
|
||||
[-shutdownDatanode <datanode_host:ipc_port> [upgrade]]
|
||||
[-getDatanodeInfo <datanode_host:ipc_port>]
|
||||
[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]
|
||||
[-help [cmd]]
|
||||
+------------------------------------------+
|
||||
|
||||
|
@ -327,6 +328,11 @@ HDFS Commands Guide
|
|||
| {{{./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo}Rolling Upgrade document}}
|
||||
| for the detail.
|
||||
*-----------------+-----------------------------------------------------------+
|
||||
| -triggerBlockReport [-incremental] \<datanode_host:ipc_port\> | Trigger a
|
||||
| block report for the given datanode. If 'incremental' is
|
||||
| specified, it will be | an incremental block report;
|
||||
| otherwise, it will be a full block report.
|
||||
*-----------------+-----------------------------------------------------------+
|
||||
| -help [cmd] | Displays help for the given command or all commands if none
|
||||
| is specified.
|
||||
*-----------------+-----------------------------------------------------------+
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* Test manually requesting that the DataNode send a block report.
|
||||
*/
|
||||
public final class TestTriggerBlockReport {
|
||||
private void testTriggerBlockReport(boolean incremental) throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
|
||||
// Set a really long value for dfs.blockreport.intervalMsec and
|
||||
// dfs.heartbeat.interval, so that incremental block reports and heartbeats
|
||||
// won't be sent during this test unless they're triggered
|
||||
// manually.
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10800000L);
|
||||
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L);
|
||||
|
||||
final MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DatanodeProtocolClientSideTranslatorPB spy =
|
||||
DataNodeTestUtils.spyOnBposToNN(
|
||||
cluster.getDataNodes().get(0), cluster.getNameNode());
|
||||
DFSTestUtil.createFile(fs, new Path("/abc"), 16, (short) 1, 1L);
|
||||
|
||||
// We should get 1 incremental block report.
|
||||
Mockito.verify(spy, timeout(60000).times(1)).blockReceivedAndDeleted(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageReceivedDeletedBlocks[].class));
|
||||
|
||||
// We should not receive any more incremental or incremental block reports,
|
||||
// since the interval we configured is so long.
|
||||
for (int i = 0; i < 3; i++) {
|
||||
Thread.sleep(10);
|
||||
Mockito.verify(spy, times(0)).blockReport(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageBlockReport[].class),
|
||||
Mockito.<BlockReportContext>anyObject());
|
||||
Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageReceivedDeletedBlocks[].class));
|
||||
}
|
||||
|
||||
// Create a fake block deletion notification on the DataNode.
|
||||
// This will be sent with the next incremental block report.
|
||||
ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
|
||||
new Block(5678, 512, 1000), BlockStatus.DELETED_BLOCK, null);
|
||||
DataNode datanode = cluster.getDataNodes().get(0);
|
||||
BPServiceActor actor =
|
||||
datanode.getAllBpOs()[0].getBPServiceActors().get(0);
|
||||
String storageUuid =
|
||||
datanode.getFSDataset().getVolumes().get(0).getStorageID();
|
||||
actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);
|
||||
|
||||
// Manually trigger a block report.
|
||||
datanode.triggerBlockReport(
|
||||
new BlockReportOptions.Factory().
|
||||
setIncremental(incremental).
|
||||
build()
|
||||
);
|
||||
|
||||
// triggerBlockReport returns before the block report is
|
||||
// actually sent. Wait for it to be sent here.
|
||||
if (incremental) {
|
||||
Mockito.verify(spy, timeout(60000).times(2)).
|
||||
blockReceivedAndDeleted(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageReceivedDeletedBlocks[].class));
|
||||
} else {
|
||||
Mockito.verify(spy, timeout(60000)).blockReport(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageBlockReport[].class),
|
||||
Mockito.<BlockReportContext>anyObject());
|
||||
}
|
||||
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTriggerFullBlockReport() throws Exception {
|
||||
testTriggerBlockReport(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTriggerIncrementalBlockReport() throws Exception {
|
||||
testTriggerBlockReport(true);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue