HDFS-11164: Mover should avoid unnecessary retries if the block is pinned. Contributed by Rakesh R
This commit is contained in:
parent
9947aeb60c
commit
e24a923db5
|
@ -0,0 +1,33 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.protocol.datatransfer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates a failure due to block pinning.
|
||||||
|
*/
|
||||||
|
public class BlockPinningException extends IOException {
|
||||||
|
|
||||||
|
// Required by {@link java.io.Serializable}.
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
public BlockPinningException(String errMsg) {
|
||||||
|
super(errMsg);
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,11 +24,11 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
|
@ -107,6 +107,11 @@ public abstract class DataTransferProtoUtil {
|
||||||
public static void checkBlockOpStatus(
|
public static void checkBlockOpStatus(
|
||||||
BlockOpResponseProto response,
|
BlockOpResponseProto response,
|
||||||
String logInfo) throws IOException {
|
String logInfo) throws IOException {
|
||||||
|
checkBlockOpStatus(response, logInfo, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void checkBlockOpStatus(BlockOpResponseProto response,
|
||||||
|
String logInfo, boolean checkBlockPinningErr) throws IOException {
|
||||||
if (response.getStatus() != Status.SUCCESS) {
|
if (response.getStatus() != Status.SUCCESS) {
|
||||||
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
|
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
|
||||||
throw new InvalidBlockTokenException(
|
throw new InvalidBlockTokenException(
|
||||||
|
@ -114,6 +119,14 @@ public abstract class DataTransferProtoUtil {
|
||||||
+ ", status message " + response.getMessage()
|
+ ", status message " + response.getMessage()
|
||||||
+ ", " + logInfo
|
+ ", " + logInfo
|
||||||
);
|
);
|
||||||
|
} else if (checkBlockPinningErr
|
||||||
|
&& response.getStatus() == Status.ERROR_BLOCK_PINNED) {
|
||||||
|
throw new BlockPinningException(
|
||||||
|
"Got error"
|
||||||
|
+ ", status=" + response.getStatus().name()
|
||||||
|
+ ", status message " + response.getMessage()
|
||||||
|
+ ", " + logInfo
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
"Got error"
|
"Got error"
|
||||||
|
|
|
@ -243,6 +243,7 @@ enum Status {
|
||||||
OOB_RESERVED2 = 10; // Reserved
|
OOB_RESERVED2 = 10; // Reserved
|
||||||
OOB_RESERVED3 = 11; // Reserved
|
OOB_RESERVED3 = 11; // Reserved
|
||||||
IN_PROGRESS = 12;
|
IN_PROGRESS = 12;
|
||||||
|
ERROR_BLOCK_PINNED = 13;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum ShortCircuitFdResponse {
|
enum ShortCircuitFdResponse {
|
||||||
|
|
|
@ -36,6 +36,7 @@ import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
@ -55,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
||||||
|
@ -224,6 +226,10 @@ public class Dispatcher {
|
||||||
this.target = target;
|
this.target = target;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public DatanodeInfo getSource() {
|
||||||
|
return source.getDatanodeInfo();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
final Block b = reportedBlock != null ? reportedBlock.getBlock() : null;
|
final Block b = reportedBlock != null ? reportedBlock.getBlock() : null;
|
||||||
|
@ -367,6 +373,15 @@ public class Dispatcher {
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed to move " + this, e);
|
LOG.warn("Failed to move " + this, e);
|
||||||
target.getDDatanode().setHasFailure();
|
target.getDDatanode().setHasFailure();
|
||||||
|
// Check that the failure is due to block pinning errors.
|
||||||
|
if (e instanceof BlockPinningException) {
|
||||||
|
// Pinned block can't be moved. Add this block into failure list.
|
||||||
|
// Later in the next iteration mover will exclude these blocks from
|
||||||
|
// pending moves.
|
||||||
|
target.getDDatanode().addBlockPinningFailures(this);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Proxy or target may have some issues, delay before using these nodes
|
// Proxy or target may have some issues, delay before using these nodes
|
||||||
// further in order to avoid a potential storm of "threads quota
|
// further in order to avoid a potential storm of "threads quota
|
||||||
// exceeded" warnings when the dispatcher gets out of sync with work
|
// exceeded" warnings when the dispatcher gets out of sync with work
|
||||||
|
@ -419,7 +434,7 @@ public class Dispatcher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
String logInfo = "reportedBlock move is failed";
|
String logInfo = "reportedBlock move is failed";
|
||||||
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
|
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** reset the object */
|
/** reset the object */
|
||||||
|
@ -600,6 +615,7 @@ public class Dispatcher {
|
||||||
/** blocks being moved but not confirmed yet */
|
/** blocks being moved but not confirmed yet */
|
||||||
private final List<PendingMove> pendings;
|
private final List<PendingMove> pendings;
|
||||||
private volatile boolean hasFailure = false;
|
private volatile boolean hasFailure = false;
|
||||||
|
private Map<Long, Set<DatanodeInfo>> blockPinningFailures = new HashMap<>();
|
||||||
private volatile boolean hasSuccess = false;
|
private volatile boolean hasSuccess = false;
|
||||||
private ExecutorService moveExecutor;
|
private ExecutorService moveExecutor;
|
||||||
|
|
||||||
|
@ -685,6 +701,22 @@ public class Dispatcher {
|
||||||
this.hasFailure = true;
|
this.hasFailure = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addBlockPinningFailures(PendingMove pendingBlock) {
|
||||||
|
synchronized (blockPinningFailures) {
|
||||||
|
long blockId = pendingBlock.reportedBlock.getBlock().getBlockId();
|
||||||
|
Set<DatanodeInfo> pinnedLocations = blockPinningFailures.get(blockId);
|
||||||
|
if (pinnedLocations == null) {
|
||||||
|
pinnedLocations = new HashSet<>();
|
||||||
|
blockPinningFailures.put(blockId, pinnedLocations);
|
||||||
|
}
|
||||||
|
pinnedLocations.add(pendingBlock.getSource());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<Long, Set<DatanodeInfo>> getBlockPinningFailureList() {
|
||||||
|
return blockPinningFailures;
|
||||||
|
}
|
||||||
|
|
||||||
void setHasSuccess() {
|
void setHasSuccess() {
|
||||||
this.hasSuccess = true;
|
this.hasSuccess = true;
|
||||||
}
|
}
|
||||||
|
@ -1154,6 +1186,34 @@ public class Dispatcher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check any of the block movements are failed due to block pinning errors. If
|
||||||
|
* yes, add the failed blockId and its respective source node location to the
|
||||||
|
* excluded list.
|
||||||
|
*/
|
||||||
|
public static void checkForBlockPinningFailures(
|
||||||
|
Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks,
|
||||||
|
Iterable<? extends StorageGroup> targets) {
|
||||||
|
for (StorageGroup t : targets) {
|
||||||
|
Map<Long, Set<DatanodeInfo>> blockPinningFailureList = t.getDDatanode()
|
||||||
|
.getBlockPinningFailureList();
|
||||||
|
Set<Entry<Long, Set<DatanodeInfo>>> entrySet = blockPinningFailureList
|
||||||
|
.entrySet();
|
||||||
|
for (Entry<Long, Set<DatanodeInfo>> entry : entrySet) {
|
||||||
|
Long blockId = entry.getKey();
|
||||||
|
Set<DatanodeInfo> locs = excludedPinnedBlocks.get(blockId);
|
||||||
|
if (locs == null) {
|
||||||
|
// blockId doesn't exists in the excluded list.
|
||||||
|
locs = entry.getValue();
|
||||||
|
excludedPinnedBlocks.put(blockId, locs);
|
||||||
|
} else {
|
||||||
|
// blockId already exists in the excluded list, add the pinned node.
|
||||||
|
locs.addAll(entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return true if some moves are success.
|
* @return true if some moves are success.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
|
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
|
||||||
|
@ -1022,7 +1023,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
String msg = "Not able to copy block " + block.getBlockId() + " " +
|
String msg = "Not able to copy block " + block.getBlockId() + " " +
|
||||||
"to " + peer.getRemoteAddressString() + " because it's pinned ";
|
"to " + peer.getRemoteAddressString() + " because it's pinned ";
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
sendResponse(ERROR, msg);
|
sendResponse(Status.ERROR_BLOCK_PINNED, msg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1156,7 +1157,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
|
|
||||||
String logInfo = "copy block " + block + " from "
|
String logInfo = "copy block " + block + " from "
|
||||||
+ proxySock.getRemoteSocketAddress();
|
+ proxySock.getRemoteSocketAddress();
|
||||||
DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo);
|
DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo, true);
|
||||||
|
|
||||||
// get checksum info about the block we're copying
|
// get checksum info about the block we're copying
|
||||||
ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
|
ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
|
||||||
|
@ -1183,6 +1184,9 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
}
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
opStatus = ERROR;
|
opStatus = ERROR;
|
||||||
|
if (ioe instanceof BlockPinningException) {
|
||||||
|
opStatus = Status.ERROR_BLOCK_PINNED;
|
||||||
|
}
|
||||||
errMsg = "opReplaceBlock " + block + " received exception " + ioe;
|
errMsg = "opReplaceBlock " + block + " received exception " + ioe;
|
||||||
LOG.info(errMsg);
|
LOG.info(errMsg);
|
||||||
if (!IoeDuringCopyBlockOperation) {
|
if (!IoeDuringCopyBlockOperation) {
|
||||||
|
|
|
@ -46,7 +46,6 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
@ -117,10 +116,12 @@ public class Mover {
|
||||||
private final List<Path> targetPaths;
|
private final List<Path> targetPaths;
|
||||||
private final int retryMaxAttempts;
|
private final int retryMaxAttempts;
|
||||||
private final AtomicInteger retryCount;
|
private final AtomicInteger retryCount;
|
||||||
|
private final Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks;
|
||||||
|
|
||||||
private final BlockStoragePolicy[] blockStoragePolicies;
|
private final BlockStoragePolicy[] blockStoragePolicies;
|
||||||
|
|
||||||
Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount) {
|
Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount,
|
||||||
|
Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks) {
|
||||||
final long movedWinWidth = conf.getLong(
|
final long movedWinWidth = conf.getLong(
|
||||||
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
|
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
|
||||||
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
|
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
|
||||||
|
@ -144,6 +145,7 @@ public class Mover {
|
||||||
this.targetPaths = nnc.getTargetPaths();
|
this.targetPaths = nnc.getTargetPaths();
|
||||||
this.blockStoragePolicies = new BlockStoragePolicy[1 <<
|
this.blockStoragePolicies = new BlockStoragePolicy[1 <<
|
||||||
BlockStoragePolicySuite.ID_BIT_LENGTH];
|
BlockStoragePolicySuite.ID_BIT_LENGTH];
|
||||||
|
this.excludedPinnedBlocks = excludedPinnedBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
void init() throws IOException {
|
void init() throws IOException {
|
||||||
|
@ -292,6 +294,8 @@ public class Mover {
|
||||||
// wait for pending move to finish and retry the failed migration
|
// wait for pending move to finish and retry the failed migration
|
||||||
boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
|
boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
|
||||||
.values());
|
.values());
|
||||||
|
Dispatcher.checkForBlockPinningFailures(excludedPinnedBlocks,
|
||||||
|
storages.targets.values());
|
||||||
boolean hasSuccess = Dispatcher.checkForSuccess(storages.targets
|
boolean hasSuccess = Dispatcher.checkForSuccess(storages.targets
|
||||||
.values());
|
.values());
|
||||||
if (hasFailed && !hasSuccess) {
|
if (hasFailed && !hasSuccess) {
|
||||||
|
@ -461,6 +465,19 @@ public class Mover {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check the given block is pinned in the source datanode. A pinned block
|
||||||
|
// can't be moved to a different datanode. So we can skip adding these
|
||||||
|
// blocks to different nodes.
|
||||||
|
long blockId = db.getBlock().getBlockId();
|
||||||
|
if (excludedPinnedBlocks.containsKey(blockId)) {
|
||||||
|
Set<DatanodeInfo> locs = excludedPinnedBlocks.get(blockId);
|
||||||
|
for (DatanodeInfo dn : locs) {
|
||||||
|
if (source.getDatanodeInfo().equals(dn)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (dispatcher.getCluster().isNodeGroupAware()) {
|
if (dispatcher.getCluster().isNodeGroupAware()) {
|
||||||
if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
|
if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -614,6 +631,8 @@ public class Mover {
|
||||||
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT,
|
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT,
|
||||||
TimeUnit.SECONDS) * 1000;
|
TimeUnit.SECONDS) * 1000;
|
||||||
AtomicInteger retryCount = new AtomicInteger(0);
|
AtomicInteger retryCount = new AtomicInteger(0);
|
||||||
|
// TODO: Need to limit the size of the pinned blocks to limit memory usage
|
||||||
|
Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks = new HashMap<>();
|
||||||
LOG.info("namenodes = " + namenodes);
|
LOG.info("namenodes = " + namenodes);
|
||||||
|
|
||||||
checkKeytabAndInit(conf);
|
checkKeytabAndInit(conf);
|
||||||
|
@ -628,7 +647,8 @@ public class Mover {
|
||||||
Iterator<NameNodeConnector> iter = connectors.iterator();
|
Iterator<NameNodeConnector> iter = connectors.iterator();
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
NameNodeConnector nnc = iter.next();
|
NameNodeConnector nnc = iter.next();
|
||||||
final Mover m = new Mover(nnc, conf, retryCount);
|
final Mover m = new Mover(nnc, conf, retryCount,
|
||||||
|
excludedPinnedBlocks);
|
||||||
final ExitStatus r = m.run();
|
final ExitStatus r = m.run();
|
||||||
|
|
||||||
if (r == ExitStatus.SUCCESS) {
|
if (r == ExitStatus.SUCCESS) {
|
||||||
|
|
|
@ -25,10 +25,17 @@ import java.io.IOException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility class for accessing package-private DataNode information during tests.
|
* Utility class for accessing package-private DataNode information during tests.
|
||||||
|
@ -175,4 +182,25 @@ public class DataNodeTestUtils {
|
||||||
dn.getDirectoryScanner().reconcile();
|
dn.getDirectoryScanner().reconcile();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is used to mock the data node block pinning API.
|
||||||
|
*
|
||||||
|
* @param dn datanode
|
||||||
|
* @param pinned true if the block is pinned, false otherwise
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void mockDatanodeBlkPinning(final DataNode dn,
|
||||||
|
final boolean pinned) throws IOException {
|
||||||
|
final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
|
||||||
|
dn.data = Mockito.spy(data);
|
||||||
|
|
||||||
|
doAnswer(new Answer<Object>() {
|
||||||
|
public Object answer(InvocationOnMock invocation) throws IOException {
|
||||||
|
// Bypass the argument to FsDatasetImpl#getPinning to show that
|
||||||
|
// the block is pinned.
|
||||||
|
return pinned;
|
||||||
|
}
|
||||||
|
}).when(dn.data).getPinning(any(ExtendedBlock.class));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -208,6 +208,67 @@ public class TestBlockReplacement {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to verify that the copying of pinned block to a different destination
|
||||||
|
* datanode will throw IOException with error code Status.ERROR_BLOCK_PINNED.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Test(timeout = 90000)
|
||||||
|
public void testBlockReplacementWithPinnedBlocks() throws Exception {
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
|
||||||
|
// create only one datanode in the cluster with DISK and ARCHIVE storage
|
||||||
|
// types.
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
|
||||||
|
.storageTypes(
|
||||||
|
new StorageType[] {StorageType.DISK, StorageType.ARCHIVE})
|
||||||
|
.build();
|
||||||
|
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
String fileName = "/testBlockReplacementWithPinnedBlocks/file";
|
||||||
|
final Path file = new Path(fileName);
|
||||||
|
DFSTestUtil.createFile(dfs, file, 1024, (short) 1, 1024);
|
||||||
|
|
||||||
|
LocatedBlock lb = dfs.getClient().getLocatedBlocks(fileName, 0).get(0);
|
||||||
|
DatanodeInfo[] oldNodes = lb.getLocations();
|
||||||
|
assertEquals("Wrong block locations", oldNodes.length, 1);
|
||||||
|
DatanodeInfo source = oldNodes[0];
|
||||||
|
ExtendedBlock b = lb.getBlock();
|
||||||
|
|
||||||
|
DatanodeInfo[] datanodes = dfs.getDataNodeStats();
|
||||||
|
DatanodeInfo destin = null;
|
||||||
|
for (DatanodeInfo datanodeInfo : datanodes) {
|
||||||
|
// choose different destination node
|
||||||
|
if (!oldNodes[0].equals(datanodeInfo)) {
|
||||||
|
destin = datanodeInfo;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNotNull("Failed to choose destination datanode!", destin);
|
||||||
|
|
||||||
|
assertFalse("Source and destin datanode should be different",
|
||||||
|
source.equals(destin));
|
||||||
|
|
||||||
|
// Mock FsDatasetSpi#getPinning to show that the block is pinned.
|
||||||
|
for (int i = 0; i < cluster.getDataNodes().size(); i++) {
|
||||||
|
DataNode dn = cluster.getDataNodes().get(i);
|
||||||
|
LOG.info("Simulate block pinning in datanode " + dn);
|
||||||
|
DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Block movement to a different datanode should fail as the block is
|
||||||
|
// pinned.
|
||||||
|
assertTrue("Status code mismatches!", replaceBlock(b, source, source,
|
||||||
|
destin, StorageType.ARCHIVE, Status.ERROR_BLOCK_PINNED));
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockMoveAcrossStorageInSameNode() throws Exception {
|
public void testBlockMoveAcrossStorageInSameNode() throws Exception {
|
||||||
final Configuration conf = new HdfsConfiguration();
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
@ -236,7 +297,7 @@ public class TestBlockReplacement {
|
||||||
// move block to ARCHIVE by using same DataNodeInfo for source, proxy and
|
// move block to ARCHIVE by using same DataNodeInfo for source, proxy and
|
||||||
// destination so that movement happens within datanode
|
// destination so that movement happens within datanode
|
||||||
assertTrue(replaceBlock(block, source, source, source,
|
assertTrue(replaceBlock(block, source, source, source,
|
||||||
StorageType.ARCHIVE));
|
StorageType.ARCHIVE, Status.SUCCESS));
|
||||||
|
|
||||||
// wait till namenode notified
|
// wait till namenode notified
|
||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
|
@ -311,7 +372,7 @@ public class TestBlockReplacement {
|
||||||
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
|
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
|
||||||
DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
|
DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
|
||||||
return replaceBlock(block, source, sourceProxy, destination,
|
return replaceBlock(block, source, sourceProxy, destination,
|
||||||
StorageType.DEFAULT);
|
StorageType.DEFAULT, Status.SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -322,7 +383,8 @@ public class TestBlockReplacement {
|
||||||
DatanodeInfo source,
|
DatanodeInfo source,
|
||||||
DatanodeInfo sourceProxy,
|
DatanodeInfo sourceProxy,
|
||||||
DatanodeInfo destination,
|
DatanodeInfo destination,
|
||||||
StorageType targetStorageType) throws IOException, SocketException {
|
StorageType targetStorageType,
|
||||||
|
Status opStatus) throws IOException, SocketException {
|
||||||
Socket sock = new Socket();
|
Socket sock = new Socket();
|
||||||
try {
|
try {
|
||||||
sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
|
sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
|
||||||
|
@ -342,7 +404,7 @@ public class TestBlockReplacement {
|
||||||
while (proto.getStatus() == Status.IN_PROGRESS) {
|
while (proto.getStatus() == Status.IN_PROGRESS) {
|
||||||
proto = BlockOpResponseProto.parseDelimitedFrom(reply);
|
proto = BlockOpResponseProto.parseDelimitedFrom(reply);
|
||||||
}
|
}
|
||||||
return proto.getStatus() == Status.SUCCESS;
|
return proto.getStatus() == opStatus;
|
||||||
} finally {
|
} finally {
|
||||||
sock.close();
|
sock.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,11 +37,13 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSF
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -64,6 +66,7 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||||
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
@ -72,6 +75,8 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
|
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
|
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
|
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
|
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
|
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||||
|
@ -121,7 +126,7 @@ public class TestMover {
|
||||||
final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
|
final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
|
||||||
nnMap, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf,
|
nnMap, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf,
|
||||||
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
|
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
|
||||||
return new Mover(nncs.get(0), conf, new AtomicInteger(0));
|
return new Mover(nncs.get(0), conf, new AtomicInteger(0), new HashMap<>());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -705,4 +710,160 @@ public class TestMover {
|
||||||
UserGroupInformation.setConfiguration(new Configuration());
|
UserGroupInformation.setConfiguration(new Configuration());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to verify that mover can't move pinned blocks.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 90000)
|
||||||
|
public void testMoverWithPinnedBlocks() throws Exception {
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
initConf(conf);
|
||||||
|
|
||||||
|
// Sets bigger retry max attempts value so that test case will timed out if
|
||||||
|
// block pinning errors are not handled properly during block movement.
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, 10000);
|
||||||
|
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(3)
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
final String file = "/testMoverWithPinnedBlocks/file";
|
||||||
|
Path dir = new Path("/testMoverWithPinnedBlocks");
|
||||||
|
dfs.mkdirs(dir);
|
||||||
|
|
||||||
|
// write to DISK
|
||||||
|
dfs.setStoragePolicy(dir, "HOT");
|
||||||
|
final FSDataOutputStream out = dfs.create(new Path(file));
|
||||||
|
byte[] fileData = StripedFileTestUtil
|
||||||
|
.generateBytes(DEFAULT_BLOCK_SIZE * 3);
|
||||||
|
out.write(fileData);
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
// verify before movement
|
||||||
|
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
|
||||||
|
StorageType[] storageTypes = lb.getStorageTypes();
|
||||||
|
for (StorageType storageType : storageTypes) {
|
||||||
|
Assert.assertTrue(StorageType.DISK == storageType);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adding one SSD based data node to the cluster.
|
||||||
|
StorageType[][] newtypes = new StorageType[][] {{StorageType.SSD}};
|
||||||
|
startAdditionalDNs(conf, 1, newtypes, cluster);
|
||||||
|
|
||||||
|
// Mock FsDatasetSpi#getPinning to show that the block is pinned.
|
||||||
|
for (int i = 0; i < cluster.getDataNodes().size(); i++) {
|
||||||
|
DataNode dn = cluster.getDataNodes().get(i);
|
||||||
|
LOG.info("Simulate block pinning in datanode {}", dn);
|
||||||
|
DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// move file blocks to ONE_SSD policy
|
||||||
|
dfs.setStoragePolicy(dir, "ONE_SSD");
|
||||||
|
int rc = ToolRunner.run(conf, new Mover.Cli(),
|
||||||
|
new String[] {"-p", dir.toString()});
|
||||||
|
|
||||||
|
int exitcode = ExitStatus.NO_MOVE_BLOCK.getExitCode();
|
||||||
|
Assert.assertEquals("Movement should fail", exitcode, rc);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to verify that mover should work well with pinned blocks as well as
|
||||||
|
* failed blocks. Mover should continue retrying the failed blocks only.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 90000)
|
||||||
|
public void testMoverFailedRetryWithPinnedBlocks() throws Exception {
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
initConf(conf);
|
||||||
|
conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2");
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(2)
|
||||||
|
.storageTypes(
|
||||||
|
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
|
||||||
|
{StorageType.DISK, StorageType.ARCHIVE}}).build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
final String parenDir = "/parent";
|
||||||
|
dfs.mkdirs(new Path(parenDir));
|
||||||
|
final String file1 = "/parent/testMoverFailedRetryWithPinnedBlocks1";
|
||||||
|
// write to DISK
|
||||||
|
final FSDataOutputStream out = dfs.create(new Path(file1), (short) 2);
|
||||||
|
byte[] fileData = StripedFileTestUtil
|
||||||
|
.generateBytes(DEFAULT_BLOCK_SIZE * 2);
|
||||||
|
out.write(fileData);
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
// Adding pinned blocks.
|
||||||
|
createFileWithFavoredDatanodes(conf, cluster, dfs);
|
||||||
|
|
||||||
|
// Delete block file so, block move will fail with FileNotFoundException
|
||||||
|
LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file1, 0);
|
||||||
|
Assert.assertEquals("Wrong block count", 2,
|
||||||
|
locatedBlocks.locatedBlockCount());
|
||||||
|
LocatedBlock lb = locatedBlocks.get(0);
|
||||||
|
cluster.corruptBlockOnDataNodesByDeletingBlockFile(lb.getBlock());
|
||||||
|
|
||||||
|
// move to ARCHIVE
|
||||||
|
dfs.setStoragePolicy(new Path(parenDir), "COLD");
|
||||||
|
int rc = ToolRunner.run(conf, new Mover.Cli(),
|
||||||
|
new String[] {"-p", parenDir.toString()});
|
||||||
|
Assert.assertEquals("Movement should fail after some retry",
|
||||||
|
ExitStatus.NO_MOVE_PROGRESS.getExitCode(), rc);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createFileWithFavoredDatanodes(final Configuration conf,
|
||||||
|
final MiniDFSCluster cluster, final DistributedFileSystem dfs)
|
||||||
|
throws IOException {
|
||||||
|
// Adding two DISK based data node to the cluster.
|
||||||
|
// Also, ensure that blocks are pinned in these new data nodes.
|
||||||
|
StorageType[][] newtypes =
|
||||||
|
new StorageType[][] {{StorageType.DISK}, {StorageType.DISK}};
|
||||||
|
startAdditionalDNs(conf, 2, newtypes, cluster);
|
||||||
|
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
|
||||||
|
InetSocketAddress[] favoredNodes = new InetSocketAddress[2];
|
||||||
|
int j = 0;
|
||||||
|
for (int i = dataNodes.size() - 1; i >= 2; i--) {
|
||||||
|
favoredNodes[j++] = dataNodes.get(i).getXferAddress();
|
||||||
|
}
|
||||||
|
final String file = "/parent/testMoverFailedRetryWithPinnedBlocks2";
|
||||||
|
final FSDataOutputStream out = dfs.create(new Path(file),
|
||||||
|
FsPermission.getDefault(), true, DEFAULT_BLOCK_SIZE, (short) 2,
|
||||||
|
DEFAULT_BLOCK_SIZE, null, favoredNodes);
|
||||||
|
byte[] fileData = StripedFileTestUtil.generateBytes(DEFAULT_BLOCK_SIZE * 2);
|
||||||
|
out.write(fileData);
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
// Mock FsDatasetSpi#getPinning to show that the block is pinned.
|
||||||
|
LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file, 0);
|
||||||
|
Assert.assertEquals("Wrong block count", 2,
|
||||||
|
locatedBlocks.locatedBlockCount());
|
||||||
|
LocatedBlock lb = locatedBlocks.get(0);
|
||||||
|
DatanodeInfo datanodeInfo = lb.getLocations()[0];
|
||||||
|
for (DataNode dn : cluster.getDataNodes()) {
|
||||||
|
if (dn.getDatanodeId().getDatanodeUuid()
|
||||||
|
.equals(datanodeInfo.getDatanodeUuid())) {
|
||||||
|
LOG.info("Simulate block pinning in datanode {}", datanodeInfo);
|
||||||
|
DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startAdditionalDNs(final Configuration conf,
|
||||||
|
int newNodesRequired, StorageType[][] newTypes,
|
||||||
|
final MiniDFSCluster cluster) throws IOException {
|
||||||
|
|
||||||
|
cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
|
||||||
|
null, null, null, false, false, false, null);
|
||||||
|
cluster.triggerHeartbeats();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue