HDFS-10959. Update DataNode to use DatasetVolumeChecker. Contributed by Xiaoyu Yao.
This commit is contained in:
parent
2d6be7ea23
commit
413dccaf78
|
@ -0,0 +1,110 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link FileIoEvents} that profiles the performance of the metadata and data
|
||||||
|
* related operations on datanode volumes.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class ProfilingFileIoEvents implements FileIoEvents {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long beforeMetadataOp(@Nullable FsVolumeSpi volume,
|
||||||
|
FileIoProvider.OPERATION op) {
|
||||||
|
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||||
|
if (metrics != null) {
|
||||||
|
return Time.monotonicNow();
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterMetadataOp(@Nullable FsVolumeSpi volume,
|
||||||
|
FileIoProvider.OPERATION op, long begin) {
|
||||||
|
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||||
|
if (metrics != null) {
|
||||||
|
metrics.addMetadastaOperationLatency(Time.monotonicNow() - begin);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long beforeFileIo(@Nullable FsVolumeSpi volume,
|
||||||
|
FileIoProvider.OPERATION op, long len) {
|
||||||
|
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||||
|
if (metrics != null) {
|
||||||
|
return Time.monotonicNow();
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterFileIo(@Nullable FsVolumeSpi volume,
|
||||||
|
FileIoProvider.OPERATION op, long begin, long len) {
|
||||||
|
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||||
|
if (metrics != null) {
|
||||||
|
long latency = Time.monotonicNow() - begin;
|
||||||
|
metrics.addDataFileIoLatency(latency);
|
||||||
|
switch (op) {
|
||||||
|
case SYNC:
|
||||||
|
metrics.addSyncIoLatency(latency);
|
||||||
|
break;
|
||||||
|
case FLUSH:
|
||||||
|
metrics.addFlushIoLatency(latency);
|
||||||
|
break;
|
||||||
|
case READ:
|
||||||
|
metrics.addReadIoLatency(latency);
|
||||||
|
break;
|
||||||
|
case WRITE:
|
||||||
|
metrics.addWriteIoLatency(latency);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@Nullable FsVolumeSpi volume,
|
||||||
|
FileIoProvider.OPERATION op, Exception e, long begin) {
|
||||||
|
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||||
|
if (metrics != null) {
|
||||||
|
metrics.addFileIoError(Time.monotonicNow() - begin);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
@Override
|
||||||
|
public String getStatistics() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private DataNodeVolumeMetrics getVolumeMetrics(final FsVolumeSpi volume) {
|
||||||
|
if (volume != null) {
|
||||||
|
return volume.getMetrics();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,289 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
|
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is for maintaining Datanode Volume IO related statistics and
|
||||||
|
* publishing them through the metrics interfaces.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
@Metrics(name = "DataNodeVolume", about = "DataNode Volume metrics",
|
||||||
|
context = "dfs")
|
||||||
|
public class DataNodeVolumeMetrics {
|
||||||
|
private final MetricsRegistry registry = new MetricsRegistry("FsVolume");
|
||||||
|
|
||||||
|
@Metric("number of metadata operations")
|
||||||
|
private MutableCounterLong totalMetadataOperations;
|
||||||
|
@Metric("metadata operation rate")
|
||||||
|
private MutableRate metadataOperationRate;
|
||||||
|
private MutableQuantiles[] metadataOperationLatencyQuantiles;
|
||||||
|
|
||||||
|
@Metric("number of data file io operations")
|
||||||
|
private MutableCounterLong totalDataFileIos;
|
||||||
|
@Metric("data file io operation rate")
|
||||||
|
private MutableRate dataFileIoRate;
|
||||||
|
private MutableQuantiles[] dataFileIoLatencyQuantiles;
|
||||||
|
|
||||||
|
@Metric("file io flush rate")
|
||||||
|
private MutableRate flushIoRate;
|
||||||
|
private MutableQuantiles[] flushIoLatencyQuantiles;
|
||||||
|
|
||||||
|
@Metric("file io sync rate")
|
||||||
|
private MutableRate syncIoRate;
|
||||||
|
private MutableQuantiles[] syncIoLatencyQuantiles;
|
||||||
|
|
||||||
|
@Metric("file io read rate")
|
||||||
|
private MutableRate readIoRate;
|
||||||
|
private MutableQuantiles[] readIoLatencyQuantiles;
|
||||||
|
|
||||||
|
@Metric("file io write rate")
|
||||||
|
private MutableRate writeIoRate;
|
||||||
|
private MutableQuantiles[] writeIoLatencyQuantiles;
|
||||||
|
|
||||||
|
@Metric("number of file io errors")
|
||||||
|
private MutableCounterLong totalFileIoErrors;
|
||||||
|
@Metric("file io error rate")
|
||||||
|
private MutableRate fileIoErrorRate;
|
||||||
|
|
||||||
|
public long getTotalMetadataOperations() {
|
||||||
|
return totalMetadataOperations.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Based on metadataOperationRate
|
||||||
|
public long getMetadataOperationSampleCount() {
|
||||||
|
return metadataOperationRate.lastStat().numSamples();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getMetadataOperationMean() {
|
||||||
|
return metadataOperationRate.lastStat().mean();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getMetadataOperationStdDev() {
|
||||||
|
return metadataOperationRate.lastStat().stddev();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTotalDataFileIos() {
|
||||||
|
return totalDataFileIos.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Based on dataFileIoRate
|
||||||
|
public long getDataFileIoSampleCount() {
|
||||||
|
return dataFileIoRate.lastStat().numSamples();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getDataFileIoMean() {
|
||||||
|
return dataFileIoRate.lastStat().mean();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getDataFileIoStdDev() {
|
||||||
|
return dataFileIoRate.lastStat().stddev();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Based on flushIoRate
|
||||||
|
public long getFlushIoSampleCount() {
|
||||||
|
return flushIoRate.lastStat().numSamples();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getFlushIoMean() {
|
||||||
|
return flushIoRate.lastStat().mean();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getFlushIoStdDev() {
|
||||||
|
return flushIoRate.lastStat().stddev();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Based on syncIoRate
|
||||||
|
public long getSyncIoSampleCount() {
|
||||||
|
return syncIoRate.lastStat().numSamples();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getSyncIoMean() {
|
||||||
|
return syncIoRate.lastStat().mean();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getSyncIoStdDev() {
|
||||||
|
return syncIoRate.lastStat().stddev();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Based on readIoRate
|
||||||
|
public long getReadIoSampleCount() {
|
||||||
|
return readIoRate.lastStat().numSamples();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getReadIoMean() {
|
||||||
|
return readIoRate.lastStat().mean();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getReadIoStdDev() {
|
||||||
|
return readIoRate.lastStat().stddev();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Based on writeIoRate
|
||||||
|
public long getWriteIoSampleCount() {
|
||||||
|
return syncIoRate.lastStat().numSamples();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getWriteIoMean() {
|
||||||
|
return syncIoRate.lastStat().mean();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getWriteIoStdDev() {
|
||||||
|
return syncIoRate.lastStat().stddev();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTotalFileIoErrors() {
|
||||||
|
return totalFileIoErrors.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Based on fileIoErrorRate
|
||||||
|
public long getFileIoErrorSampleCount() {
|
||||||
|
return fileIoErrorRate.lastStat().numSamples();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getFileIoErrorMean() {
|
||||||
|
return fileIoErrorRate.lastStat().mean();
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getFileIoErrorStdDev() {
|
||||||
|
return fileIoErrorRate.lastStat().stddev();
|
||||||
|
}
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
private final MetricsSystem ms;
|
||||||
|
|
||||||
|
public DataNodeVolumeMetrics(final MetricsSystem metricsSystem,
|
||||||
|
final String volumeName, final int[] intervals) {
|
||||||
|
this.ms = metricsSystem;
|
||||||
|
this.name = volumeName;
|
||||||
|
final int len = intervals.length;
|
||||||
|
metadataOperationLatencyQuantiles = new MutableQuantiles[len];
|
||||||
|
dataFileIoLatencyQuantiles = new MutableQuantiles[len];
|
||||||
|
flushIoLatencyQuantiles = new MutableQuantiles[len];
|
||||||
|
syncIoLatencyQuantiles = new MutableQuantiles[len];
|
||||||
|
readIoLatencyQuantiles = new MutableQuantiles[len];
|
||||||
|
writeIoLatencyQuantiles = new MutableQuantiles[len];
|
||||||
|
for (int i = 0; i < len; i++) {
|
||||||
|
int interval = intervals[i];
|
||||||
|
metadataOperationLatencyQuantiles[i] = registry.newQuantiles(
|
||||||
|
"metadataOperationLatency" + interval + "s",
|
||||||
|
"Meatadata Operation Latency in ms", "ops", "latency", interval);
|
||||||
|
dataFileIoLatencyQuantiles[i] = registry.newQuantiles(
|
||||||
|
"dataFileIoLatency" + interval + "s",
|
||||||
|
"Data File Io Latency in ms", "ops", "latency", interval);
|
||||||
|
flushIoLatencyQuantiles[i] = registry.newQuantiles(
|
||||||
|
"flushIoLatency" + interval + "s",
|
||||||
|
"Data flush Io Latency in ms", "ops", "latency", interval);
|
||||||
|
syncIoLatencyQuantiles[i] = registry.newQuantiles(
|
||||||
|
"syncIoLatency" + interval + "s",
|
||||||
|
"Data sync Io Latency in ms", "ops", "latency", interval);
|
||||||
|
readIoLatencyQuantiles[i] = registry.newQuantiles(
|
||||||
|
"readIoLatency" + interval + "s",
|
||||||
|
"Data read Io Latency in ms", "ops", "latency", interval);
|
||||||
|
writeIoLatencyQuantiles[i] = registry.newQuantiles(
|
||||||
|
"writeIoLatency" + interval + "s",
|
||||||
|
"Data write Io Latency in ms", "ops", "latency", interval);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static DataNodeVolumeMetrics create(final Configuration conf,
|
||||||
|
final String volumeName) {
|
||||||
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
|
String name = "DataNodeVolume-"+ (volumeName.isEmpty()
|
||||||
|
? "UndefinedDataNodeVolume"+ ThreadLocalRandom.current().nextInt()
|
||||||
|
: volumeName.replace(':', '-'));
|
||||||
|
|
||||||
|
// Percentile measurement is off by default, by watching no intervals
|
||||||
|
int[] intervals =
|
||||||
|
conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
|
||||||
|
return ms.register(name, null, new DataNodeVolumeMetrics(ms, name,
|
||||||
|
intervals));
|
||||||
|
}
|
||||||
|
|
||||||
|
public String name() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unRegister() {
|
||||||
|
ms.unregisterSource(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addMetadastaOperationLatency(final long latency) {
|
||||||
|
totalMetadataOperations.incr();
|
||||||
|
metadataOperationRate.add(latency);
|
||||||
|
for (MutableQuantiles q : metadataOperationLatencyQuantiles) {
|
||||||
|
q.add(latency);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addDataFileIoLatency(final long latency) {
|
||||||
|
totalDataFileIos.incr();
|
||||||
|
dataFileIoRate.add(latency);
|
||||||
|
for (MutableQuantiles q : dataFileIoLatencyQuantiles) {
|
||||||
|
q.add(latency);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addSyncIoLatency(final long latency) {
|
||||||
|
syncIoRate.add(latency);
|
||||||
|
for (MutableQuantiles q : syncIoLatencyQuantiles) {
|
||||||
|
q.add(latency);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addFlushIoLatency(final long latency) {
|
||||||
|
flushIoRate.add(latency);
|
||||||
|
for (MutableQuantiles q : flushIoLatencyQuantiles) {
|
||||||
|
q.add(latency);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addReadIoLatency(final long latency) {
|
||||||
|
readIoRate.add(latency);
|
||||||
|
for (MutableQuantiles q : readIoLatencyQuantiles) {
|
||||||
|
q.add(latency);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addWriteIoLatency(final long latency) {
|
||||||
|
writeIoRate.add(latency);
|
||||||
|
for (MutableQuantiles q: writeIoLatencyQuantiles) {
|
||||||
|
q.add(latency);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addFileIoError(final long latency) {
|
||||||
|
totalFileIoErrors.incr();
|
||||||
|
metadataOperationRate.add(latency);
|
||||||
|
}
|
||||||
|
}
|
|
@ -210,4 +210,6 @@ public interface FsVolumeSpi
|
||||||
}
|
}
|
||||||
|
|
||||||
FileIoProvider getFileIoProvider();
|
FileIoProvider getFileIoProvider();
|
||||||
|
|
||||||
|
DataNodeVolumeMetrics getMetrics();
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
|
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
|
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.util.AutoCloseableLock;
|
import org.apache.hadoop.util.AutoCloseableLock;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
@ -110,6 +111,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
||||||
// query from the filesystem.
|
// query from the filesystem.
|
||||||
protected volatile long configuredCapacity;
|
protected volatile long configuredCapacity;
|
||||||
private final FileIoProvider fileIoProvider;
|
private final FileIoProvider fileIoProvider;
|
||||||
|
private final DataNodeVolumeMetrics metrics;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Per-volume worker pool that processes new blocks to cache.
|
* Per-volume worker pool that processes new blocks to cache.
|
||||||
|
@ -137,6 +139,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
||||||
this.fileIoProvider = dataset.datanode != null ?
|
this.fileIoProvider = dataset.datanode != null ?
|
||||||
dataset.datanode.getFileIoProvider() : new FileIoProvider(conf);
|
dataset.datanode.getFileIoProvider() : new FileIoProvider(conf);
|
||||||
cacheExecutor = initializeCacheExecutor(parent);
|
cacheExecutor = initializeCacheExecutor(parent);
|
||||||
|
this.metrics = DataNodeVolumeMetrics.create(conf, parent.getAbsolutePath());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
|
protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
|
||||||
|
@ -950,6 +953,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
||||||
for (Entry<String, BlockPoolSlice> entry : set) {
|
for (Entry<String, BlockPoolSlice> entry : set) {
|
||||||
entry.getValue().shutdown(null);
|
entry.getValue().shutdown(null);
|
||||||
}
|
}
|
||||||
|
if (metrics != null) {
|
||||||
|
metrics.unRegister();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void addBlockPool(String bpid, Configuration conf) throws IOException {
|
void addBlockPool(String bpid, Configuration conf) throws IOException {
|
||||||
|
@ -1105,4 +1111,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
||||||
public FileIoProvider getFileIoProvider() {
|
public FileIoProvider getFileIoProvider() {
|
||||||
return fileIoProvider;
|
return fileIoProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DataNodeVolumeMetrics getMetrics() {
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,11 @@ package org.apache.hadoop.hdfs.server.datanode;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.ReconfigurationException;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
|
@ -30,12 +34,17 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||||
|
|
||||||
|
import static org.hamcrest.core.Is.is;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility class for accessing package-private DataNode information during tests.
|
* Utility class for accessing package-private DataNode information during tests.
|
||||||
* Must not contain usage of classes that are not explicitly listed as
|
* Must not contain usage of classes that are not explicitly listed as
|
||||||
* dependencies to {@link MiniDFSCluster}.
|
* dependencies to {@link MiniDFSCluster}.
|
||||||
*/
|
*/
|
||||||
public class DataNodeTestUtils {
|
public class DataNodeTestUtils {
|
||||||
|
private static final Log LOG =
|
||||||
|
LogFactory.getLog(DataNodeTestUtils.class);
|
||||||
private static final String DIR_FAILURE_SUFFIX = ".origin";
|
private static final String DIR_FAILURE_SUFFIX = ".origin";
|
||||||
|
|
||||||
public final static String TEST_CLUSTER_ID = "testClusterID";
|
public final static String TEST_CLUSTER_ID = "testClusterID";
|
||||||
|
@ -175,4 +184,34 @@ public class DataNodeTestUtils {
|
||||||
dn.getDirectoryScanner().reconcile();
|
dn.getDirectoryScanner().reconcile();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reconfigure a DataNode by setting a new list of volumes.
|
||||||
|
*
|
||||||
|
* @param dn DataNode to reconfigure
|
||||||
|
* @param newVols new volumes to configure
|
||||||
|
* @throws Exception if there is any failure
|
||||||
|
*/
|
||||||
|
public static void reconfigureDataNode(DataNode dn, File... newVols)
|
||||||
|
throws Exception {
|
||||||
|
StringBuilder dnNewDataDirs = new StringBuilder();
|
||||||
|
for (File newVol: newVols) {
|
||||||
|
if (dnNewDataDirs.length() > 0) {
|
||||||
|
dnNewDataDirs.append(',');
|
||||||
|
}
|
||||||
|
dnNewDataDirs.append(newVol.getAbsolutePath());
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
assertThat(
|
||||||
|
dn.reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
||||||
|
dnNewDataDirs.toString()),
|
||||||
|
is(dn.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
|
} catch (ReconfigurationException e) {
|
||||||
|
// This can be thrown if reconfiguration tries to use a failed volume.
|
||||||
|
// We need to swallow the exception, because some of our tests want to
|
||||||
|
// cover this case.
|
||||||
|
LOG.warn("Could not reconfigure DataNode.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
|
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
|
@ -449,11 +450,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
static class SimulatedVolume implements FsVolumeSpi {
|
static class SimulatedVolume implements FsVolumeSpi {
|
||||||
private final SimulatedStorage storage;
|
private final SimulatedStorage storage;
|
||||||
private final FileIoProvider fileIoProvider;
|
private final FileIoProvider fileIoProvider;
|
||||||
|
private final DataNodeVolumeMetrics metrics;
|
||||||
|
|
||||||
SimulatedVolume(final SimulatedStorage storage,
|
SimulatedVolume(final SimulatedStorage storage,
|
||||||
final FileIoProvider fileIoProvider) {
|
final FileIoProvider fileIoProvider,
|
||||||
|
final DataNodeVolumeMetrics metrics) {
|
||||||
this.storage = storage;
|
this.storage = storage;
|
||||||
this.fileIoProvider = fileIoProvider;
|
this.fileIoProvider = fileIoProvider;
|
||||||
|
this.metrics = metrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -540,6 +544,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
return fileIoProvider;
|
return fileIoProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DataNodeVolumeMetrics getMetrics() {
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public VolumeCheckResult check(VolumeCheckContext context)
|
public VolumeCheckResult check(VolumeCheckContext context)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
@ -575,7 +584,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
this.storage = new SimulatedStorage(
|
this.storage = new SimulatedStorage(
|
||||||
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
|
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
|
||||||
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
|
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
|
||||||
this.volume = new SimulatedVolume(this.storage, this.fileIoProvider);
|
|
||||||
|
// TODO: per volume id or path
|
||||||
|
DataNodeVolumeMetrics volumeMetrics = DataNodeVolumeMetrics.create(conf,
|
||||||
|
datanodeUuid);
|
||||||
|
this.volume = new SimulatedVolume(this.storage, this.fileIoProvider,
|
||||||
|
volumeMetrics);
|
||||||
this.datasetLock = new AutoCloseableLock();
|
this.datasetLock = new AutoCloseableLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1393,4 +1407,3 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
return datasetLock.acquire();
|
return datasetLock.acquire();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,6 @@ import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assume.assumeTrue;
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
|
@ -34,7 +33,6 @@ import java.util.ArrayList;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.ReconfigurationException;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
@ -59,7 +57,8 @@ import org.junit.Test;
|
||||||
*/
|
*/
|
||||||
public class TestDataNodeVolumeFailureReporting {
|
public class TestDataNodeVolumeFailureReporting {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestDataNodeVolumeFailureReporting.class);
|
private static final Log LOG =
|
||||||
|
LogFactory.getLog(TestDataNodeVolumeFailureReporting.class);
|
||||||
{
|
{
|
||||||
GenericTestUtils.setLogLevel(TestDataNodeVolumeFailureReporting.LOG,
|
GenericTestUtils.setLogLevel(TestDataNodeVolumeFailureReporting.LOG,
|
||||||
Level.ALL);
|
Level.ALL);
|
||||||
|
@ -387,8 +386,8 @@ public class TestDataNodeVolumeFailureReporting {
|
||||||
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
|
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
|
||||||
|
|
||||||
// Reconfigure again to try to add back the failed volumes.
|
// Reconfigure again to try to add back the failed volumes.
|
||||||
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
DataNodeTestUtils.reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
||||||
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
DataNodeTestUtils.reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
||||||
|
|
||||||
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
||||||
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
||||||
|
@ -408,8 +407,8 @@ public class TestDataNodeVolumeFailureReporting {
|
||||||
|
|
||||||
// Reconfigure a third time with the failed volumes. Afterwards, we expect
|
// Reconfigure a third time with the failed volumes. Afterwards, we expect
|
||||||
// the same volume failures to be reported. (No double-counting.)
|
// the same volume failures to be reported. (No double-counting.)
|
||||||
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
DataNodeTestUtils.reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
||||||
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
DataNodeTestUtils.reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
||||||
|
|
||||||
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
||||||
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
||||||
|
@ -430,8 +429,8 @@ public class TestDataNodeVolumeFailureReporting {
|
||||||
// Replace failed volume with healthy volume and run reconfigure DataNode.
|
// Replace failed volume with healthy volume and run reconfigure DataNode.
|
||||||
// The failed volume information should be cleared.
|
// The failed volume information should be cleared.
|
||||||
DataNodeTestUtils.restoreDataDirFromFailure(dn1Vol1, dn2Vol1);
|
DataNodeTestUtils.restoreDataDirFromFailure(dn1Vol1, dn2Vol1);
|
||||||
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
DataNodeTestUtils.reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
||||||
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
DataNodeTestUtils.reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
||||||
|
|
||||||
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
||||||
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
||||||
|
@ -631,34 +630,4 @@ public class TestDataNodeVolumeFailureReporting {
|
||||||
cluster.getNamesystem().getBlockManager().getDatanodeManager(), 0);
|
cluster.getNamesystem().getBlockManager().getDatanodeManager(), 0);
|
||||||
volumeCapacity = dnCapacity / cluster.getStoragesPerDatanode();
|
volumeCapacity = dnCapacity / cluster.getStoragesPerDatanode();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Reconfigure a DataNode by setting a new list of volumes.
|
|
||||||
*
|
|
||||||
* @param dn DataNode to reconfigure
|
|
||||||
* @param newVols new volumes to configure
|
|
||||||
* @throws Exception if there is any failure
|
|
||||||
*/
|
|
||||||
private static void reconfigureDataNode(DataNode dn, File... newVols)
|
|
||||||
throws Exception {
|
|
||||||
StringBuilder dnNewDataDirs = new StringBuilder();
|
|
||||||
for (File newVol: newVols) {
|
|
||||||
if (dnNewDataDirs.length() > 0) {
|
|
||||||
dnNewDataDirs.append(',');
|
|
||||||
}
|
|
||||||
dnNewDataDirs.append(newVol.getAbsolutePath());
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
assertThat(
|
|
||||||
dn.reconfigurePropertyImpl(
|
|
||||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
|
||||||
dnNewDataDirs.toString()),
|
|
||||||
is(dn.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)));
|
|
||||||
} catch (ReconfigurationException e) {
|
|
||||||
// This can be thrown if reconfiguration tries to use a failed volume.
|
|
||||||
// We need to swallow the exception, because some of our tests want to
|
|
||||||
// cover this case.
|
|
||||||
LOG.warn("Could not reconfigure DataNode.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,182 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.Timeout;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test class for DataNodeVolumeMetrics.
|
||||||
|
*/
|
||||||
|
public class TestDataNodeVolumeMetrics {
|
||||||
|
private static final Log LOG =
|
||||||
|
LogFactory.getLog(TestDataNodeVolumeMetrics.class);
|
||||||
|
|
||||||
|
private static final int BLOCK_SIZE = 1024;
|
||||||
|
private static final short REPL = 1;
|
||||||
|
private static final int NUM_DATANODES = 1;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public Timeout timeout = new Timeout(300000);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVolumeMetrics() throws Exception {
|
||||||
|
MiniDFSCluster cluster = setupClusterForVolumeMetrics();
|
||||||
|
try {
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
final Path fileName = new Path("/test.dat");
|
||||||
|
final long fileLen = Integer.MAX_VALUE + 1L;
|
||||||
|
DFSTestUtil.createFile(fs, fileName, false, BLOCK_SIZE, fileLen,
|
||||||
|
fs.getDefaultBlockSize(fileName),
|
||||||
|
REPL, 1L, true);
|
||||||
|
|
||||||
|
try (FSDataOutputStream out = fs.append(fileName)) {
|
||||||
|
out.writeBytes("hello world");
|
||||||
|
((DFSOutputStream) out.getWrappedStream()).hsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
verifyDataNodeVolumeMetrics(fs, cluster, fileName);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVolumeMetricsWithVolumeDepartureArrival() throws Exception {
|
||||||
|
MiniDFSCluster cluster = setupClusterForVolumeMetrics();
|
||||||
|
try {
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
final Path fileName = new Path("/test.dat");
|
||||||
|
final long fileLen = Integer.MAX_VALUE + 1L;
|
||||||
|
DFSTestUtil.createFile(fs, fileName, false, BLOCK_SIZE, fileLen,
|
||||||
|
fs.getDefaultBlockSize(fileName),
|
||||||
|
REPL, 1L, true);
|
||||||
|
|
||||||
|
try (FSDataOutputStream out = fs.append(fileName)) {
|
||||||
|
out.writeBytes("hello world");
|
||||||
|
((DFSOutputStream) out.getWrappedStream()).hsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
ArrayList<DataNode> dns = cluster.getDataNodes();
|
||||||
|
assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
|
||||||
|
|
||||||
|
final String dataDir = cluster.getDataDirectory();
|
||||||
|
final File dn1Vol2 = new File(dataDir, "data2");
|
||||||
|
|
||||||
|
DataNodeTestUtils.injectDataDirFailure(dn1Vol2);
|
||||||
|
verifyDataNodeVolumeMetrics(fs, cluster, fileName);
|
||||||
|
|
||||||
|
DataNodeTestUtils.restoreDataDirFromFailure(dn1Vol2);
|
||||||
|
DataNodeTestUtils.reconfigureDataNode(dns.get(0), dn1Vol2);
|
||||||
|
verifyDataNodeVolumeMetrics(fs, cluster, fileName);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private MiniDFSCluster setupClusterForVolumeMetrics() throws IOException {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.set(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
|
||||||
|
"org.apache.hadoop.hdfs.server.datanode.ProfilingFileIoEvents");
|
||||||
|
SimulatedFSDataset.setFactory(conf);
|
||||||
|
return new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(NUM_DATANODES)
|
||||||
|
.storageTypes(new StorageType[]{StorageType.RAM_DISK, StorageType.DISK})
|
||||||
|
.storagesPerDatanode(2)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyDataNodeVolumeMetrics(final FileSystem fs,
|
||||||
|
final MiniDFSCluster cluster, final Path fileName) throws IOException {
|
||||||
|
List<DataNode> datanodes = cluster.getDataNodes();
|
||||||
|
DataNode datanode = datanodes.get(0);
|
||||||
|
|
||||||
|
final ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
|
||||||
|
final FsVolumeSpi volume = datanode.getFSDataset().getVolume(block);
|
||||||
|
DataNodeVolumeMetrics metrics = volume.getMetrics();
|
||||||
|
|
||||||
|
MetricsRecordBuilder rb = getMetrics(volume.getMetrics().name());
|
||||||
|
assertCounter("TotalDataFileIos", metrics.getTotalDataFileIos(), rb);
|
||||||
|
|
||||||
|
LOG.info("TotalMetadataOperations : " +
|
||||||
|
metrics.getTotalMetadataOperations());
|
||||||
|
LOG.info("TotalDataFileIos : " + metrics.getTotalDataFileIos());
|
||||||
|
LOG.info("TotalFileIoErrors : " + metrics.getTotalFileIoErrors());
|
||||||
|
|
||||||
|
LOG.info("MetadataOperationSampleCount : " +
|
||||||
|
metrics.getMetadataOperationSampleCount());
|
||||||
|
LOG.info("MetadataOperationMean : " + metrics.getMetadataOperationMean());
|
||||||
|
LOG.info("MetadataFileIoStdDev : " +
|
||||||
|
metrics.getMetadataOperationStdDev());
|
||||||
|
|
||||||
|
LOG.info("DataFileIoSampleCount : " + metrics.getDataFileIoSampleCount());
|
||||||
|
LOG.info("DataFileIoMean : " + metrics.getDataFileIoMean());
|
||||||
|
LOG.info("DataFileIoStdDev : " + metrics.getDataFileIoStdDev());
|
||||||
|
|
||||||
|
LOG.info("flushIoSampleCount : " + metrics.getFlushIoSampleCount());
|
||||||
|
LOG.info("flushIoMean : " + metrics.getFlushIoMean());
|
||||||
|
LOG.info("flushIoStdDev : " + metrics.getFlushIoStdDev());
|
||||||
|
|
||||||
|
LOG.info("syncIoSampleCount : " + metrics.getSyncIoSampleCount());
|
||||||
|
LOG.info("syncIoMean : " + metrics.getSyncIoMean());
|
||||||
|
LOG.info("syncIoStdDev : " + metrics.getSyncIoStdDev());
|
||||||
|
|
||||||
|
LOG.info("readIoSampleCount : " + metrics.getReadIoMean());
|
||||||
|
LOG.info("readIoMean : " + metrics.getReadIoMean());
|
||||||
|
LOG.info("readIoStdDev : " + metrics.getReadIoStdDev());
|
||||||
|
|
||||||
|
LOG.info("writeIoSampleCount : " + metrics.getWriteIoSampleCount());
|
||||||
|
LOG.info("writeIoMean : " + metrics.getWriteIoMean());
|
||||||
|
LOG.info("writeIoStdDev : " + metrics.getWriteIoStdDev());
|
||||||
|
|
||||||
|
LOG.info("fileIoErrorSampleCount : "
|
||||||
|
+ metrics.getFileIoErrorSampleCount());
|
||||||
|
LOG.info("fileIoErrorMean : " + metrics.getFileIoErrorMean());
|
||||||
|
LOG.info("fileIoErrorStdDev : " + metrics.getFileIoErrorStdDev());
|
||||||
|
}
|
||||||
|
}
|
|
@ -53,6 +53,9 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
|
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
|
||||||
|
import org.apache.hadoop.util.AutoCloseableLock;
|
||||||
|
import org.apache.hadoop.util.AutoCloseableLock;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
@ -891,6 +894,11 @@ public class TestDirectoryScanner {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DataNodeVolumeMetrics getMetrics() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public VolumeCheckResult check(VolumeCheckContext context)
|
public VolumeCheckResult check(VolumeCheckContext context)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.nio.channels.ClosedChannelException;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
|
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
|
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
|
@ -115,6 +116,11 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DataNodeVolumeMetrics getMetrics() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public VolumeCheckResult check(VolumeCheckContext context)
|
public VolumeCheckResult check(VolumeCheckContext context)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue