HDFS-7604. Track and display failed DataNode storage locations in NameNode. Contributed by Chris Nauroth.

(cherry picked from commit 9729b244de)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
This commit is contained in:
cnauroth 2015-02-16 14:43:02 -08:00
parent 1d91daaae9
commit 441dfa4867
39 changed files with 1022 additions and 106 deletions

View File

@ -338,6 +338,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple
threads (cmccabe)
HDFS-7604. Track and display failed DataNode storage locations in NameNode.
(cnauroth)
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
@ -121,8 +122,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
@Override
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes)
throws IOException {
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@ -134,6 +135,10 @@ public class DatanodeProtocolClientSideTranslatorPB implements
if (cacheUsed != 0) {
builder.setCacheUsed(cacheUsed);
}
if (volumeFailureSummary != null) {
builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary(
volumeFailureSummary));
}
HeartbeatResponseProto resp;
try {
resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -104,10 +105,14 @@ public class DatanodeProtocolServerSideTranslatorPB implements
try {
final StorageReport[] report = PBHelper.convertStorageReports(
request.getReportsList());
VolumeFailureSummary volumeFailureSummary =
request.hasVolumeFailureSummary() ? PBHelper.convertVolumeFailureSummary(
request.getVolumeFailureSummary()) : null;
response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
report, request.getCacheCapacity(), request.getCacheUsed(),
request.getXmitsInProgress(),
request.getXceiverCount(), request.getFailedVolumes());
request.getXceiverCount(), request.getFailedVolumes(),
volumeFailureSummary);
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -122,6 +122,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCom
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.NNHAStatusHeartbeatProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@ -216,6 +217,7 @@ import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
@ -1902,6 +1904,29 @@ public class PBHelper {
return protos;
}
public static VolumeFailureSummary convertVolumeFailureSummary(
VolumeFailureSummaryProto proto) {
List<String> failedStorageLocations = proto.getFailedStorageLocationsList();
return new VolumeFailureSummary(
failedStorageLocations.toArray(new String[failedStorageLocations.size()]),
proto.getLastVolumeFailureDate(), proto.getEstimatedCapacityLostTotal());
}
public static VolumeFailureSummaryProto convertVolumeFailureSummary(
VolumeFailureSummary volumeFailureSummary) {
VolumeFailureSummaryProto.Builder builder =
VolumeFailureSummaryProto.newBuilder();
for (String failedStorageLocation:
volumeFailureSummary.getFailedStorageLocations()) {
builder.addFailedStorageLocations(failedStorageLocation);
}
builder.setLastVolumeFailureDate(
volumeFailureSummary.getLastVolumeFailureDate());
builder.setEstimatedCapacityLostTotal(
volumeFailureSummary.getEstimatedCapacityLostTotal());
return builder.build();
}
public static JournalInfo convert(JournalInfoProto info) {
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.util.IntrusiveCollection;
@ -216,6 +217,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
private long lastBlocksScheduledRollTime = 0;
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
private int volumeFailures = 0;
private VolumeFailureSummary volumeFailureSummary = null;
/**
* When set to true, the node is not in include list and is not allowed
@ -235,7 +237,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
*/
public DatanodeDescriptor(DatanodeID nodeID) {
super(nodeID);
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
}
/**
@ -246,7 +248,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
public DatanodeDescriptor(DatanodeID nodeID,
String networkLocation) {
super(nodeID, networkLocation);
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
}
@VisibleForTesting
@ -347,9 +349,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
* Updates stats from datanode heartbeat.
*/
public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures) {
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
volFailures);
volFailures, volumeFailureSummary);
heartbeatedSinceRegistration = true;
}
@ -357,7 +360,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
* process datanode heartbeat or stats initialization.
*/
public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures) {
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
long totalCapacity = 0;
long totalRemaining = 0;
long totalBlockPoolUsed = 0;
@ -372,7 +376,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
// during the current DN registration session.
// When volumeFailures == this.volumeFailures, it implies there is no
// state change. No need to check for failed storage. This is an
// optimization.
// optimization. Recent versions of the DataNode report a
// VolumeFailureSummary containing the date/time of the last volume
// failure. If that's available, then we check that instead for greater
// accuracy.
// 2. After DN restarts, volFailures might not increase and it is possible
// we still have new failed storage. For example, admins reduce
// available storages in configuration. Another corner case
@ -381,8 +388,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
// one element in storageReports and that is A. b) A failed. c) Before
// DN sends HB to NN to indicate A has failed, DN restarts. d) After DN
// restarts, storageReports has one element which is B.
boolean checkFailedStorages = (volFailures > this.volumeFailures) ||
!heartbeatedSinceRegistration;
final boolean checkFailedStorages;
if (volumeFailureSummary != null && this.volumeFailureSummary != null) {
checkFailedStorages = volumeFailureSummary.getLastVolumeFailureDate() >
this.volumeFailureSummary.getLastVolumeFailureDate();
} else {
checkFailedStorages = (volFailures > this.volumeFailures) ||
!heartbeatedSinceRegistration;
}
if (checkFailedStorages) {
LOG.info("Number of failed storage changes from "
@ -396,6 +409,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
setXceiverCount(xceiverCount);
setLastUpdate(Time.now());
this.volumeFailures = volFailures;
this.volumeFailureSummary = volumeFailureSummary;
for (StorageReport report : reports) {
DatanodeStorageInfo storage = updateStorage(report.getStorage());
if (checkFailedStorages) {
@ -730,6 +744,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
return volumeFailures;
}
/**
* Returns info about volume failures.
*
* @return info about volume failures, possibly null
*/
public VolumeFailureSummary getVolumeFailureSummary() {
return volumeFailureSummary;
}
/**
* @param nodeReg DatanodeID to update registration for.
*/

View File

@ -1386,8 +1386,8 @@ public class DatanodeManager {
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes
) throws IOException {
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
@ -1409,7 +1409,8 @@ public class DatanodeManager {
heartbeatManager.updateHeartbeat(nodeinfo, reports,
cacheCapacity, cacheUsed,
xceiverCount, failedVolumes);
xceiverCount, failedVolumes,
volumeFailureSummary);
// If we are in safemode, do not send back any recovery / replication
// requests. Don't even drain the existing queue of work.

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
@ -192,7 +193,7 @@ class HeartbeatManager implements DatanodeStatistics {
addDatanode(d);
//update its timestamp
d.updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
d.updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
}
}
@ -217,10 +218,11 @@ class HeartbeatManager implements DatanodeStatistics {
synchronized void updateHeartbeat(final DatanodeDescriptor node,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int failedVolumes) {
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
stats.subtract(node);
node.updateHeartbeat(reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes);
xceiverCount, failedVolumes, volumeFailureSummary);
stats.add(node);
}

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Time;
@ -579,14 +580,19 @@ class BPServiceActor implements Runnable {
LOG.debug("Sending heartbeat with " + reports.length +
" storage reports from service actor: " + this);
}
VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
.getVolumeFailureSummary();
int numFailedVolumes = volumeFailureSummary != null ?
volumeFailureSummary.getFailedStorageLocations().length : 0;
return bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
dn.getFSDataset().getNumFailedVolumes());
numFailedVolumes,
volumeFailureSummary);
}
//This must be called only by BPOfferService

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.ReflectionUtils;
@ -129,6 +130,13 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
/** @return a volume information map (name => info). */
public Map<String, Object> getVolumeInfoMap();
/**
* Returns info about volume failures.
*
* @return info about volume failures, possibly null
*/
VolumeFailureSummary getVolumeFailureSummary();
/** @return a list of finalized blocks for the given block pool. */
public List<FinalizedReplica> getFinalizedBlocks(String bpid);

View File

@ -101,6 +101,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.nativeio.NativeIO;
@ -114,6 +115,7 @@ import org.apache.hadoop.util.Time;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**************************************************
* FSDataset manages a set of data blocks. Each block
@ -266,9 +268,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
dataLocations, storage);
int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
int volsFailed = volsConfigured - storage.getNumStorageDirs();
int volsFailed = volumeFailureInfos.size();
this.validVolsRequired = volsConfigured - volFailuresTolerated;
if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
@ -293,7 +297,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
RoundRobinVolumeChoosingPolicy.class,
VolumeChoosingPolicy.class), conf);
volumes = new FsVolumeList(volsFailed, datanode.getBlockScanner(),
volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
@ -315,6 +319,36 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
}
/**
* Gets initial volume failure information for all volumes that failed
* immediately at startup. The method works by determining the set difference
* between all configured storage locations and the actual storage locations in
* use after attempting to put all of them into service.
*
* @return each storage location that has failed
*/
private static List<VolumeFailureInfo> getInitialVolumeFailureInfos(
Collection<StorageLocation> dataLocations, DataStorage storage) {
Set<String> failedLocationSet = Sets.newHashSetWithExpectedSize(
dataLocations.size());
for (StorageLocation sl: dataLocations) {
failedLocationSet.add(sl.getFile().getAbsolutePath());
}
for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
it.hasNext(); ) {
Storage.StorageDirectory sd = it.next();
failedLocationSet.remove(sd.getRoot().getAbsolutePath());
}
List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity(
failedLocationSet.size());
long failureDate = Time.now();
for (String failedStorageLocation: failedLocationSet) {
volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation,
failureDate));
}
return volumeFailureInfos;
}
private void addVolume(Collection<StorageLocation> dataLocations,
Storage.StorageDirectory sd) throws IOException {
final File dir = sd.getCurrentDir();
@ -350,8 +384,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final File dir = location.getFile();
// Prepare volume in DataStorage
DataStorage.VolumeBuilder builder =
dataStorage.prepareVolume(datanode, location.getFile(), nsInfos);
final DataStorage.VolumeBuilder builder;
try {
builder = dataStorage.prepareVolume(datanode, location.getFile(), nsInfos);
} catch (IOException e) {
volumes.addVolumeFailureInfo(new VolumeFailureInfo(
location.getFile().getAbsolutePath(), Time.now()));
throw e;
}
final Storage.StorageDirectory sd = builder.getStorageDirectory();
@ -500,9 +540,65 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
/**
* Return the number of failed volumes in the FSDataset.
*/
@Override
@Override // FSDatasetMBean
public int getNumFailedVolumes() {
return volumes.numberOfFailedVolumes();
return volumes.getVolumeFailureInfos().length;
}
@Override // FSDatasetMBean
public String[] getFailedStorageLocations() {
VolumeFailureInfo[] infos = volumes.getVolumeFailureInfos();
List<String> failedStorageLocations = Lists.newArrayListWithCapacity(
infos.length);
for (VolumeFailureInfo info: infos) {
failedStorageLocations.add(info.getFailedStorageLocation());
}
return failedStorageLocations.toArray(
new String[failedStorageLocations.size()]);
}
@Override // FSDatasetMBean
public long getLastVolumeFailureDate() {
long lastVolumeFailureDate = 0;
for (VolumeFailureInfo info: volumes.getVolumeFailureInfos()) {
long failureDate = info.getFailureDate();
if (failureDate > lastVolumeFailureDate) {
lastVolumeFailureDate = failureDate;
}
}
return lastVolumeFailureDate;
}
@Override // FSDatasetMBean
public long getEstimatedCapacityLostTotal() {
long estimatedCapacityLostTotal = 0;
for (VolumeFailureInfo info: volumes.getVolumeFailureInfos()) {
estimatedCapacityLostTotal += info.getEstimatedCapacityLost();
}
return estimatedCapacityLostTotal;
}
@Override // FsDatasetSpi
public VolumeFailureSummary getVolumeFailureSummary() {
VolumeFailureInfo[] infos = volumes.getVolumeFailureInfos();
if (infos.length == 0) {
return null;
}
List<String> failedStorageLocations = Lists.newArrayListWithCapacity(
infos.length);
long lastVolumeFailureDate = 0;
long estimatedCapacityLostTotal = 0;
for (VolumeFailureInfo info: infos) {
failedStorageLocations.add(info.getFailedStorageLocation());
long failureDate = info.getFailureDate();
if (failureDate > lastVolumeFailureDate) {
lastVolumeFailureDate = failureDate;
}
estimatedCapacityLostTotal += info.getEstimatedCapacityLost();
}
return new VolumeFailureSummary(
failedStorageLocations.toArray(new String[failedStorageLocations.size()]),
lastVolumeFailureDate, estimatedCapacityLostTotal);
}
@Override // FSDatasetMBean

View File

@ -22,9 +22,12 @@ import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Lists;
@ -40,21 +43,23 @@ import org.apache.hadoop.util.Time;
class FsVolumeList {
private final AtomicReference<FsVolumeImpl[]> volumes =
new AtomicReference<>(new FsVolumeImpl[0]);
// Tracks volume failures, sorted by volume path.
private final Map<String, VolumeFailureInfo> volumeFailureInfos =
Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>());
private Object checkDirsMutex = new Object();
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
private final BlockScanner blockScanner;
private volatile int numFailedVolumes;
FsVolumeList(int failedVols, BlockScanner blockScanner,
FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos,
BlockScanner blockScanner,
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
this.blockChooser = blockChooser;
this.blockScanner = blockScanner;
this.numFailedVolumes = failedVols;
}
int numberOfFailedVolumes() {
return numFailedVolumes;
for (VolumeFailureInfo volumeFailureInfo: initialVolumeFailureInfos) {
volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
volumeFailureInfo);
}
}
/**
@ -238,7 +243,7 @@ class FsVolumeList {
}
removedVols.add(fsv);
removeVolume(fsv);
numFailedVolumes++;
addVolumeFailureInfo(fsv);
} catch (ClosedChannelException e) {
FsDatasetImpl.LOG.debug("Caught exception when obtaining " +
"reference count on closed volume", e);
@ -347,6 +352,26 @@ class FsVolumeList {
removeVolume(fsVolume);
}
}
removeVolumeFailureInfo(volume);
}
VolumeFailureInfo[] getVolumeFailureInfos() {
Collection<VolumeFailureInfo> infos = volumeFailureInfos.values();
return infos.toArray(new VolumeFailureInfo[infos.size()]);
}
void addVolumeFailureInfo(VolumeFailureInfo volumeFailureInfo) {
volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
volumeFailureInfo);
}
private void addVolumeFailureInfo(FsVolumeImpl vol) {
addVolumeFailureInfo(new VolumeFailureInfo(vol.getBasePath(), Time.now(),
vol.getCapacity()));
}
private void removeVolumeFailureInfo(File vol) {
volumeFailureInfos.remove(vol.getAbsolutePath());
}
void addBlockPool(final String bpid, final Configuration conf) throws IOException {

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
/**
* Tracks information about failure of a data volume.
*/
final class VolumeFailureInfo {
private final String failedStorageLocation;
private final long failureDate;
private final long estimatedCapacityLost;
/**
* Creates a new VolumeFailureInfo, when the capacity lost from this volume
* failure is unknown. Typically, this means the volume failed immediately at
* startup, so there was never a chance to query its capacity.
*
* @param failedStorageLocation storage location that has failed
* @param failureDate date/time of failure in milliseconds since epoch
*/
public VolumeFailureInfo(String failedStorageLocation, long failureDate) {
this(failedStorageLocation, failureDate, 0);
}
/**
* Creates a new VolumeFailureInfo.
*
* @param failedStorageLocation storage location that has failed
* @param failureDate date/time of failure in milliseconds since epoch
* @param estimatedCapacityLost estimate of capacity lost in bytes
*/
public VolumeFailureInfo(String failedStorageLocation, long failureDate,
long estimatedCapacityLost) {
this.failedStorageLocation = failedStorageLocation;
this.failureDate = failureDate;
this.estimatedCapacityLost = estimatedCapacityLost;
}
/**
* Returns the storage location that has failed.
*
* @return storage location that has failed
*/
public String getFailedStorageLocation() {
return this.failedStorageLocation;
}
/**
* Returns date/time of failure
*
* @return date/time of failure in milliseconds since epoch
*/
public long getFailureDate() {
return this.failureDate;
}
/**
* Returns estimate of capacity lost. This is said to be an estimate, because
* in some cases it's impossible to know the capacity of the volume, such as if
* we never had a chance to query its capacity before the failure occurred.
*
* @return estimate of capacity lost in bytes
*/
public long getEstimatedCapacityLost() {
return this.estimatedCapacityLost;
}
}

View File

@ -77,6 +77,25 @@ public interface FSDatasetMBean {
*/
public int getNumFailedVolumes();
/**
* Returns each storage location that has failed, sorted.
* @return each storage location that has failed, sorted
*/
String[] getFailedStorageLocations();
/**
* Returns the date/time of the last volume failure in milliseconds since
* epoch.
* @return date/time of last volume failure in milliseconds since epoch
*/
long getLastVolumeFailureDate();
/**
* Returns an estimate of total capacity lost due to volume failures in bytes.
* @return estimate of total capacity lost in bytes
*/
long getEstimatedCapacityLostTotal();
/**
* Returns the amount of cache used by the datanode (in bytes).
*/

View File

@ -254,6 +254,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
@ -4411,8 +4412,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes)
throws IOException {
int xceiverCount, int xmitsInProgress, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
readLock();
try {
//get datanode commands
@ -4420,7 +4421,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
- xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
xceiverCount, maxTransfer, failedVolumes);
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
//create ha status
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
@ -5941,6 +5942,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return deadDecommissioned;
}
@Override // FSNamesystemMBean
public int getVolumeFailuresTotal() {
List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
int volumeFailuresTotal = 0;
for (DatanodeDescriptor node: live) {
volumeFailuresTotal += node.getVolumeFailures();
}
return volumeFailuresTotal;
}
@Override // FSNamesystemMBean
public long getEstimatedCapacityLostTotal() {
List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
long estimatedCapacityLostTotal = 0;
for (DatanodeDescriptor node: live) {
VolumeFailureSummary volumeFailureSummary = node.getVolumeFailureSummary();
if (volumeFailureSummary != null) {
estimatedCapacityLostTotal +=
volumeFailureSummary.getEstimatedCapacityLostTotal();
}
}
return estimatedCapacityLostTotal;
}
@Override // FSNamesystemMBean
public int getNumDecommissioningDataNodes() {
return getBlockManager().getDatanodeManager().getDecommissioningNodes()
@ -6784,7 +6811,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
for (DatanodeDescriptor node : live) {
Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
ImmutableMap.Builder<String, Object> innerinfo =
ImmutableMap.<String,Object>builder();
innerinfo
.put("infoAddr", node.getInfoAddr())
.put("infoSecureAddr", node.getInfoSecureAddr())
.put("xferaddr", node.getXferAddr())
@ -6800,9 +6829,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
.put("blockScheduled", node.getBlocksScheduled())
.put("blockPoolUsed", node.getBlockPoolUsed())
.put("blockPoolUsedPercent", node.getBlockPoolUsedPercent())
.put("volfails", node.getVolumeFailures())
.build();
info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo);
.put("volfails", node.getVolumeFailures());
VolumeFailureSummary volumeFailureSummary = node.getVolumeFailureSummary();
if (volumeFailureSummary != null) {
innerinfo
.put("failedStorageLocations",
volumeFailureSummary.getFailedStorageLocations())
.put("lastVolumeFailureDate",
volumeFailureSummary.getLastVolumeFailureDate())
.put("estimatedCapacityLostTotal",
volumeFailureSummary.getEstimatedCapacityLostTotal());
}
info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo.build());
}
return JSON.toString(info);
}

