HDFS-7604. Track and display failed DataNode storage locations in NameNode. Contributed by Chris Nauroth.
This commit is contained in:
parent
814afa46ef
commit
9729b244de
|
@ -634,6 +634,9 @@ Release 2.7.0 - UNRELEASED
|
|||
HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple
|
||||
threads (cmccabe)
|
||||
|
||||
HDFS-7604. Track and display failed DataNode storage locations in NameNode.
|
||||
(cnauroth)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.ProtocolMetaInterface;
|
||||
|
@ -121,8 +122,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
|
|||
@Override
|
||||
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
|
||||
StorageReport[] reports, long cacheCapacity, long cacheUsed,
|
||||
int xmitsInProgress, int xceiverCount, int failedVolumes)
|
||||
throws IOException {
|
||||
int xmitsInProgress, int xceiverCount, int failedVolumes,
|
||||
VolumeFailureSummary volumeFailureSummary) throws IOException {
|
||||
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
|
||||
.setRegistration(PBHelper.convert(registration))
|
||||
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
|
||||
|
@ -134,6 +135,10 @@ public class DatanodeProtocolClientSideTranslatorPB implements
|
|||
if (cacheUsed != 0) {
|
||||
builder.setCacheUsed(cacheUsed);
|
||||
}
|
||||
if (volumeFailureSummary != null) {
|
||||
builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary(
|
||||
volumeFailureSummary));
|
||||
}
|
||||
HeartbeatResponseProto resp;
|
||||
try {
|
||||
resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
@ -104,10 +105,14 @@ public class DatanodeProtocolServerSideTranslatorPB implements
|
|||
try {
|
||||
final StorageReport[] report = PBHelper.convertStorageReports(
|
||||
request.getReportsList());
|
||||
VolumeFailureSummary volumeFailureSummary =
|
||||
request.hasVolumeFailureSummary() ? PBHelper.convertVolumeFailureSummary(
|
||||
request.getVolumeFailureSummary()) : null;
|
||||
response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
|
||||
report, request.getCacheCapacity(), request.getCacheUsed(),
|
||||
request.getXmitsInProgress(),
|
||||
request.getXceiverCount(), request.getFailedVolumes());
|
||||
request.getXceiverCount(), request.getFailedVolumes(),
|
||||
volumeFailureSummary);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
|
|
|
@ -122,6 +122,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCom
|
|||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.NNHAStatusHeartbeatProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
|
||||
|
@ -215,6 +216,7 @@ import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
|||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
|
||||
import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
|
||||
|
@ -1901,6 +1903,29 @@ public class PBHelper {
|
|||
return protos;
|
||||
}
|
||||
|
||||
public static VolumeFailureSummary convertVolumeFailureSummary(
|
||||
VolumeFailureSummaryProto proto) {
|
||||
List<String> failedStorageLocations = proto.getFailedStorageLocationsList();
|
||||
return new VolumeFailureSummary(
|
||||
failedStorageLocations.toArray(new String[failedStorageLocations.size()]),
|
||||
proto.getLastVolumeFailureDate(), proto.getEstimatedCapacityLostTotal());
|
||||
}
|
||||
|
||||
public static VolumeFailureSummaryProto convertVolumeFailureSummary(
|
||||
VolumeFailureSummary volumeFailureSummary) {
|
||||
VolumeFailureSummaryProto.Builder builder =
|
||||
VolumeFailureSummaryProto.newBuilder();
|
||||
for (String failedStorageLocation:
|
||||
volumeFailureSummary.getFailedStorageLocations()) {
|
||||
builder.addFailedStorageLocations(failedStorageLocation);
|
||||
}
|
||||
builder.setLastVolumeFailureDate(
|
||||
volumeFailureSummary.getLastVolumeFailureDate());
|
||||
builder.setEstimatedCapacityLostTotal(
|
||||
volumeFailureSummary.getEstimatedCapacityLostTotal());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static JournalInfo convert(JournalInfoProto info) {
|
||||
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
|
||||
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|||
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.hdfs.util.EnumCounters;
|
||||
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
|
||||
import org.apache.hadoop.util.IntrusiveCollection;
|
||||
|
@ -214,6 +215,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
private long lastBlocksScheduledRollTime = 0;
|
||||
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
|
||||
private int volumeFailures = 0;
|
||||
private VolumeFailureSummary volumeFailureSummary = null;
|
||||
|
||||
/**
|
||||
* When set to true, the node is not in include list and is not allowed
|
||||
|
@ -233,7 +235,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
*/
|
||||
public DatanodeDescriptor(DatanodeID nodeID) {
|
||||
super(nodeID);
|
||||
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
|
||||
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -244,7 +246,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
public DatanodeDescriptor(DatanodeID nodeID,
|
||||
String networkLocation) {
|
||||
super(nodeID, networkLocation);
|
||||
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
|
||||
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -345,9 +347,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
* Updates stats from datanode heartbeat.
|
||||
*/
|
||||
public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
|
||||
long cacheUsed, int xceiverCount, int volFailures) {
|
||||
long cacheUsed, int xceiverCount, int volFailures,
|
||||
VolumeFailureSummary volumeFailureSummary) {
|
||||
updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
|
||||
volFailures);
|
||||
volFailures, volumeFailureSummary);
|
||||
heartbeatedSinceRegistration = true;
|
||||
}
|
||||
|
||||
|
@ -355,7 +358,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
* process datanode heartbeat or stats initialization.
|
||||
*/
|
||||
public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
|
||||
long cacheUsed, int xceiverCount, int volFailures) {
|
||||
long cacheUsed, int xceiverCount, int volFailures,
|
||||
VolumeFailureSummary volumeFailureSummary) {
|
||||
long totalCapacity = 0;
|
||||
long totalRemaining = 0;
|
||||
long totalBlockPoolUsed = 0;
|
||||
|
@ -370,7 +374,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
// during the current DN registration session.
|
||||
// When volumeFailures == this.volumeFailures, it implies there is no
|
||||
// state change. No need to check for failed storage. This is an
|
||||
// optimization.
|
||||
// optimization. Recent versions of the DataNode report a
|
||||
// VolumeFailureSummary containing the date/time of the last volume
|
||||
// failure. If that's available, then we check that instead for greater
|
||||
// accuracy.
|
||||
// 2. After DN restarts, volFailures might not increase and it is possible
|
||||
// we still have new failed storage. For example, admins reduce
|
||||
// available storages in configuration. Another corner case
|
||||
|
@ -379,8 +386,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
// one element in storageReports and that is A. b) A failed. c) Before
|
||||
// DN sends HB to NN to indicate A has failed, DN restarts. d) After DN
|
||||
// restarts, storageReports has one element which is B.
|
||||
boolean checkFailedStorages = (volFailures > this.volumeFailures) ||
|
||||
!heartbeatedSinceRegistration;
|
||||
final boolean checkFailedStorages;
|
||||
if (volumeFailureSummary != null && this.volumeFailureSummary != null) {
|
||||
checkFailedStorages = volumeFailureSummary.getLastVolumeFailureDate() >
|
||||
this.volumeFailureSummary.getLastVolumeFailureDate();
|
||||
} else {
|
||||
checkFailedStorages = (volFailures > this.volumeFailures) ||
|
||||
!heartbeatedSinceRegistration;
|
||||
}
|
||||
|
||||
if (checkFailedStorages) {
|
||||
LOG.info("Number of failed storage changes from "
|
||||
|
@ -394,6 +407,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
setXceiverCount(xceiverCount);
|
||||
setLastUpdate(Time.now());
|
||||
this.volumeFailures = volFailures;
|
||||
this.volumeFailureSummary = volumeFailureSummary;
|
||||
for (StorageReport report : reports) {
|
||||
DatanodeStorageInfo storage = updateStorage(report.getStorage());
|
||||
if (checkFailedStorages) {
|
||||
|
@ -728,6 +742,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
return volumeFailures;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns info about volume failures.
|
||||
*
|
||||
* @return info about volume failures, possibly null
|
||||
*/
|
||||
public VolumeFailureSummary getVolumeFailureSummary() {
|
||||
return volumeFailureSummary;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param nodeReg DatanodeID to update registration for.
|
||||
*/
|
||||
|
|
|
@ -1387,8 +1387,8 @@ public class DatanodeManager {
|
|||
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
|
||||
StorageReport[] reports, final String blockPoolId,
|
||||
long cacheCapacity, long cacheUsed, int xceiverCount,
|
||||
int maxTransfers, int failedVolumes
|
||||
) throws IOException {
|
||||
int maxTransfers, int failedVolumes,
|
||||
VolumeFailureSummary volumeFailureSummary) throws IOException {
|
||||
synchronized (heartbeatManager) {
|
||||
synchronized (datanodeMap) {
|
||||
DatanodeDescriptor nodeinfo = null;
|
||||
|
@ -1410,7 +1410,8 @@ public class DatanodeManager {
|
|||
|
||||
heartbeatManager.updateHeartbeat(nodeinfo, reports,
|
||||
cacheCapacity, cacheUsed,
|
||||
xceiverCount, failedVolumes);
|
||||
xceiverCount, failedVolumes,
|
||||
volumeFailureSummary);
|
||||
|
||||
// If we are in safemode, do not send back any recovery / replication
|
||||
// requests. Don't even drain the existing queue of work.
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
|||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
|
@ -192,7 +193,7 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
addDatanode(d);
|
||||
|
||||
//update its timestamp
|
||||
d.updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
|
||||
d.updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -217,10 +218,11 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
|
||||
synchronized void updateHeartbeat(final DatanodeDescriptor node,
|
||||
StorageReport[] reports, long cacheCapacity, long cacheUsed,
|
||||
int xceiverCount, int failedVolumes) {
|
||||
int xceiverCount, int failedVolumes,
|
||||
VolumeFailureSummary volumeFailureSummary) {
|
||||
stats.subtract(node);
|
||||
node.updateHeartbeat(reports, cacheCapacity, cacheUsed,
|
||||
xceiverCount, failedVolumes);
|
||||
xceiverCount, failedVolumes, volumeFailureSummary);
|
||||
stats.add(node);
|
||||
}
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
@ -579,14 +580,19 @@ class BPServiceActor implements Runnable {
|
|||
LOG.debug("Sending heartbeat with " + reports.length +
|
||||
" storage reports from service actor: " + this);
|
||||
}
|
||||
|
||||
|
||||
VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
|
||||
.getVolumeFailureSummary();
|
||||
int numFailedVolumes = volumeFailureSummary != null ?
|
||||
volumeFailureSummary.getFailedStorageLocations().length : 0;
|
||||
return bpNamenode.sendHeartbeat(bpRegistration,
|
||||
reports,
|
||||
dn.getFSDataset().getCacheCapacity(),
|
||||
dn.getFSDataset().getCacheUsed(),
|
||||
dn.getXmitsInProgress(),
|
||||
dn.getXceiverCount(),
|
||||
dn.getFSDataset().getNumFailedVolumes());
|
||||
numFailedVolumes,
|
||||
volumeFailureSummary);
|
||||
}
|
||||
|
||||
//This must be called only by BPOfferService
|
||||
|
@ -1019,4 +1025,4 @@ class BPServiceActor implements Runnable {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
|
@ -129,6 +130,13 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
/** @return a volume information map (name => info). */
|
||||
public Map<String, Object> getVolumeInfoMap();
|
||||
|
||||
/**
|
||||
* Returns info about volume failures.
|
||||
*
|
||||
* @return info about volume failures, possibly null
|
||||
*/
|
||||
VolumeFailureSummary getVolumeFailureSummary();
|
||||
|
||||
/** @return a list of finalized blocks for the given block pool. */
|
||||
public List<FinalizedReplica> getFinalizedBlocks(String bpid);
|
||||
|
||||
|
|
|
@ -99,6 +99,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
|
@ -112,6 +113,7 @@ import org.apache.hadoop.util.Time;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
/**************************************************
|
||||
* FSDataset manages a set of data blocks. Each block
|
||||
|
@ -264,9 +266,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
|
||||
Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
|
||||
List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
|
||||
dataLocations, storage);
|
||||
|
||||
int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
|
||||
int volsFailed = volsConfigured - storage.getNumStorageDirs();
|
||||
int volsFailed = volumeFailureInfos.size();
|
||||
this.validVolsRequired = volsConfigured - volFailuresTolerated;
|
||||
|
||||
if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
|
||||
|
@ -291,7 +295,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
|
||||
RoundRobinVolumeChoosingPolicy.class,
|
||||
VolumeChoosingPolicy.class), conf);
|
||||
volumes = new FsVolumeList(volsFailed, datanode.getBlockScanner(),
|
||||
volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
|
||||
blockChooserImpl);
|
||||
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
|
||||
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
|
||||
|
@ -313,6 +317,36 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets initial volume failure information for all volumes that failed
|
||||
* immediately at startup. The method works by determining the set difference
|
||||
* between all configured storage locations and the actual storage locations in
|
||||
* use after attempting to put all of them into service.
|
||||
*
|
||||
* @return each storage location that has failed
|
||||
*/
|
||||
private static List<VolumeFailureInfo> getInitialVolumeFailureInfos(
|
||||
Collection<StorageLocation> dataLocations, DataStorage storage) {
|
||||
Set<String> failedLocationSet = Sets.newHashSetWithExpectedSize(
|
||||
dataLocations.size());
|
||||
for (StorageLocation sl: dataLocations) {
|
||||
failedLocationSet.add(sl.getFile().getAbsolutePath());
|
||||
}
|
||||
for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
|
||||
it.hasNext(); ) {
|
||||
Storage.StorageDirectory sd = it.next();
|
||||
failedLocationSet.remove(sd.getRoot().getAbsolutePath());
|
||||
}
|
||||
List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity(
|
||||
failedLocationSet.size());
|
||||
long failureDate = Time.now();
|
||||
for (String failedStorageLocation: failedLocationSet) {
|
||||
volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation,
|
||||
failureDate));
|
||||
}
|
||||
return volumeFailureInfos;
|
||||
}
|
||||
|
||||
private void addVolume(Collection<StorageLocation> dataLocations,
|
||||
Storage.StorageDirectory sd) throws IOException {
|
||||
final File dir = sd.getCurrentDir();
|
||||
|
@ -348,8 +382,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
final File dir = location.getFile();
|
||||
|
||||
// Prepare volume in DataStorage
|
||||
DataStorage.VolumeBuilder builder =
|
||||
dataStorage.prepareVolume(datanode, location.getFile(), nsInfos);
|
||||
final DataStorage.VolumeBuilder builder;
|
||||
try {
|
||||
builder = dataStorage.prepareVolume(datanode, location.getFile(), nsInfos);
|
||||
} catch (IOException e) {
|
||||
volumes.addVolumeFailureInfo(new VolumeFailureInfo(
|
||||
location.getFile().getAbsolutePath(), Time.now()));
|
||||
throw e;
|
||||
}
|
||||
|
||||
final Storage.StorageDirectory sd = builder.getStorageDirectory();
|
||||
|
||||
|
@ -498,9 +538,65 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
/**
|
||||
* Return the number of failed volumes in the FSDataset.
|
||||
*/
|
||||
@Override
|
||||
@Override // FSDatasetMBean
|
||||
public int getNumFailedVolumes() {
|
||||
return volumes.numberOfFailedVolumes();
|
||||
return volumes.getVolumeFailureInfos().length;
|
||||
}
|
||||
|
||||
@Override // FSDatasetMBean
|
||||
public String[] getFailedStorageLocations() {
|
||||
VolumeFailureInfo[] infos = volumes.getVolumeFailureInfos();
|
||||
List<String> failedStorageLocations = Lists.newArrayListWithCapacity(
|
||||
infos.length);
|
||||
for (VolumeFailureInfo info: infos) {
|
||||
failedStorageLocations.add(info.getFailedStorageLocation());
|
||||
}
|
||||
return failedStorageLocations.toArray(
|
||||
new String[failedStorageLocations.size()]);
|
||||
}
|
||||
|
||||
@Override // FSDatasetMBean
|
||||
public long getLastVolumeFailureDate() {
|
||||
long lastVolumeFailureDate = 0;
|
||||
for (VolumeFailureInfo info: volumes.getVolumeFailureInfos()) {
|
||||
long failureDate = info.getFailureDate();
|
||||
if (failureDate > lastVolumeFailureDate) {
|
||||
lastVolumeFailureDate = failureDate;
|
||||
}
|
||||
}
|
||||
return lastVolumeFailureDate;
|
||||
}
|
||||
|
||||
@Override // FSDatasetMBean
|
||||
public long getEstimatedCapacityLostTotal() {
|
||||
long estimatedCapacityLostTotal = 0;
|
||||
for (VolumeFailureInfo info: volumes.getVolumeFailureInfos()) {
|
||||
estimatedCapacityLostTotal += info.getEstimatedCapacityLost();
|
||||
}
|
||||
return estimatedCapacityLostTotal;
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public VolumeFailureSummary getVolumeFailureSummary() {
|
||||
VolumeFailureInfo[] infos = volumes.getVolumeFailureInfos();
|
||||
if (infos.length == 0) {
|
||||
return null;
|
||||
}
|
||||
List<String> failedStorageLocations = Lists.newArrayListWithCapacity(
|
||||
infos.length);
|
||||
long lastVolumeFailureDate = 0;
|
||||
long estimatedCapacityLostTotal = 0;
|
||||
for (VolumeFailureInfo info: infos) {
|
||||
failedStorageLocations.add(info.getFailedStorageLocation());
|
||||
long failureDate = info.getFailureDate();
|
||||
if (failureDate > lastVolumeFailureDate) {
|
||||
lastVolumeFailureDate = failureDate;
|
||||
}
|
||||
estimatedCapacityLostTotal += info.getEstimatedCapacityLost();
|
||||
}
|
||||
return new VolumeFailureSummary(
|
||||
failedStorageLocations.toArray(new String[failedStorageLocations.size()]),
|
||||
lastVolumeFailureDate, estimatedCapacityLostTotal);
|
||||
}
|
||||
|
||||
@Override // FSDatasetMBean
|
||||
|
|
|
@ -22,9 +22,12 @@ import java.io.IOException;
|
|||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -40,21 +43,23 @@ import org.apache.hadoop.util.Time;
|
|||
class FsVolumeList {
|
||||
private final AtomicReference<FsVolumeImpl[]> volumes =
|
||||
new AtomicReference<>(new FsVolumeImpl[0]);
|
||||
// Tracks volume failures, sorted by volume path.
|
||||
private final Map<String, VolumeFailureInfo> volumeFailureInfos =
|
||||
Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>());
|
||||
private Object checkDirsMutex = new Object();
|
||||
|
||||
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
|
||||
private final BlockScanner blockScanner;
|
||||
private volatile int numFailedVolumes;
|
||||
|
||||
FsVolumeList(int failedVols, BlockScanner blockScanner,
|
||||
FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos,
|
||||
BlockScanner blockScanner,
|
||||
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
|
||||
this.blockChooser = blockChooser;
|
||||
this.blockScanner = blockScanner;
|
||||
this.numFailedVolumes = failedVols;
|
||||
}
|
||||
|
||||
int numberOfFailedVolumes() {
|
||||
return numFailedVolumes;
|
||||
for (VolumeFailureInfo volumeFailureInfo: initialVolumeFailureInfos) {
|
||||
volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
|
||||
volumeFailureInfo);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -238,7 +243,7 @@ class FsVolumeList {
|
|||
}
|
||||
removedVols.add(fsv);
|
||||
removeVolume(fsv);
|
||||
numFailedVolumes++;
|
||||
addVolumeFailureInfo(fsv);
|
||||
} catch (ClosedChannelException e) {
|
||||
FsDatasetImpl.LOG.debug("Caught exception when obtaining " +
|
||||
"reference count on closed volume", e);
|
||||
|
@ -347,6 +352,26 @@ class FsVolumeList {
|
|||
removeVolume(fsVolume);
|
||||
}
|
||||
}
|
||||
removeVolumeFailureInfo(volume);
|
||||
}
|
||||
|
||||
VolumeFailureInfo[] getVolumeFailureInfos() {
|
||||
Collection<VolumeFailureInfo> infos = volumeFailureInfos.values();
|
||||
return infos.toArray(new VolumeFailureInfo[infos.size()]);
|
||||
}
|
||||
|
||||
void addVolumeFailureInfo(VolumeFailureInfo volumeFailureInfo) {
|
||||
volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
|
||||
volumeFailureInfo);
|
||||
}
|
||||
|
||||
private void addVolumeFailureInfo(FsVolumeImpl vol) {
|
||||
addVolumeFailureInfo(new VolumeFailureInfo(vol.getBasePath(), Time.now(),
|
||||
vol.getCapacity()));
|
||||
}
|
||||
|
||||
private void removeVolumeFailureInfo(File vol) {
|
||||
volumeFailureInfos.remove(vol.getAbsolutePath());
|
||||
}
|
||||
|
||||
void addBlockPool(final String bpid, final Configuration conf) throws IOException {
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
/**
|
||||
* Tracks information about failure of a data volume.
|
||||
*/
|
||||
final class VolumeFailureInfo {
|
||||
private final String failedStorageLocation;
|
||||
private final long failureDate;
|
||||
private final long estimatedCapacityLost;
|
||||
|
||||
/**
|
||||
* Creates a new VolumeFailureInfo, when the capacity lost from this volume
|
||||
* failure is unknown. Typically, this means the volume failed immediately at
|
||||
* startup, so there was never a chance to query its capacity.
|
||||
*
|
||||
* @param failedStorageLocation storage location that has failed
|
||||
* @param failureDate date/time of failure in milliseconds since epoch
|
||||
*/
|
||||
public VolumeFailureInfo(String failedStorageLocation, long failureDate) {
|
||||
this(failedStorageLocation, failureDate, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new VolumeFailureInfo.
|
||||
*
|
||||
* @param failedStorageLocation storage location that has failed
|
||||
* @param failureDate date/time of failure in milliseconds since epoch
|
||||
* @param estimatedCapacityLost estimate of capacity lost in bytes
|
||||
*/
|
||||
public VolumeFailureInfo(String failedStorageLocation, long failureDate,
|
||||
long estimatedCapacityLost) {
|
||||
this.failedStorageLocation = failedStorageLocation;
|
||||
this.failureDate = failureDate;
|
||||
this.estimatedCapacityLost = estimatedCapacityLost;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the storage location that has failed.
|
||||
*
|
||||
* @return storage location that has failed
|
||||
*/
|
||||
public String getFailedStorageLocation() {
|
||||
return this.failedStorageLocation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns date/time of failure
|
||||
*
|
||||
* @return date/time of failure in milliseconds since epoch
|
||||
*/
|
||||
public long getFailureDate() {
|
||||
return this.failureDate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns estimate of capacity lost. This is said to be an estimate, because
|
||||
* in some cases it's impossible to know the capacity of the volume, such as if
|
||||
* we never had a chance to query its capacity before the failure occurred.
|
||||
*
|
||||
* @return estimate of capacity lost in bytes
|
||||
*/
|
||||
public long getEstimatedCapacityLost() {
|
||||
return this.estimatedCapacityLost;
|
||||
}
|
||||
}
|
|
@ -78,6 +78,25 @@ public interface FSDatasetMBean {
|
|||
*/
|
||||
public int getNumFailedVolumes();
|
||||
|
||||
/**
|
||||
* Returns each storage location that has failed, sorted.
|
||||
* @return each storage location that has failed, sorted
|
||||
*/
|
||||
String[] getFailedStorageLocations();
|
||||
|
||||
/**
|
||||
* Returns the date/time of the last volume failure in milliseconds since
|
||||
* epoch.
|
||||
* @return date/time of last volume failure in milliseconds since epoch
|
||||
*/
|
||||
long getLastVolumeFailureDate();
|
||||
|
||||
/**
|
||||
* Returns an estimate of total capacity lost due to volume failures in bytes.
|
||||
* @return estimate of total capacity lost in bytes
|
||||
*/
|
||||
long getEstimatedCapacityLostTotal();
|
||||
|
||||
/**
|
||||
* Returns the amount of cache used by the datanode (in bytes).
|
||||
*/
|
||||
|
|
|
@ -250,6 +250,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
|||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
@ -4412,8 +4413,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
*/
|
||||
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
|
||||
StorageReport[] reports, long cacheCapacity, long cacheUsed,
|
||||
int xceiverCount, int xmitsInProgress, int failedVolumes)
|
||||
throws IOException {
|
||||
int xceiverCount, int xmitsInProgress, int failedVolumes,
|
||||
VolumeFailureSummary volumeFailureSummary) throws IOException {
|
||||
readLock();
|
||||
try {
|
||||
//get datanode commands
|
||||
|
@ -4421,7 +4422,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
- xmitsInProgress;
|
||||
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
|
||||
nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
|
||||
xceiverCount, maxTransfer, failedVolumes);
|
||||
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
|
||||
|
||||
//create ha status
|
||||
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
|
||||
|
@ -5942,6 +5943,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
return deadDecommissioned;
|
||||
}
|
||||
|
||||
@Override // FSNamesystemMBean
|
||||
public int getVolumeFailuresTotal() {
|
||||
List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
|
||||
getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
|
||||
int volumeFailuresTotal = 0;
|
||||
for (DatanodeDescriptor node: live) {
|
||||
volumeFailuresTotal += node.getVolumeFailures();
|
||||
}
|
||||
return volumeFailuresTotal;
|
||||
}
|
||||
|
||||
@Override // FSNamesystemMBean
|
||||
public long getEstimatedCapacityLostTotal() {
|
||||
List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
|
||||
getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
|
||||
long estimatedCapacityLostTotal = 0;
|
||||
for (DatanodeDescriptor node: live) {
|
||||
VolumeFailureSummary volumeFailureSummary = node.getVolumeFailureSummary();
|
||||
if (volumeFailureSummary != null) {
|
||||
estimatedCapacityLostTotal +=
|
||||
volumeFailureSummary.getEstimatedCapacityLostTotal();
|
||||
}
|
||||
}
|
||||
return estimatedCapacityLostTotal;
|
||||
}
|
||||
|
||||
@Override // FSNamesystemMBean
|
||||
public int getNumDecommissioningDataNodes() {
|
||||
return getBlockManager().getDatanodeManager().getDecommissioningNodes()
|
||||
|
@ -6785,7 +6812,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
|
||||
blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
|
||||
for (DatanodeDescriptor node : live) {
|
||||
Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
|
||||
ImmutableMap.Builder<String, Object> innerinfo =
|
||||
ImmutableMap.<String,Object>builder();
|
||||
innerinfo
|
||||
.put("infoAddr", node.getInfoAddr())
|
||||
.put("infoSecureAddr", node.getInfoSecureAddr())
|
||||
.put("xferaddr", node.getXferAddr())
|
||||
|
@ -6801,9 +6830,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
.put("blockScheduled", node.getBlocksScheduled())
|
||||
.put("blockPoolUsed", node.getBlockPoolUsed())
|
||||
.put("blockPoolUsedPercent", node.getBlockPoolUsedPercent())
|
||||
.put("volfails", node.getVolumeFailures())
|
||||
.build();
|
||||
info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo);
|
||||
.put("volfails", node.getVolumeFailures());
|
||||
VolumeFailureSummary volumeFailureSummary = node.getVolumeFailureSummary();
|
||||
if (volumeFailureSummary != null) {
|
||||
innerinfo
|
||||
.put("failedStorageLocations",
|
||||
volumeFailureSummary.getFailedStorageLocations())
|
||||
.put("lastVolumeFailureDate",
|
||||
volumeFailureSummary.getLastVolumeFailureDate())
|
||||
.put("estimatedCapacityLostTotal",
|
||||
volumeFailureSummary.getEstimatedCapacityLostTotal());
|
||||
}
|
||||
info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo.build());
|
||||
}
|
||||
return JSON.toString(info);
|
||||
}
|
||||
|
|
|
@ -136,6 +136,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
|||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
@ -1281,12 +1282,13 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
|
||||
StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
|
||||
int xmitsInProgress, int xceiverCount,
|
||||
int failedVolumes) throws IOException {
|
||||
int failedVolumes, VolumeFailureSummary volumeFailureSummary)
|
||||
throws IOException {
|
||||
checkNNStartup();
|
||||
verifyRequest(nodeReg);
|
||||
return namesystem.handleHeartbeat(nodeReg, report,
|
||||
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
|
||||
failedVolumes);
|
||||
failedVolumes, volumeFailureSummary);
|
||||
}
|
||||
|
||||
@Override // DatanodeProtocol
|
||||
|
|
|
@ -131,6 +131,19 @@ public interface FSNamesystemMBean {
|
|||
*/
|
||||
public int getNumDecomDeadDataNodes();
|
||||
|
||||
/**
|
||||
* Number of failed data volumes across all live data nodes.
|
||||
* @return number of failed data volumes across all live data nodes
|
||||
*/
|
||||
int getVolumeFailuresTotal();
|
||||
|
||||
/**
|
||||
* Returns an estimate of total capacity lost due to volume failures in bytes
|
||||
* across all live data nodes.
|
||||
* @return estimate of total capacity lost in bytes
|
||||
*/
|
||||
long getEstimatedCapacityLostTotal();
|
||||
|
||||
/**
|
||||
* Number of data nodes that are in the decommissioning state
|
||||
*/
|
||||
|
|
|
@ -102,6 +102,7 @@ public interface DatanodeProtocol {
|
|||
* @param xmitsInProgress number of transfers from this datanode to others
|
||||
* @param xceiverCount number of active transceiver threads
|
||||
* @param failedVolumes number of failed volumes
|
||||
* @param volumeFailureSummary info about volume failures
|
||||
* @throws IOException on error
|
||||
*/
|
||||
@Idempotent
|
||||
|
@ -111,7 +112,9 @@ public interface DatanodeProtocol {
|
|||
long dnCacheUsed,
|
||||
int xmitsInProgress,
|
||||
int xceiverCount,
|
||||
int failedVolumes) throws IOException;
|
||||
int failedVolumes,
|
||||
VolumeFailureSummary volumeFailureSummary)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* blockReport() tells the NameNode about all the locally-stored blocks.
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.protocol;
|
||||
|
||||
/**
|
||||
* Summarizes information about data volume failures on a DataNode.
|
||||
*/
|
||||
public class VolumeFailureSummary {
|
||||
private final String[] failedStorageLocations;
|
||||
private final long lastVolumeFailureDate;
|
||||
private final long estimatedCapacityLostTotal;
|
||||
|
||||
/**
|
||||
* Creates a new VolumeFailureSummary.
|
||||
*
|
||||
* @param failedStorageLocations storage locations that have failed
|
||||
* @param lastVolumeFailureDate date/time of last volume failure in
|
||||
* milliseconds since epoch
|
||||
* @param estimatedCapacityLostTotal estimate of capacity lost in bytes
|
||||
*/
|
||||
public VolumeFailureSummary(String[] failedStorageLocations,
|
||||
long lastVolumeFailureDate, long estimatedCapacityLostTotal) {
|
||||
this.failedStorageLocations = failedStorageLocations;
|
||||
this.lastVolumeFailureDate = lastVolumeFailureDate;
|
||||
this.estimatedCapacityLostTotal = estimatedCapacityLostTotal;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns each storage location that has failed, sorted.
|
||||
*
|
||||
* @return each storage location that has failed, sorted
|
||||
*/
|
||||
public String[] getFailedStorageLocations() {
|
||||
return this.failedStorageLocations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the date/time of the last volume failure in milliseconds since
|
||||
* epoch.
|
||||
*
|
||||
* @return date/time of last volume failure in milliseconds since epoch
|
||||
*/
|
||||
public long getLastVolumeFailureDate() {
|
||||
return this.lastVolumeFailureDate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns estimate of capacity lost. This is said to be an estimate, because
|
||||
* in some cases it's impossible to know the capacity of the volume, such as if
|
||||
* we never had a chance to query its capacity before the failure occurred.
|
||||
*
|
||||
* @return estimate of capacity lost in bytes
|
||||
*/
|
||||
public long getEstimatedCapacityLostTotal() {
|
||||
return this.estimatedCapacityLostTotal;
|
||||
}
|
||||
}
|
|
@ -160,6 +160,17 @@ message RegisterDatanodeResponseProto {
|
|||
required DatanodeRegistrationProto registration = 1; // Datanode info
|
||||
}
|
||||
|
||||
/**
|
||||
* failedStorageLocations - storage locations that have failed
|
||||
* lastVolumeFailureDate - date/time of last volume failure
|
||||
* estimatedCapacityLost - estimate of total capacity lost due to volume failures
|
||||
*/
|
||||
message VolumeFailureSummaryProto {
|
||||
repeated string failedStorageLocations = 1;
|
||||
required uint64 lastVolumeFailureDate = 2;
|
||||
required uint64 estimatedCapacityLostTotal = 3;
|
||||
}
|
||||
|
||||
/**
|
||||
* registration - datanode registration information
|
||||
* capacity - total storage capacity available at the datanode
|
||||
|
@ -168,9 +179,12 @@ message RegisterDatanodeResponseProto {
|
|||
* blockPoolUsed - storage used by the block pool
|
||||
* xmitsInProgress - number of transfers from this datanode to others
|
||||
* xceiverCount - number of active transceiver threads
|
||||
* failedVolumes - number of failed volumes
|
||||
* failedVolumes - number of failed volumes. This is redundant with the
|
||||
* information included in volumeFailureSummary, but the field is retained
|
||||
* for backwards compatibility.
|
||||
* cacheCapacity - total cache capacity available at the datanode
|
||||
* cacheUsed - amount of cache used
|
||||
* volumeFailureSummary - info about volume failures
|
||||
*/
|
||||
message HeartbeatRequestProto {
|
||||
required DatanodeRegistrationProto registration = 1; // Datanode info
|
||||
|
@ -180,6 +194,7 @@ message HeartbeatRequestProto {
|
|||
optional uint32 failedVolumes = 5 [ default = 0 ];
|
||||
optional uint64 cacheCapacity = 6 [ default = 0 ];
|
||||
optional uint64 cacheUsed = 7 [default = 0 ];
|
||||
optional VolumeFailureSummaryProto volumeFailureSummary = 8;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
<ul class="nav navbar-nav" id="ui-tabs">
|
||||
<li><a href="#tab-overview">Overview</a></li>
|
||||
<li><a href="#tab-datanode">Datanodes</a></li>
|
||||
<li><a href="#tab-datanode-volume-failures">Datanode Volume Failures</a></li>
|
||||
<li><a href="#tab-snapshot">Snapshot</a></li>
|
||||
<li><a href="#tab-startup-progress">Startup Progress</a></li>
|
||||
<li class="dropdown">
|
||||
|
@ -59,6 +60,7 @@
|
|||
<div class="tab-content">
|
||||
<div class="tab-pane" id="tab-overview"></div>
|
||||
<div class="tab-pane" id="tab-datanode"></div>
|
||||
<div class="tab-pane" id="tab-datanode-volume-failures"></div>
|
||||
<div class="tab-pane" id="tab-snapshot"></div>
|
||||
<div class="tab-pane" id="tab-startup-progress"></div>
|
||||
</div>
|
||||
|
@ -165,6 +167,7 @@
|
|||
<tr><th><a href="#tab-datanode">Live Nodes</a></th><td>{NumLiveDataNodes} (Decommissioned: {NumDecomLiveDataNodes})</td></tr>
|
||||
<tr><th><a href="#tab-datanode">Dead Nodes</a></th><td>{NumDeadDataNodes} (Decommissioned: {NumDecomDeadDataNodes})</td></tr>
|
||||
<tr><th><a href="#tab-datanode">Decommissioning Nodes</a></th><td>{NumDecommissioningDataNodes}</td></tr>
|
||||
<tr><th><a href="#tab-datanode-volume-failures">Total Datanode Volume Failures</a></th><td>{VolumeFailuresTotal} ({EstimatedCapacityLostTotal|fmt_bytes})</td></tr>
|
||||
<tr><th title="Excludes missing blocks.">Number of Under-Replicated Blocks</th><td>{UnderReplicatedBlocks}</td></tr>
|
||||
<tr><th>Number of Blocks Pending Deletion</th><td>{PendingDeletionBlocks}</td></tr>
|
||||
<tr><th>Block Deletion Start Time</th><td>{BlockDeletionStartTime|date_tostring}</td></tr>
|
||||
|
@ -324,6 +327,36 @@
|
|||
</small>
|
||||
</script>
|
||||
|
||||
<script type="text/x-dust-template" id="tmpl-datanode-volume-failures">
|
||||
<div class="page-header"><h1>Datanode Volume Failures</h1></div>
|
||||
<small>
|
||||
{?LiveNodes}
|
||||
<table class="table">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Node</th>
|
||||
<th>Last Failure Date</th>
|
||||
<th>Failed Volumes</th>
|
||||
<th>Estimated Capacity Lost</th>
|
||||
<th>Failed Storage Locations</th>
|
||||
</tr>
|
||||
</thead>
|
||||
{#LiveNodes}
|
||||
<tr>
|
||||
<td>{name} ({xferaddr})</td>
|
||||
<td>{#helper_date_tostring value="{lastVolumeFailureDate}"/}</td>
|
||||
<td>{volfails}</td>
|
||||
<td>{estimatedCapacityLostTotal|fmt_bytes}</td>
|
||||
<td>{#failedStorageLocations}{.}{@sep}<br />{/sep}{/failedStorageLocations}</td>
|
||||
</tr>
|
||||
{/LiveNodes}
|
||||
</table>
|
||||
{:else}
|
||||
There are no reported volume failures.
|
||||
{/LiveNodes}
|
||||
</small>
|
||||
</script>
|
||||
|
||||
<script type="text/x-dust-template" id="tmpl-startup-progress">
|
||||
<div class="page-header"><h1>Startup Progress</h1></div>
|
||||
<p>Elapsed Time: {elapsedTime|fmt_time}, Percent Complete: {percentComplete|fmt_percentage}</p>
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
dust.loadSource(dust.compile($('#tmpl-dfshealth').html(), 'dfshealth'));
|
||||
dust.loadSource(dust.compile($('#tmpl-startup-progress').html(), 'startup-progress'));
|
||||
dust.loadSource(dust.compile($('#tmpl-datanode').html(), 'datanode-info'));
|
||||
dust.loadSource(dust.compile($('#tmpl-datanode-volume-failures').html(), 'datanode-volume-failures'));
|
||||
dust.loadSource(dust.compile($('#tmpl-snapshot').html(), 'snapshot-info'));
|
||||
|
||||
function load_overview() {
|
||||
|
@ -193,6 +194,45 @@
|
|||
})).error(ajax_error_handler);
|
||||
}
|
||||
|
||||
function load_datanode_volume_failures() {
|
||||
|
||||
var HELPERS = {
|
||||
'helper_date_tostring' : function (chunk, ctx, bodies, params) {
|
||||
var value = dust.helpers.tap(params.value, chunk, ctx);
|
||||
return chunk.write('' + new Date(Number(value)).toLocaleString());
|
||||
}
|
||||
};
|
||||
|
||||
function workaround(r) {
|
||||
function node_map_to_array(nodes) {
|
||||
var res = [];
|
||||
for (var n in nodes) {
|
||||
var p = nodes[n];
|
||||
// Filter the display to only datanodes with volume failures.
|
||||
if (p.volfails > 0) {
|
||||
p.name = n;
|
||||
res.push(p);
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
r.LiveNodes = node_map_to_array(JSON.parse(r.LiveNodes));
|
||||
return r;
|
||||
}
|
||||
|
||||
$.get(
|
||||
'/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo',
|
||||
guard_with_startup_progress(function (resp) {
|
||||
var data = workaround(resp.beans[0]);
|
||||
var base = dust.makeBase(HELPERS);
|
||||
dust.render('datanode-volume-failures', base.push(data), function(err, out) {
|
||||
$('#tab-datanode-volume-failures').html(out);
|
||||
$('#ui-tabs a[href="#tab-datanode-volume-failures"]').tab('show');
|
||||
});
|
||||
})).error(ajax_error_handler);
|
||||
}
|
||||
|
||||
function load_snapshot_info() {
|
||||
$.get(
|
||||
'/jmx?qry=Hadoop:service=NameNode,name=SnapshotInfo',
|
||||
|
@ -210,6 +250,9 @@
|
|||
case "#tab-datanode":
|
||||
load_datanode_info();
|
||||
break;
|
||||
case "#tab-datanode-volume-failures":
|
||||
load_datanode_volume_failures();
|
||||
break;
|
||||
case "#tab-snapshot":
|
||||
load_snapshot_info();
|
||||
break;
|
||||
|
|
|
@ -114,7 +114,8 @@ public class TestBlockManager {
|
|||
2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);
|
||||
dn.updateHeartbeat(
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0L, 0L, 0, 0);
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0L, 0L, 0, 0,
|
||||
null);
|
||||
bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import org.apache.commons.lang.ArrayUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.math3.stat.inference.TestUtils;
|
||||
|
@ -87,7 +86,8 @@ public class TestNameNodePrunesMissingStorages {
|
|||
|
||||
// Stop the DataNode and send fake heartbeat with missing storage.
|
||||
cluster.stopDataNode(0);
|
||||
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0, 0);
|
||||
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
|
||||
0, null);
|
||||
|
||||
// Check that the missing storage was pruned.
|
||||
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));
|
||||
|
|
|
@ -107,7 +107,7 @@ public class TestOverReplicatedBlocks {
|
|||
datanode.getStorageInfos()[0].setUtilizationForTesting(100L, 100L, 0, 100L);
|
||||
datanode.updateHeartbeat(
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(datanode),
|
||||
0L, 0L, 0, 0);
|
||||
0L, 0L, 0, 0, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ public class TestReplicationPolicy {
|
|||
capacity, dfsUsed, remaining, blockPoolUsed);
|
||||
dn.updateHeartbeat(
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dn),
|
||||
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures);
|
||||
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
|
|
|
@ -97,7 +97,7 @@ public class TestReplicationPolicyConsiderLoad {
|
|||
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L);
|
||||
dataNodes[i].updateHeartbeat(
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[i]),
|
||||
0L, 0L, 0, 0);
|
||||
0L, 0L, 0, 0, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,17 +115,17 @@ public class TestReplicationPolicyConsiderLoad {
|
|||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
|
||||
blockPoolId, dataNodes[3].getCacheCapacity(),
|
||||
dataNodes[3].getCacheRemaining(),
|
||||
2, 0, 0);
|
||||
2, 0, 0, null);
|
||||
dnManager.handleHeartbeat(dnrList.get(4),
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[4]),
|
||||
blockPoolId, dataNodes[4].getCacheCapacity(),
|
||||
dataNodes[4].getCacheRemaining(),
|
||||
4, 0, 0);
|
||||
4, 0, 0, null);
|
||||
dnManager.handleHeartbeat(dnrList.get(5),
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[5]),
|
||||
blockPoolId, dataNodes[5].getCacheCapacity(),
|
||||
dataNodes[5].getCacheRemaining(),
|
||||
4, 0, 0);
|
||||
4, 0, 0, null);
|
||||
// value in the above heartbeats
|
||||
final int load = 2 + 4 + 4;
|
||||
|
||||
|
|
|
@ -185,7 +185,7 @@ public class TestReplicationPolicyWithNodeGroup {
|
|||
capacity, dfsUsed, remaining, blockPoolUsed);
|
||||
dn.updateHeartbeat(
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dn),
|
||||
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures);
|
||||
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
|
||||
}
|
||||
|
||||
private static void setupDataNodeCapacity() {
|
||||
|
|
|
@ -35,6 +35,7 @@ import javax.management.NotCompliantMBeanException;
|
|||
import javax.management.ObjectName;
|
||||
import javax.management.StandardMBean;
|
||||
|
||||
import org.apache.commons.lang.ArrayUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
|
@ -57,6 +58,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
@ -627,6 +629,26 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
return storage.getNumFailedVolumes();
|
||||
}
|
||||
|
||||
@Override // FSDatasetMBean
|
||||
public String[] getFailedStorageLocations() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override // FSDatasetMBean
|
||||
public long getLastVolumeFailureDate() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override // FSDatasetMBean
|
||||
public long getEstimatedCapacityLostTotal() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public VolumeFailureSummary getVolumeFailureSummary() {
|
||||
return new VolumeFailureSummary(ArrayUtils.EMPTY_STRING_ARRAY, 0, 0);
|
||||
}
|
||||
|
||||
@Override // FSDatasetMBean
|
||||
public long getCacheUsed() {
|
||||
return 0l;
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.PathUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
@ -137,7 +138,8 @@ public class TestBPOfferService {
|
|||
Mockito.anyLong(),
|
||||
Mockito.anyInt(),
|
||||
Mockito.anyInt(),
|
||||
Mockito.anyInt());
|
||||
Mockito.anyInt(),
|
||||
Mockito.any(VolumeFailureSummary.class));
|
||||
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
|
||||
return mock;
|
||||
}
|
||||
|
|
|
@ -77,6 +77,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
|||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
@ -160,7 +161,8 @@ public class TestBlockRecovery {
|
|||
Mockito.anyLong(),
|
||||
Mockito.anyInt(),
|
||||
Mockito.anyInt(),
|
||||
Mockito.anyInt()))
|
||||
Mockito.anyInt(),
|
||||
Mockito.any(VolumeFailureSummary.class)))
|
||||
.thenReturn(new HeartbeatResponse(
|
||||
new DatanodeCommand[0],
|
||||
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
|
||||
|
|
|
@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.datanode;
|
|||
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
|
@ -30,6 +32,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.ReconfigurationException;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -39,6 +42,10 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -58,6 +65,7 @@ public class TestDataNodeVolumeFailureReporting {
|
|||
private MiniDFSCluster cluster;
|
||||
private Configuration conf;
|
||||
private String dataDir;
|
||||
private long volumeCapacity;
|
||||
|
||||
// Sleep at least 3 seconds (a 1s heartbeat plus padding) to allow
|
||||
// for heartbeats to propagate from the datanodes to the namenode.
|
||||
|
@ -69,29 +77,29 @@ public class TestDataNodeVolumeFailureReporting {
|
|||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = new HdfsConfiguration();
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
|
||||
/*
|
||||
* Lower the DN heartbeat, DF rate, and recheck interval to one second
|
||||
* so state about failures and datanode death propagates faster.
|
||||
*/
|
||||
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
|
||||
// These tests simulate volume failures by denying execute permission on the
|
||||
// volume's path. On Windows, the owner of an object is always allowed
|
||||
// access, so we can't run these tests on Windows.
|
||||
assumeTrue(!Path.WINDOWS);
|
||||
// Allow a single volume failure (there are two volumes)
|
||||
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
fs = cluster.getFileSystem();
|
||||
dataDir = cluster.getDataDirectory();
|
||||
initCluster(1, 2, 1);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
for (int i = 0; i < 3; i++) {
|
||||
FileUtil.setExecutable(new File(dataDir, "data"+(2*i+1)), true);
|
||||
FileUtil.setExecutable(new File(dataDir, "data"+(2*i+2)), true);
|
||||
// Restore executable permission on all directories where a failure may have
|
||||
// been simulated by denying execute access. This is based on the maximum
|
||||
// number of datanodes and the maximum number of storages per data node used
|
||||
// throughout the tests in this suite.
|
||||
int maxDataNodes = 3;
|
||||
int maxStoragesPerDataNode = 4;
|
||||
for (int i = 0; i < maxDataNodes; i++) {
|
||||
for (int j = 1; j <= maxStoragesPerDataNode; j++) {
|
||||
String subDir = "data" + ((i * maxStoragesPerDataNode) + j);
|
||||
FileUtil.setExecutable(new File(dataDir, subDir), true);
|
||||
}
|
||||
}
|
||||
IOUtils.cleanup(LOG, fs);
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
|
@ -102,8 +110,6 @@ public class TestDataNodeVolumeFailureReporting {
|
|||
*/
|
||||
@Test
|
||||
public void testSuccessiveVolumeFailures() throws Exception {
|
||||
assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
|
||||
|
||||
// Bring up two more datanodes
|
||||
cluster.startDataNodes(conf, 2, true, null, null);
|
||||
cluster.waitActive();
|
||||
|
@ -151,12 +157,9 @@ public class TestDataNodeVolumeFailureReporting {
|
|||
/*
|
||||
* The metrics should confirm the volume failures.
|
||||
*/
|
||||
assertCounter("VolumeFailures", 1L,
|
||||
getMetrics(dns.get(0).getMetrics().name()));
|
||||
assertCounter("VolumeFailures", 1L,
|
||||
getMetrics(dns.get(1).getMetrics().name()));
|
||||
assertCounter("VolumeFailures", 0L,
|
||||
getMetrics(dns.get(2).getMetrics().name()));
|
||||
checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
|
||||
checkFailuresAtDataNode(dns.get(2), 0, true);
|
||||
|
||||
// Ensure we wait a sufficient amount of time
|
||||
assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
|
||||
|
@ -164,6 +167,10 @@ public class TestDataNodeVolumeFailureReporting {
|
|||
// Eventually the NN should report two volume failures
|
||||
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
|
||||
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
|
||||
checkAggregateFailuresAtNameNode(true, 2);
|
||||
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(2), true);
|
||||
|
||||
/*
|
||||
* Now fail a volume on the third datanode. We should be able to get
|
||||
|
@ -174,17 +181,10 @@ public class TestDataNodeVolumeFailureReporting {
|
|||
DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
|
||||
DFSTestUtil.waitReplication(fs, file2, (short)3);
|
||||
assertTrue("DN3 should still be up", dns.get(2).isDatanodeUp());
|
||||
assertCounter("VolumeFailures", 1L,
|
||||
getMetrics(dns.get(2).getMetrics().name()));
|
||||
checkFailuresAtDataNode(dns.get(2), 1, true, dn3Vol1.getAbsolutePath());
|
||||
|
||||
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
|
||||
ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
|
||||
dm.fetchDatanodes(live, dead, false);
|
||||
live.clear();
|
||||
dead.clear();
|
||||
dm.fetchDatanodes(live, dead, false);
|
||||
assertEquals("DN3 should have 1 failed volume",
|
||||
1, live.get(2).getVolumeFailures());
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(2));
|
||||
checkFailuresAtNameNode(dm, dns.get(2), true, dn3Vol1.getAbsolutePath());
|
||||
|
||||
/*
|
||||
* Once the datanodes have a chance to heartbeat their new capacity the
|
||||
|
@ -194,6 +194,10 @@ public class TestDataNodeVolumeFailureReporting {
|
|||
dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
|
||||
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 3,
|
||||
origCapacity - (3*dnCapacity), WAIT_FOR_HEARTBEATS);
|
||||
checkAggregateFailuresAtNameNode(true, 3);
|
||||
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(2), true, dn3Vol1.getAbsolutePath());
|
||||
|
||||
/*
|
||||
* Now fail the 2nd volume on the 3rd datanode. All its volumes
|
||||
|
@ -210,12 +214,15 @@ public class TestDataNodeVolumeFailureReporting {
|
|||
DFSTestUtil.waitForDatanodeDeath(dns.get(2));
|
||||
|
||||
// And report two failed volumes
|
||||
assertCounter("VolumeFailures", 2L,
|
||||
getMetrics(dns.get(2).getMetrics().name()));
|
||||
checkFailuresAtDataNode(dns.get(2), 2, true, dn3Vol1.getAbsolutePath(),
|
||||
dn3Vol2.getAbsolutePath());
|
||||
|
||||
// The NN considers the DN dead
|
||||
DFSTestUtil.waitForDatanodeStatus(dm, 2, 1, 2,
|
||||
origCapacity - (4*dnCapacity), WAIT_FOR_HEARTBEATS);
|
||||
checkAggregateFailuresAtNameNode(true, 2);
|
||||
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
|
||||
|
||||
/*
|
||||
* The datanode never tries to restore the failed volume, even if
|
||||
|
@ -240,6 +247,11 @@ public class TestDataNodeVolumeFailureReporting {
|
|||
*/
|
||||
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0, origCapacity,
|
||||
WAIT_FOR_HEARTBEATS);
|
||||
checkAggregateFailuresAtNameNode(true, 0);
|
||||
dns = cluster.getDataNodes();
|
||||
checkFailuresAtNameNode(dm, dns.get(0), true);
|
||||
checkFailuresAtNameNode(dm, dns.get(1), true);
|
||||
checkFailuresAtNameNode(dm, dns.get(2), true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -247,8 +259,6 @@ public class TestDataNodeVolumeFailureReporting {
|
|||
*/
|
||||
@Test
|
||||
public void testVolFailureStatsPreservedOnNNRestart() throws Exception {
|
||||
assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
|
||||
|
||||
// Bring up two more datanodes that can tolerate 1 failure
|
||||
cluster.startDataNodes(conf, 2, true, null, null);
|
||||
cluster.waitActive();
|
||||
|
@ -268,15 +278,346 @@ public class TestDataNodeVolumeFailureReporting {
|
|||
Path file1 = new Path("/test1");
|
||||
DFSTestUtil.createFile(fs, file1, 1024, (short)2, 1L);
|
||||
DFSTestUtil.waitReplication(fs, file1, (short)2);
|
||||
ArrayList<DataNode> dns = cluster.getDataNodes();
|
||||
|
||||
// The NN reports two volumes failures
|
||||
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
|
||||
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
|
||||
checkAggregateFailuresAtNameNode(true, 2);
|
||||
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
|
||||
|
||||
// After restarting the NN it still see the two failures
|
||||
cluster.restartNameNode(0);
|
||||
cluster.waitActive();
|
||||
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
|
||||
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
|
||||
checkAggregateFailuresAtNameNode(true, 2);
|
||||
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleVolFailuresOnNode() throws Exception {
|
||||
// Reinitialize the cluster, configured with 4 storage locations per DataNode
|
||||
// and tolerating up to 2 failures.
|
||||
tearDown();
|
||||
initCluster(3, 4, 2);
|
||||
|
||||
// Calculate the total capacity of all the datanodes. Sleep for three seconds
|
||||
// to be sure the datanodes have had a chance to heartbeat their capacities.
|
||||
Thread.sleep(WAIT_FOR_HEARTBEATS);
|
||||
DatanodeManager dm = cluster.getNamesystem().getBlockManager()
|
||||
.getDatanodeManager();
|
||||
|
||||
long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm);
|
||||
long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
|
||||
|
||||
File dn1Vol1 = new File(dataDir, "data"+(4*0+1));
|
||||
File dn1Vol2 = new File(dataDir, "data"+(4*0+2));
|
||||
File dn2Vol1 = new File(dataDir, "data"+(4*1+1));
|
||||
File dn2Vol2 = new File(dataDir, "data"+(4*1+2));
|
||||
|
||||
// Make the first two volume directories on the first two datanodes
|
||||
// non-accessible.
|
||||
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol1,
|
||||
false));
|
||||
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol2,
|
||||
false));
|
||||
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol1,
|
||||
false));
|
||||
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol2,
|
||||
false));
|
||||
|
||||
// Create file1 and wait for 3 replicas (ie all DNs can still store a block).
|
||||
// Then assert that all DNs are up, despite the volume failures.
|
||||
Path file1 = new Path("/test1");
|
||||
DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
|
||||
DFSTestUtil.waitReplication(fs, file1, (short)3);
|
||||
|
||||
ArrayList<DataNode> dns = cluster.getDataNodes();
|
||||
assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
|
||||
assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
|
||||
assertTrue("DN3 should be up", dns.get(2).isDatanodeUp());
|
||||
|
||||
checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath(),
|
||||
dn1Vol2.getAbsolutePath());
|
||||
checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath(),
|
||||
dn2Vol2.getAbsolutePath());
|
||||
checkFailuresAtDataNode(dns.get(2), 0, true);
|
||||
|
||||
// Ensure we wait a sufficient amount of time
|
||||
assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
|
||||
|
||||
// Eventually the NN should report four volume failures
|
||||
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 4,
|
||||
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
|
||||
checkAggregateFailuresAtNameNode(true, 4);
|
||||
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath(),
|
||||
dn1Vol2.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath(),
|
||||
dn2Vol2.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(2), true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDataNodeReconfigureWithVolumeFailures() throws Exception {
|
||||
// Bring up two more datanodes
|
||||
cluster.startDataNodes(conf, 2, true, null, null);
|
||||
cluster.waitActive();
|
||||
|
||||
final DatanodeManager dm = cluster.getNamesystem().getBlockManager(
|
||||
).getDatanodeManager();
|
||||
long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm);
|
||||
long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
|
||||
|
||||
// Fail the first volume on both datanodes (we have to keep the
|
||||
// third healthy so one node in the pipeline will not fail).
|
||||
File dn1Vol1 = new File(dataDir, "data"+(2*0+1));
|
||||
File dn1Vol2 = new File(dataDir, "data"+(2*0+2));
|
||||
File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
|
||||
File dn2Vol2 = new File(dataDir, "data"+(2*1+2));
|
||||
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol1, false));
|
||||
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol1, false));
|
||||
|
||||
Path file1 = new Path("/test1");
|
||||
DFSTestUtil.createFile(fs, file1, 1024, (short)2, 1L);
|
||||
DFSTestUtil.waitReplication(fs, file1, (short)2);
|
||||
|
||||
ArrayList<DataNode> dns = cluster.getDataNodes();
|
||||
assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
|
||||
assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
|
||||
assertTrue("DN3 should be up", dns.get(2).isDatanodeUp());
|
||||
|
||||
checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
|
||||
checkFailuresAtDataNode(dns.get(2), 0, true);
|
||||
|
||||
// Ensure we wait a sufficient amount of time
|
||||
assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
|
||||
|
||||
// The NN reports two volume failures
|
||||
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
|
||||
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
|
||||
checkAggregateFailuresAtNameNode(true, 2);
|
||||
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
|
||||
|
||||
// Reconfigure each DataNode to remove its failed volumes.
|
||||
reconfigureDataNode(dns.get(0), dn1Vol2);
|
||||
reconfigureDataNode(dns.get(1), dn2Vol2);
|
||||
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
||||
|
||||
checkFailuresAtDataNode(dns.get(0), 1, true);
|
||||
checkFailuresAtDataNode(dns.get(1), 1, true);
|
||||
|
||||
// NN sees reduced capacity, but no volume failures.
|
||||
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0,
|
||||
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
|
||||
checkAggregateFailuresAtNameNode(true, 0);
|
||||
checkFailuresAtNameNode(dm, dns.get(0), true);
|
||||
checkFailuresAtNameNode(dm, dns.get(1), true);
|
||||
|
||||
// Reconfigure again to try to add back the failed volumes.
|
||||
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
||||
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
||||
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
||||
|
||||
checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
|
||||
|
||||
// Ensure we wait a sufficient amount of time.
|
||||
assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
|
||||
|
||||
// The NN reports two volume failures again.
|
||||
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
|
||||
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
|
||||
checkAggregateFailuresAtNameNode(false, 2);
|
||||
checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
|
||||
|
||||
// Reconfigure a third time with the failed volumes. Afterwards, we expect
|
||||
// the same volume failures to be reported. (No double-counting.)
|
||||
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
||||
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
||||
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
||||
|
||||
checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
|
||||
|
||||
// Ensure we wait a sufficient amount of time.
|
||||
assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
|
||||
|
||||
// The NN reports two volume failures again.
|
||||
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
|
||||
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
|
||||
checkAggregateFailuresAtNameNode(false, 2);
|
||||
checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks the NameNode for correct values of aggregate counters tracking failed
|
||||
* volumes across all DataNodes.
|
||||
*
|
||||
* @param expectCapacityKnown if true, then expect that the capacities of the
|
||||
* volumes were known before the failures, and therefore the lost capacity
|
||||
* can be reported
|
||||
* @param expectedVolumeFailuresTotal expected number of failed volumes
|
||||
*/
|
||||
private void checkAggregateFailuresAtNameNode(boolean expectCapacityKnown,
|
||||
int expectedVolumeFailuresTotal) {
|
||||
FSNamesystem ns = cluster.getNamesystem();
|
||||
assertEquals(expectedVolumeFailuresTotal, ns.getVolumeFailuresTotal());
|
||||
long expectedCapacityLost = getExpectedCapacityLost(expectCapacityKnown,
|
||||
expectedVolumeFailuresTotal);
|
||||
assertEquals(expectedCapacityLost, ns.getEstimatedCapacityLostTotal());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks a DataNode for correct reporting of failed volumes.
|
||||
*
|
||||
* @param dn DataNode to check
|
||||
* @param expectedVolumeFailuresCounter metric counter value for
|
||||
* VolumeFailures. The current implementation actually counts the number
|
||||
* of failed disk checker cycles, which may be different from the length of
|
||||
* expectedFailedVolumes if multiple disks fail in the same disk checker
|
||||
* cycle
|
||||
* @param expectCapacityKnown if true, then expect that the capacities of the
|
||||
* volumes were known before the failures, and therefore the lost capacity
|
||||
* can be reported
|
||||
* @param expectedFailedVolumes expected locations of failed volumes
|
||||
* @throws Exception if there is any failure
|
||||
*/
|
||||
private void checkFailuresAtDataNode(DataNode dn,
|
||||
long expectedVolumeFailuresCounter, boolean expectCapacityKnown,
|
||||
String... expectedFailedVolumes) throws Exception {
|
||||
assertCounter("VolumeFailures", expectedVolumeFailuresCounter,
|
||||
getMetrics(dn.getMetrics().name()));
|
||||
FsDatasetSpi<?> fsd = dn.getFSDataset();
|
||||
assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes());
|
||||
assertArrayEquals(expectedFailedVolumes, fsd.getFailedStorageLocations());
|
||||
if (expectedFailedVolumes.length > 0) {
|
||||
assertTrue(fsd.getLastVolumeFailureDate() > 0);
|
||||
long expectedCapacityLost = getExpectedCapacityLost(expectCapacityKnown,
|
||||
expectedFailedVolumes.length);
|
||||
assertEquals(expectedCapacityLost, fsd.getEstimatedCapacityLostTotal());
|
||||
} else {
|
||||
assertEquals(0, fsd.getLastVolumeFailureDate());
|
||||
assertEquals(0, fsd.getEstimatedCapacityLostTotal());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks NameNode tracking of a particular DataNode for correct reporting of
|
||||
* failed volumes.
|
||||
*
|
||||
* @param dm DatanodeManager to check
|
||||
* @param dn DataNode to check
|
||||
* @param expectCapacityKnown if true, then expect that the capacities of the
|
||||
* volumes were known before the failures, and therefore the lost capacity
|
||||
* can be reported
|
||||
* @param expectedFailedVolumes expected locations of failed volumes
|
||||
* @throws Exception if there is any failure
|
||||
*/
|
||||
private void checkFailuresAtNameNode(DatanodeManager dm, DataNode dn,
|
||||
boolean expectCapacityKnown, String... expectedFailedVolumes)
|
||||
throws Exception {
|
||||
DatanodeDescriptor dd = cluster.getNamesystem().getBlockManager()
|
||||
.getDatanodeManager().getDatanode(dn.getDatanodeId());
|
||||
assertEquals(expectedFailedVolumes.length, dd.getVolumeFailures());
|
||||
VolumeFailureSummary volumeFailureSummary = dd.getVolumeFailureSummary();
|
||||
if (expectedFailedVolumes.length > 0) {
|
||||
assertArrayEquals(expectedFailedVolumes, volumeFailureSummary
|
||||
.getFailedStorageLocations());
|
||||
assertTrue(volumeFailureSummary.getLastVolumeFailureDate() > 0);
|
||||
long expectedCapacityLost = getExpectedCapacityLost(expectCapacityKnown,
|
||||
expectedFailedVolumes.length);
|
||||
assertEquals(expectedCapacityLost,
|
||||
volumeFailureSummary.getEstimatedCapacityLostTotal());
|
||||
} else {
|
||||
assertNull(volumeFailureSummary);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns expected capacity lost for use in assertions. The return value is
|
||||
* dependent on whether or not it is expected that the volume capacities were
|
||||
* known prior to the failures.
|
||||
*
|
||||
* @param expectCapacityKnown if true, then expect that the capacities of the
|
||||
* volumes were known before the failures, and therefore the lost capacity
|
||||
* can be reported
|
||||
* @param expectedVolumeFailuresTotal expected number of failed volumes
|
||||
* @return estimated capacity lost in bytes
|
||||
*/
|
||||
private long getExpectedCapacityLost(boolean expectCapacityKnown,
|
||||
int expectedVolumeFailuresTotal) {
|
||||
return expectCapacityKnown ? expectedVolumeFailuresTotal * volumeCapacity :
|
||||
0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the cluster.
|
||||
*
|
||||
* @param numDataNodes number of datanodes
|
||||
* @param storagesPerDatanode number of storage locations on each datanode
|
||||
* @param failedVolumesTolerated number of acceptable volume failures
|
||||
* @throws Exception if there is any failure
|
||||
*/
|
||||
private void initCluster(int numDataNodes, int storagesPerDatanode,
|
||||
int failedVolumesTolerated) throws Exception {
|
||||
conf = new HdfsConfiguration();
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
|
||||
/*
|
||||
* Lower the DN heartbeat, DF rate, and recheck interval to one second
|
||||
* so state about failures and datanode death propagates faster.
|
||||
*/
|
||||
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
|
||||
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
|
||||
failedVolumesTolerated);
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes)
|
||||
.storagesPerDatanode(storagesPerDatanode).build();
|
||||
cluster.waitActive();
|
||||
fs = cluster.getFileSystem();
|
||||
dataDir = cluster.getDataDirectory();
|
||||
long dnCapacity = DFSTestUtil.getDatanodeCapacity(
|
||||
cluster.getNamesystem().getBlockManager().getDatanodeManager(), 0);
|
||||
volumeCapacity = dnCapacity / cluster.getStoragesPerDatanode();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconfigure a DataNode by setting a new list of volumes.
|
||||
*
|
||||
* @param dn DataNode to reconfigure
|
||||
* @param newVols new volumes to configure
|
||||
* @throws Exception if there is any failure
|
||||
*/
|
||||
private static void reconfigureDataNode(DataNode dn, File... newVols)
|
||||
throws Exception {
|
||||
StringBuilder dnNewDataDirs = new StringBuilder();
|
||||
for (File newVol: newVols) {
|
||||
if (dnNewDataDirs.length() > 0) {
|
||||
dnNewDataDirs.append(',');
|
||||
}
|
||||
dnNewDataDirs.append(newVol.getAbsolutePath());
|
||||
}
|
||||
try {
|
||||
dn.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
||||
dnNewDataDirs.toString());
|
||||
} catch (ReconfigurationException e) {
|
||||
// This can be thrown if reconfiguration tries to use a failed volume.
|
||||
// We need to swallow the exception, because some of our tests want to
|
||||
// cover this case.
|
||||
LOG.warn("Could not reconfigure DataNode.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|||
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
|
||||
|
@ -162,7 +163,7 @@ public class TestFsDatasetCache {
|
|||
doReturn(response).when(spyNN).sendHeartbeat(
|
||||
(DatanodeRegistration) any(),
|
||||
(StorageReport[]) any(), anyLong(), anyLong(),
|
||||
anyInt(), anyInt(), anyInt());
|
||||
anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any());
|
||||
}
|
||||
|
||||
private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -101,7 +102,8 @@ public class TestStorageReport {
|
|||
Mockito.verify(nnSpy).sendHeartbeat(
|
||||
any(DatanodeRegistration.class),
|
||||
captor.capture(),
|
||||
anyLong(), anyLong(), anyInt(), anyInt(), anyInt());
|
||||
anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
|
||||
Mockito.any(VolumeFailureSummary.class));
|
||||
|
||||
StorageReport[] reports = captor.getValue();
|
||||
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
|
||||
|
@ -375,6 +376,26 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getFailedStorageLocations() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastVolumeFailureDate() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getEstimatedCapacityLostTotal() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public VolumeFailureSummary getVolumeFailureSummary() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCacheUsed() {
|
||||
return 0;
|
||||
|
|
|
@ -47,6 +47,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
@ -102,6 +103,7 @@ public class TestFsDatasetImpl {
|
|||
|
||||
String dataDir = StringUtils.join(",", dirStrings);
|
||||
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
|
||||
when(storage.dirIterator()).thenReturn(dirs.iterator());
|
||||
when(storage.getNumStorageDirs()).thenReturn(numDirs);
|
||||
}
|
||||
|
||||
|
@ -240,8 +242,8 @@ public class TestFsDatasetImpl {
|
|||
RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
|
||||
new RoundRobinVolumeChoosingPolicy<>();
|
||||
final BlockScanner blockScanner = new BlockScanner(datanode, conf);
|
||||
final FsVolumeList volumeList =
|
||||
new FsVolumeList(0, blockScanner, blockChooser);
|
||||
final FsVolumeList volumeList = new FsVolumeList(
|
||||
Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
|
||||
final List<FsVolumeImpl> oldVolumes = new ArrayList<>();
|
||||
|
||||
// Initialize FsVolumeList with 5 mock volumes.
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.junit.Test;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
@ -57,7 +58,8 @@ public class TestFsVolumeList {
|
|||
|
||||
@Test
|
||||
public void testGetNextVolumeWithClosedVolume() throws IOException {
|
||||
FsVolumeList volumeList = new FsVolumeList(0, blockScanner, blockChooser);
|
||||
FsVolumeList volumeList = new FsVolumeList(
|
||||
Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
|
||||
List<FsVolumeImpl> volumes = new ArrayList<>();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
File curDir = new File(baseDir, "nextvolume-" + i);
|
||||
|
@ -82,7 +84,8 @@ public class TestFsVolumeList {
|
|||
|
||||
@Test
|
||||
public void testCheckDirsWithClosedVolume() throws IOException {
|
||||
FsVolumeList volumeList = new FsVolumeList(0, blockScanner, blockChooser);
|
||||
FsVolumeList volumeList = new FsVolumeList(
|
||||
Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
|
||||
List<FsVolumeImpl> volumes = new ArrayList<>();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
File curDir = new File(baseDir, "volume-" + i);
|
||||
|
|
|
@ -951,7 +951,7 @@ public class NNThroughputBenchmark implements Tool {
|
|||
StorageReport[] rep = { new StorageReport(storage, false,
|
||||
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
|
||||
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep,
|
||||
0L, 0L, 0, 0, 0).getCommands();
|
||||
0L, 0L, 0, 0, 0, null).getCommands();
|
||||
if(cmds != null) {
|
||||
for (DatanodeCommand cmd : cmds ) {
|
||||
if(LOG.isDebugEnabled()) {
|
||||
|
@ -998,7 +998,7 @@ public class NNThroughputBenchmark implements Tool {
|
|||
StorageReport[] rep = { new StorageReport(storage,
|
||||
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
|
||||
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
|
||||
rep, 0L, 0L, 0, 0, 0).getCommands();
|
||||
rep, 0L, 0L, 0, 0, 0, null).getCommands();
|
||||
if (cmds != null) {
|
||||
for (DatanodeCommand cmd : cmds) {
|
||||
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
|
||||
|
|
|
@ -117,7 +117,7 @@ public class NameNodeAdapter {
|
|||
DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
|
||||
return namesystem.handleHeartbeat(nodeReg,
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dd),
|
||||
dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0);
|
||||
dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null);
|
||||
}
|
||||
|
||||
public static boolean setReplication(final FSNamesystem ns,
|
||||
|
|
|
@ -117,7 +117,7 @@ public class TestDeadDatanode {
|
|||
StorageReport[] rep = { new StorageReport(
|
||||
new DatanodeStorage(reg.getDatanodeUuid()),
|
||||
false, 0, 0, 0, 0) };
|
||||
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0)
|
||||
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null)
|
||||
.getCommands();
|
||||
assertEquals(1, cmd.length);
|
||||
assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
|
||||
|
|
Loading…
Reference in New Issue