HDFS-11299. Support multiple Datanode File IO hooks. Contributed by Hanisha Koneru.
This commit is contained in:
parent
c18590fce2
commit
4046794a53
|
@ -326,7 +326,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
|
|||
FsVolume
|
||||
--------
|
||||
|
||||
Per-volume metrics contain Datanode Volume IO related statistics. Per-volume metrics are off by default. They can be enbabled by setting `dfs.datanode.fileio.events.class` to **org.apache.hadoop.hdfs.server.datanode.ProfilingFileIoEvents**, but enabling per-volume metrics may have a performance impact. Each metrics record contains tags such as Hostname as additional information along with metrics.
|
||||
Per-volume metrics contain Datanode Volume IO related statistics. Per-volume metrics are off by default. They can be enbabled by setting `dfs.datanode.enable.fileio.profiling` to **true**, but enabling per-volume metrics may have a performance impact. Each metrics record contains tags such as Hostname as additional information along with metrics.
|
||||
|
||||
| Name | Description |
|
||||
|:---- |:---- |
|
||||
|
|
|
@ -698,8 +698,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
|
||||
public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
|
||||
public static final String DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy";
|
||||
public static final String DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY =
|
||||
"dfs.datanode.fileio.events.class";
|
||||
public static final String DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY =
|
||||
"dfs.datanode.enable.fileio.profiling";
|
||||
public static final boolean DFS_DATANODE_ENABLE_FILEIO_PROFILING_DEFAULT =
|
||||
false;
|
||||
public static final String DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY =
|
||||
"dfs.datanode.enable.fileio.fault.injection";
|
||||
public static final boolean
|
||||
DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_DEFAULT = false;
|
||||
public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold";
|
||||
public static final long DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT = 1024L * 1024L * 1024L * 10L; // 10 GB
|
||||
public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction";
|
||||
|
|
|
@ -1,106 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* {@link FileIoEvents} that simply counts the number of operations.
|
||||
* Not meant to be used outside of testing.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class CountingFileIoEvents extends FileIoEvents {
|
||||
private final Map<OPERATION, Counts> counts;
|
||||
|
||||
private static class Counts {
|
||||
private final AtomicLong successes = new AtomicLong(0);
|
||||
private final AtomicLong failures = new AtomicLong(0);
|
||||
|
||||
@JsonProperty("Successes")
|
||||
public long getSuccesses() {
|
||||
return successes.get();
|
||||
}
|
||||
|
||||
@JsonProperty("Failures")
|
||||
public long getFailures() {
|
||||
return failures.get();
|
||||
}
|
||||
}
|
||||
|
||||
public CountingFileIoEvents() {
|
||||
counts = new HashMap<>();
|
||||
for (OPERATION op : OPERATION.values()) {
|
||||
counts.put(op, new Counts());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long beforeMetadataOp(
|
||||
@Nullable FsVolumeSpi volume, OPERATION op) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterMetadataOp(
|
||||
@Nullable FsVolumeSpi volume, OPERATION op, long begin) {
|
||||
counts.get(op).successes.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long beforeFileIo(
|
||||
@Nullable FsVolumeSpi volume, OPERATION op, long len) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterFileIo(
|
||||
@Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
|
||||
counts.get(op).successes.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(
|
||||
@Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
|
||||
counts.get(op).failures.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getStatistics() {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
try {
|
||||
return objectMapper.writeValueAsString(counts);
|
||||
} catch (JsonProcessingException e) {
|
||||
// Failed to serialize. Don't log the exception call stack.
|
||||
FileIoProvider.LOG.error("Failed to serialize statistics" + e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -3023,11 +3023,6 @@ public class DataNode extends ReconfigurableBase
|
|||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
@Override // DataNodeMXBean
|
||||
public String getFileIoProviderStatistics() {
|
||||
return fileIoProvider.getStatistics();
|
||||
}
|
||||
|
||||
public void refreshNamenodes(Configuration conf) throws IOException {
|
||||
blockPoolManager.refreshNamenodes(conf);
|
||||
|
|
|
@ -121,11 +121,6 @@ public interface DataNodeMXBean {
|
|||
*/
|
||||
String getDiskBalancerStatus();
|
||||
|
||||
/**
|
||||
* Gets the {@link FileIoProvider} statistics.
|
||||
*/
|
||||
String getFileIoProviderStatistics();
|
||||
|
||||
/**
|
||||
* Gets the average info (e.g. time) of SendPacketDownstream when the DataNode
|
||||
* acts as the penultimate (2nd to the last) node in pipeline.
|
||||
|
|
|
@ -20,48 +20,36 @@ package org.apache.hadoop.hdfs.server.datanode;
|
|||
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* The default implementation of {@link FileIoEvents} that do nothing.
|
||||
* Injects faults in the metadata and data related operations on datanode
|
||||
* volumes.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public final class DefaultFileIoEvents extends FileIoEvents {
|
||||
@Override
|
||||
public long beforeMetadataOp(
|
||||
@Nullable FsVolumeSpi volume, OPERATION op) {
|
||||
return 0;
|
||||
public class FaultInjectorFileIoEvents {
|
||||
|
||||
private final boolean isEnabled;
|
||||
|
||||
public FaultInjectorFileIoEvents(@Nullable Configuration conf) {
|
||||
if (conf != null) {
|
||||
isEnabled = conf.getBoolean(DFSConfigKeys
|
||||
.DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY, DFSConfigKeys
|
||||
.DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_DEFAULT);
|
||||
} else {
|
||||
isEnabled = false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterMetadataOp(
|
||||
@Nullable FsVolumeSpi volume, OPERATION op, long begin) {
|
||||
public void beforeMetadataOp(
|
||||
@Nullable FsVolumeSpi volume, FileIoProvider.OPERATION op) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public long beforeFileIo(
|
||||
@Nullable FsVolumeSpi volume, OPERATION op, long len) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterFileIo(
|
||||
@Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(
|
||||
@Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable String getStatistics() {
|
||||
// null is valid JSON.
|
||||
return null;
|
||||
public void beforeFileIo(
|
||||
@Nullable FsVolumeSpi volume, FileIoProvider.OPERATION op, long len) {
|
||||
}
|
||||
}
|
|
@ -1,115 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* The following hooks can be implemented for instrumentation/fault
|
||||
* injection.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public abstract class FileIoEvents {
|
||||
|
||||
/**
|
||||
* Invoked before a filesystem metadata operation.
|
||||
*
|
||||
* @param volume target volume for the operation. Null if unavailable.
|
||||
* @param op type of operation.
|
||||
* @return timestamp at which the operation was started. 0 if
|
||||
* unavailable.
|
||||
*/
|
||||
abstract long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
|
||||
|
||||
/**
|
||||
* Invoked after a filesystem metadata operation has completed.
|
||||
*
|
||||
* @param volume target volume for the operation. Null if unavailable.
|
||||
* @param op type of operation.
|
||||
* @param begin timestamp at which the operation was started. 0
|
||||
* if unavailable.
|
||||
*/
|
||||
abstract void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op,
|
||||
long begin);
|
||||
|
||||
/**
|
||||
* Invoked before a read/write/flush/channel transfer operation.
|
||||
*
|
||||
* @param volume target volume for the operation. Null if unavailable.
|
||||
* @param op type of operation.
|
||||
* @param len length of the file IO. 0 for flush.
|
||||
* @return timestamp at which the operation was started. 0 if
|
||||
* unavailable.
|
||||
*/
|
||||
abstract long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
|
||||
long len);
|
||||
|
||||
|
||||
/**
|
||||
* Invoked after a read/write/flush/channel transfer operation
|
||||
* has completed.
|
||||
*
|
||||
* @param volume target volume for the operation. Null if unavailable.
|
||||
* @param op type of operation.
|
||||
* @param len of the file IO. 0 for flush.
|
||||
* @return timestamp at which the operation was started. 0 if
|
||||
* unavailable.
|
||||
*/
|
||||
abstract void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
|
||||
long begin, long len);
|
||||
|
||||
/**
|
||||
* Invoked if an operation fails with an exception.
|
||||
* @param volume target volume for the operation. Null if unavailable.
|
||||
* @param op type of operation.
|
||||
* @param e Exception encountered during the operation.
|
||||
* @param begin time at which the operation was started.
|
||||
*/
|
||||
abstract void onFailure(
|
||||
@Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin);
|
||||
|
||||
/**
|
||||
* Invoked by FileIoProvider if an operation fails with an exception.
|
||||
* @param datanode datanode that runs volume check upon volume io failure
|
||||
* @param volume target volume for the operation. Null if unavailable.
|
||||
* @param op type of operation.
|
||||
* @param e Exception encountered during the operation.
|
||||
* @param begin time at which the operation was started.
|
||||
*/
|
||||
void onFailure(DataNode datanode,
|
||||
@Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
|
||||
onFailure(volume, op, e, begin);
|
||||
if (datanode != null && volume != null) {
|
||||
datanode.checkDiskErrorAsync(volume);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return statistics as a JSON string.
|
||||
* @return
|
||||
*/
|
||||
@Nullable abstract String getStatistics();
|
||||
}
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
@ -34,7 +33,6 @@ import org.apache.hadoop.io.LongWritable;
|
|||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.io.nativeio.NativeIOException;
|
||||
import org.apache.hadoop.net.SocketOutputStream;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -59,11 +57,14 @@ import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.*;
|
|||
|
||||
/**
|
||||
* This class abstracts out various file IO operations performed by the
|
||||
* DataNode and invokes event hooks before and after each file IO.
|
||||
* DataNode and invokes profiling (for collecting stats) and fault injection
|
||||
* (for testing) event hooks before and after each file IO.
|
||||
*
|
||||
* Behavior can be injected into these events by implementing
|
||||
* {@link FileIoEvents} and replacing the default implementation
|
||||
* with {@link DFSConfigKeys#DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY}.
|
||||
* Behavior can be injected into these events by enabling the
|
||||
* profiling and/or fault injection event hooks through
|
||||
* {@link DFSConfigKeys#DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY} and
|
||||
* {@link DFSConfigKeys#DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY}.
|
||||
* These event hooks are disabled by default.
|
||||
*
|
||||
* Most functions accept an optional {@link FsVolumeSpi} parameter for
|
||||
* instrumentation/logging.
|
||||
|
@ -78,9 +79,12 @@ public class FileIoProvider {
|
|||
public static final Logger LOG = LoggerFactory.getLogger(
|
||||
FileIoProvider.class);
|
||||
|
||||
private final FileIoEvents eventHooks;
|
||||
private final ProfilingFileIoEvents profilingEventHook;
|
||||
private final FaultInjectorFileIoEvents faultInjectorEventHook;
|
||||
private final DataNode datanode;
|
||||
|
||||
private static final int LEN_INT = 4;
|
||||
|
||||
/**
|
||||
* @param conf Configuration object. May be null. When null,
|
||||
* the event handlers are no-ops.
|
||||
|
@ -89,15 +93,8 @@ public class FileIoProvider {
|
|||
*/
|
||||
public FileIoProvider(@Nullable Configuration conf,
|
||||
final DataNode datanode) {
|
||||
if (conf != null) {
|
||||
final Class<? extends FileIoEvents> clazz = conf.getClass(
|
||||
DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
|
||||
DefaultFileIoEvents.class,
|
||||
FileIoEvents.class);
|
||||
eventHooks = ReflectionUtils.newInstance(clazz, conf);
|
||||
} else {
|
||||
eventHooks = new DefaultFileIoEvents();
|
||||
}
|
||||
profilingEventHook = new ProfilingFileIoEvents(conf);
|
||||
faultInjectorEventHook = new FaultInjectorFileIoEvents(conf);
|
||||
this.datanode = datanode;
|
||||
}
|
||||
|
||||
|
@ -122,15 +119,6 @@ public class FileIoProvider {
|
|||
NATIVE_COPY
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve statistics from the underlying {@link FileIoEvents}
|
||||
* implementation as a JSON string, if it maintains them.
|
||||
* @return statistics as a JSON string. May be null.
|
||||
*/
|
||||
public @Nullable String getStatistics() {
|
||||
return eventHooks.getStatistics();
|
||||
}
|
||||
|
||||
/**
|
||||
* See {@link Flushable#flush()}.
|
||||
*
|
||||
|
@ -139,12 +127,13 @@ public class FileIoProvider {
|
|||
*/
|
||||
public void flush(
|
||||
@Nullable FsVolumeSpi volume, Flushable f) throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, FLUSH, 0);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, FLUSH, 0);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, FLUSH, 0);
|
||||
f.flush();
|
||||
eventHooks.afterFileIo(volume, FLUSH, begin, 0);
|
||||
profilingEventHook.afterFileIo(volume, FLUSH, begin, 0);
|
||||
} catch (Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, FLUSH, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -157,12 +146,13 @@ public class FileIoProvider {
|
|||
*/
|
||||
public void sync(
|
||||
@Nullable FsVolumeSpi volume, FileOutputStream fos) throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, SYNC, 0);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, SYNC, 0);
|
||||
fos.getChannel().force(true);
|
||||
eventHooks.afterFileIo(volume, SYNC, begin, 0);
|
||||
profilingEventHook.afterFileIo(volume, SYNC, begin, 0);
|
||||
} catch (Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, SYNC, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -176,12 +166,13 @@ public class FileIoProvider {
|
|||
public void syncFileRange(
|
||||
@Nullable FsVolumeSpi volume, FileDescriptor outFd,
|
||||
long offset, long numBytes, int flags) throws NativeIOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, SYNC, 0);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, SYNC, 0);
|
||||
NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags);
|
||||
eventHooks.afterFileIo(volume, SYNC, begin, 0);
|
||||
profilingEventHook.afterFileIo(volume, SYNC, begin, 0);
|
||||
} catch (Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, SYNC, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -195,13 +186,14 @@ public class FileIoProvider {
|
|||
public void posixFadvise(
|
||||
@Nullable FsVolumeSpi volume, String identifier, FileDescriptor outFd,
|
||||
long offset, long length, int flags) throws NativeIOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, FADVISE);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, FADVISE);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, FADVISE);
|
||||
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
|
||||
identifier, outFd, offset, length, flags);
|
||||
eventHooks.afterMetadataOp(volume, FADVISE, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, FADVISE, begin);
|
||||
} catch (Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, FADVISE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -213,13 +205,14 @@ public class FileIoProvider {
|
|||
* @return true if the file was successfully deleted.
|
||||
*/
|
||||
public boolean delete(@Nullable FsVolumeSpi volume, File f) {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, DELETE);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, DELETE);
|
||||
boolean deleted = f.delete();
|
||||
eventHooks.afterMetadataOp(volume, DELETE, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, DELETE, begin);
|
||||
return deleted;
|
||||
} catch (Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, DELETE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -232,16 +225,17 @@ public class FileIoProvider {
|
|||
* existed.
|
||||
*/
|
||||
public boolean deleteWithExistsCheck(@Nullable FsVolumeSpi volume, File f) {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, DELETE);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, DELETE);
|
||||
boolean deleted = !f.exists() || f.delete();
|
||||
eventHooks.afterMetadataOp(volume, DELETE, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, DELETE, begin);
|
||||
if (!deleted) {
|
||||
LOG.warn("Failed to delete file {}", f);
|
||||
}
|
||||
return deleted;
|
||||
} catch (Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, DELETE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -263,13 +257,14 @@ public class FileIoProvider {
|
|||
@Nullable FsVolumeSpi volume, SocketOutputStream sockOut,
|
||||
FileChannel fileCh, long position, int count,
|
||||
LongWritable waitTime, LongWritable transferTime) throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, TRANSFER, count);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, TRANSFER, count);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, TRANSFER, count);
|
||||
sockOut.transferToFully(fileCh, position, count,
|
||||
waitTime, transferTime);
|
||||
eventHooks.afterFileIo(volume, TRANSFER, begin, count);
|
||||
profilingEventHook.afterFileIo(volume, TRANSFER, begin, count);
|
||||
} catch (Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, TRANSFER, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -284,13 +279,14 @@ public class FileIoProvider {
|
|||
*/
|
||||
public boolean createFile(
|
||||
@Nullable FsVolumeSpi volume, File f) throws IOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
|
||||
boolean created = f.createNewFile();
|
||||
eventHooks.afterMetadataOp(volume, OPEN, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, OPEN, begin);
|
||||
return created;
|
||||
} catch (Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, OPEN, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -309,15 +305,16 @@ public class FileIoProvider {
|
|||
*/
|
||||
public FileInputStream getFileInputStream(
|
||||
@Nullable FsVolumeSpi volume, File f) throws FileNotFoundException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
|
||||
FileInputStream fis = null;
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
|
||||
fis = new WrappedFileInputStream(volume, f);
|
||||
eventHooks.afterMetadataOp(volume, OPEN, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, OPEN, begin);
|
||||
return fis;
|
||||
} catch(Exception e) {
|
||||
org.apache.commons.io.IOUtils.closeQuietly(fis);
|
||||
eventHooks.onFailure(datanode, volume, OPEN, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -339,15 +336,16 @@ public class FileIoProvider {
|
|||
public FileOutputStream getFileOutputStream(
|
||||
@Nullable FsVolumeSpi volume, File f,
|
||||
boolean append) throws FileNotFoundException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
|
||||
FileOutputStream fos = null;
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
|
||||
fos = new WrappedFileOutputStream(volume, f, append);
|
||||
eventHooks.afterMetadataOp(volume, OPEN, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, OPEN, begin);
|
||||
return fos;
|
||||
} catch(Exception e) {
|
||||
org.apache.commons.io.IOUtils.closeQuietly(fos);
|
||||
eventHooks.onFailure(datanode, volume, OPEN, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -403,16 +401,17 @@ public class FileIoProvider {
|
|||
public FileInputStream getShareDeleteFileInputStream(
|
||||
@Nullable FsVolumeSpi volume, File f,
|
||||
long offset) throws IOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
|
||||
FileInputStream fis = null;
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
|
||||
fis = new WrappedFileInputStream(volume,
|
||||
NativeIO.getShareDeleteFileDescriptor(f, offset));
|
||||
eventHooks.afterMetadataOp(volume, OPEN, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, OPEN, begin);
|
||||
return fis;
|
||||
} catch(Exception e) {
|
||||
org.apache.commons.io.IOUtils.closeQuietly(fis);
|
||||
eventHooks.onFailure(datanode, volume, OPEN, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -434,16 +433,17 @@ public class FileIoProvider {
|
|||
*/
|
||||
public FileInputStream openAndSeek(
|
||||
@Nullable FsVolumeSpi volume, File f, long offset) throws IOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
|
||||
FileInputStream fis = null;
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
|
||||
fis = new WrappedFileInputStream(volume,
|
||||
FsDatasetUtil.openAndSeek(f, offset));
|
||||
eventHooks.afterMetadataOp(volume, OPEN, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, OPEN, begin);
|
||||
return fis;
|
||||
} catch(Exception e) {
|
||||
org.apache.commons.io.IOUtils.closeQuietly(fis);
|
||||
eventHooks.onFailure(datanode, volume, OPEN, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -465,15 +465,16 @@ public class FileIoProvider {
|
|||
public RandomAccessFile getRandomAccessFile(
|
||||
@Nullable FsVolumeSpi volume, File f,
|
||||
String mode) throws FileNotFoundException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
|
||||
RandomAccessFile raf = null;
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
|
||||
raf = new WrappedRandomAccessFile(volume, f, mode);
|
||||
eventHooks.afterMetadataOp(volume, OPEN, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, OPEN, begin);
|
||||
return raf;
|
||||
} catch(Exception e) {
|
||||
org.apache.commons.io.IOUtils.closeQuietly(raf);
|
||||
eventHooks.onFailure(datanode, volume, OPEN, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -486,13 +487,14 @@ public class FileIoProvider {
|
|||
* @return true on success false on failure.
|
||||
*/
|
||||
public boolean fullyDelete(@Nullable FsVolumeSpi volume, File dir) {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, DELETE);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, DELETE);
|
||||
boolean deleted = FileUtil.fullyDelete(dir);
|
||||
eventHooks.afterMetadataOp(volume, DELETE, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, DELETE, begin);
|
||||
return deleted;
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, DELETE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -508,12 +510,13 @@ public class FileIoProvider {
|
|||
*/
|
||||
public void replaceFile(
|
||||
@Nullable FsVolumeSpi volume, File src, File target) throws IOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, MOVE);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, MOVE);
|
||||
FileUtil.replaceFile(src, target);
|
||||
eventHooks.afterMetadataOp(volume, MOVE, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, MOVE, begin);
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, MOVE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -530,12 +533,13 @@ public class FileIoProvider {
|
|||
public void rename(
|
||||
@Nullable FsVolumeSpi volume, File src, File target)
|
||||
throws IOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, MOVE);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, MOVE);
|
||||
Storage.rename(src, target);
|
||||
eventHooks.afterMetadataOp(volume, MOVE, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, MOVE, begin);
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, MOVE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -552,12 +556,13 @@ public class FileIoProvider {
|
|||
public void moveFile(
|
||||
@Nullable FsVolumeSpi volume, File src, File target)
|
||||
throws IOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, MOVE);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, MOVE);
|
||||
FileUtils.moveFile(src, target);
|
||||
eventHooks.afterMetadataOp(volume, MOVE, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, MOVE, begin);
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, MOVE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -576,12 +581,13 @@ public class FileIoProvider {
|
|||
public void move(
|
||||
@Nullable FsVolumeSpi volume, Path src, Path target,
|
||||
CopyOption... options) throws IOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, MOVE);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, MOVE);
|
||||
Files.move(src, target, options);
|
||||
eventHooks.afterMetadataOp(volume, MOVE, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, MOVE, begin);
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, MOVE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -600,12 +606,14 @@ public class FileIoProvider {
|
|||
@Nullable FsVolumeSpi volume, File src, File target,
|
||||
boolean preserveFileDate) throws IOException {
|
||||
final long length = src.length();
|
||||
final long begin = eventHooks.beforeFileIo(volume, NATIVE_COPY, length);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, NATIVE_COPY,
|
||||
length);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, NATIVE_COPY, length);
|
||||
Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate);
|
||||
eventHooks.afterFileIo(volume, NATIVE_COPY, begin, length);
|
||||
profilingEventHook.afterFileIo(volume, NATIVE_COPY, begin, length);
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, NATIVE_COPY, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -622,15 +630,16 @@ public class FileIoProvider {
|
|||
*/
|
||||
public boolean mkdirs(
|
||||
@Nullable FsVolumeSpi volume, File dir) throws IOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, MKDIRS);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, MKDIRS);
|
||||
boolean created = false;
|
||||
boolean isDirectory;
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, MKDIRS);
|
||||
created = dir.mkdirs();
|
||||
isDirectory = !created && dir.isDirectory();
|
||||
eventHooks.afterMetadataOp(volume, MKDIRS, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, MKDIRS, begin);
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, MKDIRS, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
|
||||
|
@ -650,13 +659,14 @@ public class FileIoProvider {
|
|||
*/
|
||||
public void mkdirsWithExistsCheck(
|
||||
@Nullable FsVolumeSpi volume, File dir) throws IOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, MKDIRS);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, MKDIRS);
|
||||
boolean succeeded = false;
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, MKDIRS);
|
||||
succeeded = dir.isDirectory() || dir.mkdirs();
|
||||
eventHooks.afterMetadataOp(volume, MKDIRS, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, MKDIRS, begin);
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, MKDIRS, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
|
||||
|
@ -676,13 +686,14 @@ public class FileIoProvider {
|
|||
*/
|
||||
public File[] listFiles(
|
||||
@Nullable FsVolumeSpi volume, File dir) throws IOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, LIST);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, LIST);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, LIST);
|
||||
File[] children = FileUtil.listFiles(dir);
|
||||
eventHooks.afterMetadataOp(volume, LIST, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, LIST, begin);
|
||||
return children;
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, LIST, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -698,13 +709,14 @@ public class FileIoProvider {
|
|||
*/
|
||||
public String[] list(
|
||||
@Nullable FsVolumeSpi volume, File dir) throws IOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, LIST);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, LIST);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, LIST);
|
||||
String[] children = FileUtil.list(dir);
|
||||
eventHooks.afterMetadataOp(volume, LIST, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, LIST, begin);
|
||||
return children;
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, LIST, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -721,13 +733,14 @@ public class FileIoProvider {
|
|||
public List<String> listDirectory(
|
||||
@Nullable FsVolumeSpi volume, File dir,
|
||||
FilenameFilter filter) throws IOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, LIST);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, LIST);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, LIST);
|
||||
List<String> children = IOUtils.listDirectory(dir, filter);
|
||||
eventHooks.afterMetadataOp(volume, LIST, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, LIST, begin);
|
||||
return children;
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, LIST, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -743,13 +756,14 @@ public class FileIoProvider {
|
|||
*/
|
||||
public int getHardLinkCount(
|
||||
@Nullable FsVolumeSpi volume, File f) throws IOException {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, LIST);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, LIST);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, LIST);
|
||||
int count = HardLink.getLinkCount(f);
|
||||
eventHooks.afterMetadataOp(volume, LIST, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, LIST, begin);
|
||||
return count;
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, LIST, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -762,13 +776,14 @@ public class FileIoProvider {
|
|||
* @return true if the file exists.
|
||||
*/
|
||||
public boolean exists(@Nullable FsVolumeSpi volume, File f) {
|
||||
final long begin = eventHooks.beforeMetadataOp(volume, EXISTS);
|
||||
final long begin = profilingEventHook.beforeMetadataOp(volume, EXISTS);
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, EXISTS);
|
||||
boolean exists = f.exists();
|
||||
eventHooks.afterMetadataOp(volume, EXISTS, begin);
|
||||
profilingEventHook.afterMetadataOp(volume, EXISTS, begin);
|
||||
return exists;
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, EXISTS, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -803,13 +818,14 @@ public class FileIoProvider {
|
|||
*/
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, READ, 1);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, READ, LEN_INT);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, READ, LEN_INT);
|
||||
int b = super.read();
|
||||
eventHooks.afterFileIo(volume, READ, begin, 1);
|
||||
profilingEventHook.afterFileIo(volume, READ, begin, LEN_INT);
|
||||
return b;
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, READ, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -819,13 +835,15 @@ public class FileIoProvider {
|
|||
*/
|
||||
@Override
|
||||
public int read(@Nonnull byte[] b) throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, READ, b.length);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, READ, b
|
||||
.length);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, READ, b.length);
|
||||
int numBytesRead = super.read(b);
|
||||
eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
|
||||
profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead);
|
||||
return numBytesRead;
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, READ, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -835,13 +853,14 @@ public class FileIoProvider {
|
|||
*/
|
||||
@Override
|
||||
public int read(@Nonnull byte[] b, int off, int len) throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, READ, len);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, READ, len);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, READ, len);
|
||||
int numBytesRead = super.read(b, off, len);
|
||||
eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
|
||||
profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead);
|
||||
return numBytesRead;
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, READ, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -878,12 +897,14 @@ public class FileIoProvider {
|
|||
*/
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, WRITE, 1);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, WRITE,
|
||||
LEN_INT);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, WRITE, LEN_INT);
|
||||
super.write(b);
|
||||
eventHooks.afterFileIo(volume, WRITE, begin, 1);
|
||||
profilingEventHook.afterFileIo(volume, WRITE, begin, LEN_INT);
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, WRITE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -893,12 +914,14 @@ public class FileIoProvider {
|
|||
*/
|
||||
@Override
|
||||
public void write(@Nonnull byte[] b) throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, WRITE, b.length);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, WRITE, b
|
||||
.length);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, WRITE, b.length);
|
||||
super.write(b);
|
||||
eventHooks.afterFileIo(volume, WRITE, begin, b.length);
|
||||
profilingEventHook.afterFileIo(volume, WRITE, begin, b.length);
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, WRITE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -908,12 +931,13 @@ public class FileIoProvider {
|
|||
*/
|
||||
@Override
|
||||
public void write(@Nonnull byte[] b, int off, int len) throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, WRITE, len);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, WRITE, len);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, WRITE, len);
|
||||
super.write(b, off, len);
|
||||
eventHooks.afterFileIo(volume, WRITE, begin, len);
|
||||
profilingEventHook.afterFileIo(volume, WRITE, begin, len);
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, WRITE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -935,77 +959,93 @@ public class FileIoProvider {
|
|||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, READ, 1);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, READ, LEN_INT);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, READ, LEN_INT);
|
||||
int b = super.read();
|
||||
eventHooks.afterFileIo(volume, READ, begin, 1);
|
||||
profilingEventHook.afterFileIo(volume, READ, begin, LEN_INT);
|
||||
return b;
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, READ, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, READ, len);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, READ, len);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, READ, len);
|
||||
int numBytesRead = super.read(b, off, len);
|
||||
eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
|
||||
profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead);
|
||||
return numBytesRead;
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, READ, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b) throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, READ, b.length);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, READ, b
|
||||
.length);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, READ, b.length);
|
||||
int numBytesRead = super.read(b);
|
||||
eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
|
||||
profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead);
|
||||
return numBytesRead;
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, READ, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, WRITE, 1);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, WRITE,
|
||||
LEN_INT);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, WRITE, LEN_INT);
|
||||
super.write(b);
|
||||
eventHooks.afterFileIo(volume, WRITE, begin, 1);
|
||||
profilingEventHook.afterFileIo(volume, WRITE, begin, LEN_INT);
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, WRITE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(@Nonnull byte[] b) throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, WRITE, b.length);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, WRITE, b
|
||||
.length);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, WRITE, b.length);
|
||||
super.write(b);
|
||||
eventHooks.afterFileIo(volume, WRITE, begin, b.length);
|
||||
profilingEventHook.afterFileIo(volume, WRITE, begin, b.length);
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, WRITE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b, int off, int len) throws IOException {
|
||||
final long begin = eventHooks.beforeFileIo(volume, WRITE, len);
|
||||
final long begin = profilingEventHook.beforeFileIo(volume, WRITE, len);
|
||||
try {
|
||||
faultInjectorEventHook.beforeFileIo(volume, WRITE, len);
|
||||
super.write(b, off, len);
|
||||
eventHooks.afterFileIo(volume, WRITE, begin, len);
|
||||
profilingEventHook.afterFileIo(volume, WRITE, begin, len);
|
||||
} catch(Exception e) {
|
||||
eventHooks.onFailure(datanode, volume, WRITE, e, begin);
|
||||
onFailure(volume, begin);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void onFailure(@Nullable FsVolumeSpi volume, long begin) {
|
||||
if (datanode != null && volume != null) {
|
||||
datanode.checkDiskErrorAsync(volume);
|
||||
}
|
||||
profilingEventHook.onFailure(volume, begin);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
@ -26,84 +28,96 @@ import org.apache.hadoop.util.Time;
|
|||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* {@link FileIoEvents} that profiles the performance of the metadata and data
|
||||
* related operations on datanode volumes.
|
||||
* Profiles the performance of the metadata and data related operations on
|
||||
* datanode volumes.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class ProfilingFileIoEvents extends FileIoEvents {
|
||||
class ProfilingFileIoEvents {
|
||||
|
||||
private final boolean isEnabled;
|
||||
|
||||
public ProfilingFileIoEvents(@Nullable Configuration conf) {
|
||||
if (conf != null) {
|
||||
isEnabled = conf.getBoolean(DFSConfigKeys
|
||||
.DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY, DFSConfigKeys
|
||||
.DFS_DATANODE_ENABLE_FILEIO_PROFILING_DEFAULT);
|
||||
} else {
|
||||
isEnabled = false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long beforeMetadataOp(@Nullable FsVolumeSpi volume,
|
||||
FileIoProvider.OPERATION op) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
return Time.monotonicNow();
|
||||
if (isEnabled) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
return Time.monotonicNow();
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterMetadataOp(@Nullable FsVolumeSpi volume,
|
||||
FileIoProvider.OPERATION op, long begin) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
metrics.addMetadastaOperationLatency(Time.monotonicNow() - begin);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long beforeFileIo(@Nullable FsVolumeSpi volume,
|
||||
FileIoProvider.OPERATION op, long len) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
return Time.monotonicNow();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterFileIo(@Nullable FsVolumeSpi volume,
|
||||
FileIoProvider.OPERATION op, long begin, long len) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
long latency = Time.monotonicNow() - begin;
|
||||
metrics.addDataFileIoLatency(latency);
|
||||
switch (op) {
|
||||
case SYNC:
|
||||
metrics.addSyncIoLatency(latency);
|
||||
break;
|
||||
case FLUSH:
|
||||
metrics.addFlushIoLatency(latency);
|
||||
break;
|
||||
case READ:
|
||||
metrics.addReadIoLatency(latency);
|
||||
break;
|
||||
case WRITE:
|
||||
metrics.addWriteIoLatency(latency);
|
||||
break;
|
||||
default:
|
||||
if (isEnabled) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
metrics.addMetadastaOperationLatency(Time.monotonicNow() - begin);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@Nullable FsVolumeSpi volume,
|
||||
FileIoProvider.OPERATION op, Exception e, long begin) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
metrics.addFileIoError(Time.monotonicNow() - begin);
|
||||
public long beforeFileIo(@Nullable FsVolumeSpi volume,
|
||||
FileIoProvider.OPERATION op, long len) {
|
||||
if (isEnabled) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
return Time.monotonicNow();
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public void afterFileIo(@Nullable FsVolumeSpi volume,
|
||||
FileIoProvider.OPERATION op, long begin, long len) {
|
||||
if (isEnabled) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
long latency = Time.monotonicNow() - begin;
|
||||
metrics.addDataFileIoLatency(latency);
|
||||
switch (op) {
|
||||
case SYNC:
|
||||
metrics.addSyncIoLatency(latency);
|
||||
break;
|
||||
case FLUSH:
|
||||
metrics.addFlushIoLatency(latency);
|
||||
break;
|
||||
case READ:
|
||||
metrics.addReadIoLatency(latency);
|
||||
break;
|
||||
case WRITE:
|
||||
metrics.addWriteIoLatency(latency);
|
||||
break;
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getStatistics() {
|
||||
return null;
|
||||
public void onFailure(@Nullable FsVolumeSpi volume, long begin) {
|
||||
if (isEnabled) {
|
||||
DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
|
||||
if (metrics != null) {
|
||||
metrics.addFileIoError(Time.monotonicNow() - begin);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private DataNodeVolumeMetrics getVolumeMetrics(final FsVolumeSpi volume) {
|
||||
if (volume != null) {
|
||||
return volume.getMetrics();
|
||||
if (isEnabled) {
|
||||
if (volume != null) {
|
||||
return volume.getMetrics();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -121,8 +121,8 @@ public class TestDataNodeVolumeMetrics {
|
|||
|
||||
private MiniDFSCluster setupClusterForVolumeMetrics() throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.set(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
|
||||
"org.apache.hadoop.hdfs.server.datanode.ProfilingFileIoEvents");
|
||||
conf.setBoolean(DFSConfigKeys
|
||||
.DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY, true);
|
||||
SimulatedFSDataset.setFactory(conf);
|
||||
return new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(NUM_DATANODES)
|
||||
|
|
|
@ -100,7 +100,9 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
|
|||
configurationPropsToSkipCompare
|
||||
.add(DFSConfigKeys.DFS_NAMENODE_STARTUP_KEY);
|
||||
configurationPropsToSkipCompare
|
||||
.add(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY);
|
||||
.add(DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY);
|
||||
configurationPropsToSkipCompare.add(DFSConfigKeys
|
||||
.DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY);
|
||||
|
||||
// Allocate
|
||||
xmlPropsToSkipCompare = new HashSet<String>();
|
||||
|
|
Loading…
Reference in New Issue