View File

@ -136,6 +136,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
@ -1275,12 +1276,13 @@ class NameNodeRpcServer implements NamenodeProtocols {
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
int xmitsInProgress, int xceiverCount,
int failedVolumes) throws IOException {
int failedVolumes, VolumeFailureSummary volumeFailureSummary)
throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
failedVolumes);
failedVolumes, volumeFailureSummary);
}
@Override // DatanodeProtocol

View File

@ -131,6 +131,19 @@ public interface FSNamesystemMBean {
*/
public int getNumDecomDeadDataNodes();
/**
* Number of failed data volumes across all live data nodes.
* @return number of failed data volumes across all live data nodes
*/
int getVolumeFailuresTotal();
/**
* Returns an estimate of total capacity lost due to volume failures in bytes
* across all live data nodes.
* @return estimate of total capacity lost in bytes
*/
long getEstimatedCapacityLostTotal();
/**
* Number of data nodes that are in the decommissioning state
*/

View File

@ -102,6 +102,7 @@ public interface DatanodeProtocol {
* @param xmitsInProgress number of transfers from this datanode to others
* @param xceiverCount number of active transceiver threads
* @param failedVolumes number of failed volumes
* @param volumeFailureSummary info about volume failures
* @throws IOException on error
*/
@Idempotent
@ -111,7 +112,9 @@ public interface DatanodeProtocol {
long dnCacheUsed,
int xmitsInProgress,
int xceiverCount,
int failedVolumes) throws IOException;
int failedVolumes,
VolumeFailureSummary volumeFailureSummary)
throws IOException;
/**
* blockReport() tells the NameNode about all the locally-stored blocks.

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
/**
* Summarizes information about data volume failures on a DataNode.
*/
public class VolumeFailureSummary {
private final String[] failedStorageLocations;
private final long lastVolumeFailureDate;
private final long estimatedCapacityLostTotal;
/**
* Creates a new VolumeFailureSummary.
*
* @param failedStorageLocations storage locations that have failed
* @param lastVolumeFailureDate date/time of last volume failure in
* milliseconds since epoch
* @param estimatedCapacityLostTotal estimate of capacity lost in bytes
*/
public VolumeFailureSummary(String[] failedStorageLocations,
long lastVolumeFailureDate, long estimatedCapacityLostTotal) {
this.failedStorageLocations = failedStorageLocations;
this.lastVolumeFailureDate = lastVolumeFailureDate;
this.estimatedCapacityLostTotal = estimatedCapacityLostTotal;
}
/**
* Returns each storage location that has failed, sorted.
*
* @return each storage location that has failed, sorted
*/
public String[] getFailedStorageLocations() {
return this.failedStorageLocations;
}
/**
* Returns the date/time of the last volume failure in milliseconds since
* epoch.
*
* @return date/time of last volume failure in milliseconds since epoch
*/
public long getLastVolumeFailureDate() {
return this.lastVolumeFailureDate;
}
/**
* Returns estimate of capacity lost. This is said to be an estimate, because
* in some cases it's impossible to know the capacity of the volume, such as if
* we never had a chance to query its capacity before the failure occurred.
*
* @return estimate of capacity lost in bytes
*/
public long getEstimatedCapacityLostTotal() {
return this.estimatedCapacityLostTotal;
}
}

View File

@ -159,6 +159,17 @@ message RegisterDatanodeResponseProto {
required DatanodeRegistrationProto registration = 1; // Datanode info
}
/**
* failedStorageLocations - storage locations that have failed
* lastVolumeFailureDate - date/time of last volume failure
* estimatedCapacityLost - estimate of total capacity lost due to volume failures
*/
message VolumeFailureSummaryProto {
repeated string failedStorageLocations = 1;
required uint64 lastVolumeFailureDate = 2;
required uint64 estimatedCapacityLostTotal = 3;
}
/**
* registration - datanode registration information
* capacity - total storage capacity available at the datanode
@ -167,9 +178,12 @@ message RegisterDatanodeResponseProto {
* blockPoolUsed - storage used by the block pool
* xmitsInProgress - number of transfers from this datanode to others
* xceiverCount - number of active transceiver threads
* failedVolumes - number of failed volumes
* failedVolumes - number of failed volumes. This is redundant with the
* information included in volumeFailureSummary, but the field is retained
* for backwards compatibility.
* cacheCapacity - total cache capacity available at the datanode
* cacheUsed - amount of cache used
* volumeFailureSummary - info about volume failures
*/
message HeartbeatRequestProto {
required DatanodeRegistrationProto registration = 1; // Datanode info
@ -179,6 +193,7 @@ message HeartbeatRequestProto {
optional uint32 failedVolumes = 5 [ default = 0 ];
optional uint64 cacheCapacity = 6 [ default = 0 ];
optional uint64 cacheUsed = 7 [default = 0 ];
optional VolumeFailureSummaryProto volumeFailureSummary = 8;
}
/**

View File

@ -34,6 +34,7 @@
<ul class="nav navbar-nav" id="ui-tabs">
<li><a href="#tab-overview">Overview</a></li>
<li><a href="#tab-datanode">Datanodes</a></li>
<li><a href="#tab-datanode-volume-failures">Datanode Volume Failures</a></li>
<li><a href="#tab-snapshot">Snapshot</a></li>
<li><a href="#tab-startup-progress">Startup Progress</a></li>
<li class="dropdown">
@ -59,6 +60,7 @@
<div class="tab-content">
<div class="tab-pane" id="tab-overview"></div>
<div class="tab-pane" id="tab-datanode"></div>
<div class="tab-pane" id="tab-datanode-volume-failures"></div>
<div class="tab-pane" id="tab-snapshot"></div>
<div class="tab-pane" id="tab-startup-progress"></div>
</div>
@ -165,6 +167,7 @@
<tr><th><a href="#tab-datanode">Live Nodes</a></th><td>{NumLiveDataNodes} (Decommissioned: {NumDecomLiveDataNodes})</td></tr>
<tr><th><a href="#tab-datanode">Dead Nodes</a></th><td>{NumDeadDataNodes} (Decommissioned: {NumDecomDeadDataNodes})</td></tr>
<tr><th><a href="#tab-datanode">Decommissioning Nodes</a></th><td>{NumDecommissioningDataNodes}</td></tr>
<tr><th><a href="#tab-datanode-volume-failures">Total Datanode Volume Failures</a></th><td>{VolumeFailuresTotal} ({EstimatedCapacityLostTotal|fmt_bytes})</td></tr>
<tr><th title="Excludes missing blocks.">Number of Under-Replicated Blocks</th><td>{UnderReplicatedBlocks}</td></tr>
<tr><th>Number of Blocks Pending Deletion</th><td>{PendingDeletionBlocks}</td></tr>
<tr><th>Block Deletion Start Time</th><td>{BlockDeletionStartTime|date_tostring}</td></tr>
@ -324,6 +327,36 @@
</small>
</script>
<script type="text/x-dust-template" id="tmpl-datanode-volume-failures">
<div class="page-header"><h1>Datanode Volume Failures</h1></div>
<small>
{?LiveNodes}
<table class="table">
<thead>
<tr>
<th>Node</th>
<th>Last Failure Date</th>
<th>Failed Volumes</th>
<th>Estimated Capacity Lost</th>
<th>Failed Storage Locations</th>
</tr>
</thead>
{#LiveNodes}
<tr>
<td>{name} ({xferaddr})</td>
<td>{#helper_date_tostring value="{lastVolumeFailureDate}"/}</td>
<td>{volfails}</td>
<td>{estimatedCapacityLostTotal|fmt_bytes}</td>
<td>{#failedStorageLocations}{.}{@sep}<br />{/sep}{/failedStorageLocations}</td>
</tr>
{/LiveNodes}
</table>
{:else}
There are no reported volume failures.
{/LiveNodes}
</small>
</script>
<script type="text/x-dust-template" id="tmpl-startup-progress">
<div class="page-header"><h1>Startup Progress</h1></div>
<p>Elapsed Time: {elapsedTime|fmt_time}, Percent Complete: {percentComplete|fmt_percentage}</p>

View File

@ -21,6 +21,7 @@
dust.loadSource(dust.compile($('#tmpl-dfshealth').html(), 'dfshealth'));
dust.loadSource(dust.compile($('#tmpl-startup-progress').html(), 'startup-progress'));
dust.loadSource(dust.compile($('#tmpl-datanode').html(), 'datanode-info'));
dust.loadSource(dust.compile($('#tmpl-datanode-volume-failures').html(), 'datanode-volume-failures'));
dust.loadSource(dust.compile($('#tmpl-snapshot').html(), 'snapshot-info'));
function load_overview() {
@ -193,6 +194,45 @@
})).error(ajax_error_handler);
}
function load_datanode_volume_failures() {
var HELPERS = {
'helper_date_tostring' : function (chunk, ctx, bodies, params) {
var value = dust.helpers.tap(params.value, chunk, ctx);
return chunk.write('' + new Date(Number(value)).toLocaleString());
}
};
function workaround(r) {
function node_map_to_array(nodes) {
var res = [];
for (var n in nodes) {
var p = nodes[n];
// Filter the display to only datanodes with volume failures.
if (p.volfails > 0) {
p.name = n;
res.push(p);
}
}
return res;
}
r.LiveNodes = node_map_to_array(JSON.parse(r.LiveNodes));
return r;
}
$.get(
'/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo',
guard_with_startup_progress(function (resp) {
var data = workaround(resp.beans[0]);
var base = dust.makeBase(HELPERS);
dust.render('datanode-volume-failures', base.push(data), function(err, out) {
$('#tab-datanode-volume-failures').html(out);
$('#ui-tabs a[href="#tab-datanode-volume-failures"]').tab('show');
});
})).error(ajax_error_handler);
}
function load_snapshot_info() {
$.get(
'/jmx?qry=Hadoop:service=NameNode,name=SnapshotInfo',
@ -210,6 +250,9 @@
case "#tab-datanode":
load_datanode_info();
break;
case "#tab-datanode-volume-failures":
load_datanode_volume_failures();
break;
case "#tab-snapshot":
load_snapshot_info();
break;

View File

@ -114,7 +114,8 @@ public class TestBlockManager {
2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);
dn.updateHeartbeat(
BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0L, 0L, 0, 0);
BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0L, 0L, 0, 0,
null);
bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn);
}
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.stat.inference.TestUtils;
@ -87,7 +86,8 @@ public class TestNameNodePrunesMissingStorages {
// Stop the DataNode and send fake heartbeat with missing storage.
cluster.stopDataNode(0);
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0, 0);
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
0, null);
// Check that the missing storage was pruned.
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

View File

@ -107,7 +107,7 @@ public class TestOverReplicatedBlocks {
datanode.getStorageInfos()[0].setUtilizationForTesting(100L, 100L, 0, 100L);
datanode.updateHeartbeat(
BlockManagerTestUtil.getStorageReportsForDatanode(datanode),
0L, 0L, 0, 0);
0L, 0L, 0, 0, null);
}
}

View File

@ -97,7 +97,7 @@ public class TestReplicationPolicy {
capacity, dfsUsed, remaining, blockPoolUsed);
dn.updateHeartbeat(
BlockManagerTestUtil.getStorageReportsForDatanode(dn),
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures);
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
}
@BeforeClass

View File

@ -97,7 +97,7 @@ public class TestReplicationPolicyConsiderLoad {
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L);
dataNodes[i].updateHeartbeat(
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[i]),
0L, 0L, 0, 0);
0L, 0L, 0, 0, null);
}
}
@ -115,17 +115,17 @@ public class TestReplicationPolicyConsiderLoad {
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
blockPoolId, dataNodes[3].getCacheCapacity(),
dataNodes[3].getCacheRemaining(),
2, 0, 0);
2, 0, 0, null);
dnManager.handleHeartbeat(dnrList.get(4),
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[4]),
blockPoolId, dataNodes[4].getCacheCapacity(),
dataNodes[4].getCacheRemaining(),
4, 0, 0);
4, 0, 0, null);
dnManager.handleHeartbeat(dnrList.get(5),
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[5]),
blockPoolId, dataNodes[5].getCacheCapacity(),
dataNodes[5].getCacheRemaining(),
4, 0, 0);
4, 0, 0, null);
// value in the above heartbeats
final int load = 2 + 4 + 4;

View File

@ -185,7 +185,7 @@ public class TestReplicationPolicyWithNodeGroup {
capacity, dfsUsed, remaining, blockPoolUsed);
dn.updateHeartbeat(
BlockManagerTestUtil.getStorageReportsForDatanode(dn),
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures);
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
}
private static void setupDataNodeCapacity() {

View File

@ -35,6 +35,7 @@ import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
@ -57,6 +58,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.DataChecksum;
@ -627,6 +629,26 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
return storage.getNumFailedVolumes();
}
@Override // FSDatasetMBean
public String[] getFailedStorageLocations() {
return null;
}
@Override // FSDatasetMBean
public long getLastVolumeFailureDate() {
return 0;
}
@Override // FSDatasetMBean
public long getEstimatedCapacityLostTotal() {
return 0;
}
@Override // FsDatasetSpi
public VolumeFailureSummary getVolumeFailureSummary() {
return new VolumeFailureSummary(ArrayUtils.EMPTY_STRING_ARRAY, 0, 0);
}
@Override // FSDatasetMBean
public long getCacheUsed() {
return 0l;

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Time;
@ -137,7 +138,8 @@ public class TestBPOfferService {
Mockito.anyLong(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt());
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class));
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
return mock;
}

View File

@ -77,6 +77,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
@ -158,7 +159,8 @@ public class TestBlockRecovery {
Mockito.anyLong(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt()))
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class)))
.thenReturn(new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

View File

@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
@ -30,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@ -39,6 +42,10 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
@ -58,6 +65,7 @@ public class TestDataNodeVolumeFailureReporting {
private MiniDFSCluster cluster;
private Configuration conf;
private String dataDir;
private long volumeCapacity;
// Sleep at least 3 seconds (a 1s heartbeat plus padding) to allow
// for heartbeats to propagate from the datanodes to the namenode.
@ -69,29 +77,29 @@ public class TestDataNodeVolumeFailureReporting {
@Before
public void setUp() throws Exception {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
/*
* Lower the DN heartbeat, DF rate, and recheck interval to one second
* so state about failures and datanode death propagates faster.
*/
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
// These tests simulate volume failures by denying execute permission on the
// volume's path. On Windows, the owner of an object is always allowed
// access, so we can't run these tests on Windows.
assumeTrue(!Path.WINDOWS);
// Allow a single volume failure (there are two volumes)
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
dataDir = cluster.getDataDirectory();
initCluster(1, 2, 1);
}
@After
public void tearDown() throws Exception {
for (int i = 0; i < 3; i++) {
FileUtil.setExecutable(new File(dataDir, "data"+(2*i+1)), true);
FileUtil.setExecutable(new File(dataDir, "data"+(2*i+2)), true);
// Restore executable permission on all directories where a failure may have
// been simulated by denying execute access. This is based on the maximum
// number of datanodes and the maximum number of storages per data node used
// throughout the tests in this suite.
int maxDataNodes = 3;
int maxStoragesPerDataNode = 4;
for (int i = 0; i < maxDataNodes; i++) {
for (int j = 1; j <= maxStoragesPerDataNode; j++) {
String subDir = "data" + ((i * maxStoragesPerDataNode) + j);
FileUtil.setExecutable(new File(dataDir, subDir), true);
}
}
IOUtils.cleanup(LOG, fs);
cluster.shutdown();
}
@ -102,8 +110,6 @@ public class TestDataNodeVolumeFailureReporting {
*/
@Test
public void testSuccessiveVolumeFailures() throws Exception {
assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
// Bring up two more datanodes
cluster.startDataNodes(conf, 2, true, null, null);
cluster.waitActive();
@ -151,12 +157,9 @@ public class TestDataNodeVolumeFailureReporting {
/*
* The metrics should confirm the volume failures.
*/
assertCounter("VolumeFailures", 1L,
getMetrics(dns.get(0).getMetrics().name()));
assertCounter("VolumeFailures", 1L,
getMetrics(dns.get(1).getMetrics().name()));
assertCounter("VolumeFailures", 0L,
getMetrics(dns.get(2).getMetrics().name()));
checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
checkFailuresAtDataNode(dns.get(2), 0, true);
// Ensure we wait a sufficient amount of time
assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
@ -164,6 +167,10 @@ public class TestDataNodeVolumeFailureReporting {
// Eventually the NN should report two volume failures
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 2);
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(2), true);
/*
* Now fail a volume on the third datanode. We should be able to get
@ -174,17 +181,10 @@ public class TestDataNodeVolumeFailureReporting {
DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
DFSTestUtil.waitReplication(fs, file2, (short)3);
assertTrue("DN3 should still be up", dns.get(2).isDatanodeUp());
assertCounter("VolumeFailures", 1L,
getMetrics(dns.get(2).getMetrics().name()));
checkFailuresAtDataNode(dns.get(2), 1, true, dn3Vol1.getAbsolutePath());
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
dm.fetchDatanodes(live, dead, false);
live.clear();
dead.clear();
dm.fetchDatanodes(live, dead, false);
assertEquals("DN3 should have 1 failed volume",
1, live.get(2).getVolumeFailures());
DataNodeTestUtils.triggerHeartbeat(dns.get(2));
checkFailuresAtNameNode(dm, dns.get(2), true, dn3Vol1.getAbsolutePath());
/*
* Once the datanodes have a chance to heartbeat their new capacity the
@ -194,6 +194,10 @@ public class TestDataNodeVolumeFailureReporting {
dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 3,
origCapacity - (3*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 3);
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(2), true, dn3Vol1.getAbsolutePath());
/*
* Now fail the 2nd volume on the 3rd datanode. All its volumes
@ -210,12 +214,15 @@ public class TestDataNodeVolumeFailureReporting {
DFSTestUtil.waitForDatanodeDeath(dns.get(2));
// And report two failed volumes
assertCounter("VolumeFailures", 2L,
getMetrics(dns.get(2).getMetrics().name()));
checkFailuresAtDataNode(dns.get(2), 2, true, dn3Vol1.getAbsolutePath(),
dn3Vol2.getAbsolutePath());
// The NN considers the DN dead
DFSTestUtil.waitForDatanodeStatus(dm, 2, 1, 2,
origCapacity - (4*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 2);
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
/*
* The datanode never tries to restore the failed volume, even if
@ -240,6 +247,11 @@ public class TestDataNodeVolumeFailureReporting {
*/
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0, origCapacity,
WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 0);
dns = cluster.getDataNodes();
checkFailuresAtNameNode(dm, dns.get(0), true);
checkFailuresAtNameNode(dm, dns.get(1), true);
checkFailuresAtNameNode(dm, dns.get(2), true);
}
/**
@ -247,8 +259,6 @@ public class TestDataNodeVolumeFailureReporting {
*/
@Test
public void testVolFailureStatsPreservedOnNNRestart() throws Exception {
assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
// Bring up two more datanodes that can tolerate 1 failure
cluster.startDataNodes(conf, 2, true, null, null);
cluster.waitActive();
@ -268,15 +278,346 @@ public class TestDataNodeVolumeFailureReporting {
Path file1 = new Path("/test1");
DFSTestUtil.createFile(fs, file1, 1024, (short)2, 1L);
DFSTestUtil.waitReplication(fs, file1, (short)2);
ArrayList<DataNode> dns = cluster.getDataNodes();
// The NN reports two volumes failures
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 2);
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
// After restarting the NN it still see the two failures
cluster.restartNameNode(0);
cluster.waitActive();
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 2);
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
}
@Test
public void testMultipleVolFailuresOnNode() throws Exception {
// Reinitialize the cluster, configured with 4 storage locations per DataNode
// and tolerating up to 2 failures.
tearDown();
initCluster(3, 4, 2);
// Calculate the total capacity of all the datanodes. Sleep for three seconds
// to be sure the datanodes have had a chance to heartbeat their capacities.
Thread.sleep(WAIT_FOR_HEARTBEATS);
DatanodeManager dm = cluster.getNamesystem().getBlockManager()
.getDatanodeManager();
long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm);
long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
File dn1Vol1 = new File(dataDir, "data"+(4*0+1));
File dn1Vol2 = new File(dataDir, "data"+(4*0+2));
File dn2Vol1 = new File(dataDir, "data"+(4*1+1));
File dn2Vol2 = new File(dataDir, "data"+(4*1+2));
// Make the first two volume directories on the first two datanodes
// non-accessible.
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol1,
false));
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol2,
false));
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol1,
false));
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol2,
false));
// Create file1 and wait for 3 replicas (ie all DNs can still store a block).
// Then assert that all DNs are up, despite the volume failures.
Path file1 = new Path("/test1");
DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
DFSTestUtil.waitReplication(fs, file1, (short)3);
ArrayList<DataNode> dns = cluster.getDataNodes();
assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
assertTrue("DN3 should be up", dns.get(2).isDatanodeUp());
checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath(),
dn1Vol2.getAbsolutePath());
checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath(),
dn2Vol2.getAbsolutePath());
checkFailuresAtDataNode(dns.get(2), 0, true);
// Ensure we wait a sufficient amount of time
assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
// Eventually the NN should report four volume failures
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 4,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 4);
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath(),
dn1Vol2.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath(),
dn2Vol2.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(2), true);
}
@Test
public void testDataNodeReconfigureWithVolumeFailures() throws Exception {
// Bring up two more datanodes
cluster.startDataNodes(conf, 2, true, null, null);
cluster.waitActive();
final DatanodeManager dm = cluster.getNamesystem().getBlockManager(
).getDatanodeManager();
long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm);
long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
// Fail the first volume on both datanodes (we have to keep the
// third healthy so one node in the pipeline will not fail).
File dn1Vol1 = new File(dataDir, "data"+(2*0+1));
File dn1Vol2 = new File(dataDir, "data"+(2*0+2));
File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
File dn2Vol2 = new File(dataDir, "data"+(2*1+2));
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol1, false));
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol1, false));
Path file1 = new Path("/test1");
DFSTestUtil.createFile(fs, file1, 1024, (short)2, 1L);
DFSTestUtil.waitReplication(fs, file1, (short)2);
ArrayList<DataNode> dns = cluster.getDataNodes();
assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
assertTrue("DN3 should be up", dns.get(2).isDatanodeUp());
checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
checkFailuresAtDataNode(dns.get(2), 0, true);
// Ensure we wait a sufficient amount of time
assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
// The NN reports two volume failures
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 2);
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
// Reconfigure each DataNode to remove its failed volumes.
reconfigureDataNode(dns.get(0), dn1Vol2);
reconfigureDataNode(dns.get(1), dn2Vol2);
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
checkFailuresAtDataNode(dns.get(0), 1, true);
checkFailuresAtDataNode(dns.get(1), 1, true);
// NN sees reduced capacity, but no volume failures.
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 0);
checkFailuresAtNameNode(dm, dns.get(0), true);
checkFailuresAtNameNode(dm, dns.get(1), true);
// Reconfigure again to try to add back the failed volumes.
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
// Ensure we wait a sufficient amount of time.
assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
// The NN reports two volume failures again.
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(false, 2);
checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
// Reconfigure a third time with the failed volumes. Afterwards, we expect
// the same volume failures to be reported. (No double-counting.)
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
// Ensure we wait a sufficient amount of time.
assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
// The NN reports two volume failures again.
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(false, 2);
checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
}
/**
* Checks the NameNode for correct values of aggregate counters tracking failed
* volumes across all DataNodes.
*
* @param expectCapacityKnown if true, then expect that the capacities of the
* volumes were known before the failures, and therefore the lost capacity
* can be reported
* @param expectedVolumeFailuresTotal expected number of failed volumes
*/
private void checkAggregateFailuresAtNameNode(boolean expectCapacityKnown,
int expectedVolumeFailuresTotal) {
FSNamesystem ns = cluster.getNamesystem();
assertEquals(expectedVolumeFailuresTotal, ns.getVolumeFailuresTotal());
long expectedCapacityLost = getExpectedCapacityLost(expectCapacityKnown,
expectedVolumeFailuresTotal);
assertEquals(expectedCapacityLost, ns.getEstimatedCapacityLostTotal());
}
/**
* Checks a DataNode for correct reporting of failed volumes.
*
* @param dn DataNode to check
* @param expectedVolumeFailuresCounter metric counter value for
* VolumeFailures. The current implementation actually counts the number
* of failed disk checker cycles, which may be different from the length of
* expectedFailedVolumes if multiple disks fail in the same disk checker
* cycle
* @param expectCapacityKnown if true, then expect that the capacities of the
* volumes were known before the failures, and therefore the lost capacity
* can be reported
* @param expectedFailedVolumes expected locations of failed volumes
* @throws Exception if there is any failure
*/
private void checkFailuresAtDataNode(DataNode dn,
long expectedVolumeFailuresCounter, boolean expectCapacityKnown,
String... expectedFailedVolumes) throws Exception {
assertCounter("VolumeFailures", expectedVolumeFailuresCounter,
getMetrics(dn.getMetrics().name()));
FsDatasetSpi<?> fsd = dn.getFSDataset();
assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes());
assertArrayEquals(expectedFailedVolumes, fsd.getFailedStorageLocations());
if (expectedFailedVolumes.length > 0) {
assertTrue(fsd.getLastVolumeFailureDate() > 0);
long expectedCapacityLost = getExpectedCapacityLost(expectCapacityKnown,
expectedFailedVolumes.length);
assertEquals(expectedCapacityLost, fsd.getEstimatedCapacityLostTotal());
} else {
assertEquals(0, fsd.getLastVolumeFailureDate());
assertEquals(0, fsd.getEstimatedCapacityLostTotal());
}
}
/**
* Checks NameNode tracking of a particular DataNode for correct reporting of
* failed volumes.
*
* @param dm DatanodeManager to check
* @param dn DataNode to check
* @param expectCapacityKnown if true, then expect that the capacities of the
* volumes were known before the failures, and therefore the lost capacity
* can be reported
* @param expectedFailedVolumes expected locations of failed volumes
* @throws Exception if there is any failure
*/
private void checkFailuresAtNameNode(DatanodeManager dm, DataNode dn,
boolean expectCapacityKnown, String... expectedFailedVolumes)
throws Exception {
DatanodeDescriptor dd = cluster.getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId());
assertEquals(expectedFailedVolumes.length, dd.getVolumeFailures());
VolumeFailureSummary volumeFailureSummary = dd.getVolumeFailureSummary();
if (expectedFailedVolumes.length > 0) {
assertArrayEquals(expectedFailedVolumes, volumeFailureSummary
.getFailedStorageLocations());
assertTrue(volumeFailureSummary.getLastVolumeFailureDate() > 0);
long expectedCapacityLost = getExpectedCapacityLost(expectCapacityKnown,
expectedFailedVolumes.length);
assertEquals(expectedCapacityLost,
volumeFailureSummary.getEstimatedCapacityLostTotal());
} else {
assertNull(volumeFailureSummary);
}
}
/**
* Returns expected capacity lost for use in assertions. The return value is
* dependent on whether or not it is expected that the volume capacities were
* known prior to the failures.
*
* @param expectCapacityKnown if true, then expect that the capacities of the
* volumes were known before the failures, and therefore the lost capacity
* can be reported
* @param expectedVolumeFailuresTotal expected number of failed volumes
* @return estimated capacity lost in bytes
*/
private long getExpectedCapacityLost(boolean expectCapacityKnown,
int expectedVolumeFailuresTotal) {
return expectCapacityKnown ? expectedVolumeFailuresTotal * volumeCapacity :
0;
}
/**
* Initializes the cluster.
*
* @param numDataNodes number of datanodes
* @param storagesPerDatanode number of storage locations on each datanode
* @param failedVolumesTolerated number of acceptable volume failures
* @throws Exception if there is any failure
*/
private void initCluster(int numDataNodes, int storagesPerDatanode,
int failedVolumesTolerated) throws Exception {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
/*
* Lower the DN heartbeat, DF rate, and recheck interval to one second
* so state about failures and datanode death propagates faster.
*/
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
failedVolumesTolerated);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes)
.storagesPerDatanode(storagesPerDatanode).build();
cluster.waitActive();
fs = cluster.getFileSystem();
dataDir = cluster.getDataDirectory();
long dnCapacity = DFSTestUtil.getDatanodeCapacity(
cluster.getNamesystem().getBlockManager().getDatanodeManager(), 0);
volumeCapacity = dnCapacity / cluster.getStoragesPerDatanode();
}
/**
* Reconfigure a DataNode by setting a new list of volumes.
*
* @param dn DataNode to reconfigure
* @param newVols new volumes to configure
* @throws Exception if there is any failure
*/
private static void reconfigureDataNode(DataNode dn, File... newVols)
throws Exception {
StringBuilder dnNewDataDirs = new StringBuilder();
for (File newVol: newVols) {
if (dnNewDataDirs.length() > 0) {
dnNewDataDirs.append(',');
}
dnNewDataDirs.append(newVol.getAbsolutePath());
}
try {
dn.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
dnNewDataDirs.toString());
} catch (ReconfigurationException e) {
// This can be thrown if reconfiguration tries to use a failed volume.
// We need to swallow the exception, because some of our tests want to
// cover this case.
LOG.warn("Could not reconfigure DataNode.", e);
}
}
}

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
@ -162,7 +163,7 @@ public class TestFsDatasetCache {
doReturn(response).when(spyNN).sendHeartbeat(
(DatanodeRegistration) any(),
(StorageReport[]) any(), anyLong(), anyLong(),
anyInt(), anyInt(), anyInt());
anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any());
}
private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -101,7 +102,8 @@ public class TestStorageReport {
Mockito.verify(nnSpy).sendHeartbeat(
any(DatanodeRegistration.class),
captor.capture(),
anyLong(), anyLong(), anyInt(), anyInt(), anyInt());
anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
Mockito.any(VolumeFailureSummary.class));
StorageReport[] reports = captor.getValue();

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@ -375,6 +376,26 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
return 0;
}
@Override
public String[] getFailedStorageLocations() {
return null;
}
@Override
public long getLastVolumeFailureDate() {
return 0;
}
@Override
public long getEstimatedCapacityLostTotal() {
return 0;
}
@Override
public VolumeFailureSummary getVolumeFailureSummary() {
return null;
}
@Override
public long getCacheUsed() {
return 0;

View File

@ -47,6 +47,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -102,6 +103,7 @@ public class TestFsDatasetImpl {
String dataDir = StringUtils.join(",", dirStrings);
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
when(storage.dirIterator()).thenReturn(dirs.iterator());
when(storage.getNumStorageDirs()).thenReturn(numDirs);
}
@ -240,8 +242,8 @@ public class TestFsDatasetImpl {
RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
new RoundRobinVolumeChoosingPolicy<>();
final BlockScanner blockScanner = new BlockScanner(datanode, conf);
final FsVolumeList volumeList =
new FsVolumeList(0, blockScanner, blockChooser);
final FsVolumeList volumeList = new FsVolumeList(
Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
final List<FsVolumeImpl> oldVolumes = new ArrayList<>();
// Initialize FsVolumeList with 5 mock volumes.

View File

@ -31,6 +31,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertNotEquals;
@ -57,7 +58,8 @@ public class TestFsVolumeList {
@Test
public void testGetNextVolumeWithClosedVolume() throws IOException {
FsVolumeList volumeList = new FsVolumeList(0, blockScanner, blockChooser);
FsVolumeList volumeList = new FsVolumeList(
Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
List<FsVolumeImpl> volumes = new ArrayList<>();
for (int i = 0; i < 3; i++) {
File curDir = new File(baseDir, "nextvolume-" + i);
@ -82,7 +84,8 @@ public class TestFsVolumeList {
@Test
public void testCheckDirsWithClosedVolume() throws IOException {
FsVolumeList volumeList = new FsVolumeList(0, blockScanner, blockChooser);
FsVolumeList volumeList = new FsVolumeList(
Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
List<FsVolumeImpl> volumes = new ArrayList<>();
for (int i = 0; i < 3; i++) {
File curDir = new File(baseDir, "volume-" + i);

View File

@ -951,7 +951,7 @@ public class NNThroughputBenchmark implements Tool {
StorageReport[] rep = { new StorageReport(storage, false,
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep,
0L, 0L, 0, 0, 0).getCommands();
0L, 0L, 0, 0, 0, null).getCommands();
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
if(LOG.isDebugEnabled()) {
@ -998,7 +998,7 @@ public class NNThroughputBenchmark implements Tool {
StorageReport[] rep = { new StorageReport(storage,
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
rep, 0L, 0L, 0, 0, 0).getCommands();
rep, 0L, 0L, 0, 0, 0, null).getCommands();
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

View File

@ -117,7 +117,7 @@ public class NameNodeAdapter {
DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
return namesystem.handleHeartbeat(nodeReg,
BlockManagerTestUtil.getStorageReportsForDatanode(dd),
dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0);
dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null);
}
public static boolean setReplication(final FSNamesystem ns,

View File

@ -117,8 +117,8 @@ public class TestDeadDatanode {
StorageReport[] rep = { new StorageReport(
new DatanodeStorage(reg.getDatanodeUuid()),
false, 0, 0, 0, 0) };
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0)
.getCommands();
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null)
.getCommands();
assertEquals(1, cmd.length);
assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
.getAction());