HDFS-10958. Add instrumentation hooks around Datanode disk IO.

Arpit Agarwal 2016-12-14 11:18:58 -08:00
parent 72bff192cd
commit 6ba9587d37
36 changed files with 1685 additions and 366 deletions

View File

@ -742,47 +742,19 @@ public class NativeIO {
}
/**
* Create a FileInputStream that shares delete permission on the
* file opened, i.e. other process can delete the file the
* FileInputStream is reading. Only Windows implementation uses
* the native interface.
*/
public static FileInputStream getShareDeleteFileInputStream(File f)
throws IOException {
if (!Shell.WINDOWS) {
// On Linux the default FileInputStream shares delete permission
// on the file opened.
//
return new FileInputStream(f);
} else {
// Use Windows native interface to create a FileInputStream that
// shares delete permission on the file opened.
//
FileDescriptor fd = Windows.createFile(
f.getAbsolutePath(),
Windows.GENERIC_READ,
Windows.FILE_SHARE_READ |
Windows.FILE_SHARE_WRITE |
Windows.FILE_SHARE_DELETE,
Windows.OPEN_EXISTING);
return new FileInputStream(fd);
}
}
/**
* Create a FileInputStream that shares delete permission on the
* Create a FileDescriptor that shares delete permission on the
* file opened at a given offset, i.e. other process can delete
* the file the FileInputStream is reading. Only Windows implementation
* the file the FileDescriptor is reading. Only Windows implementation
* uses the native interface.
*/
public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
throws IOException {
public static FileDescriptor getShareDeleteFileDescriptor(
File f, long seekOffset) throws IOException {
if (!Shell.WINDOWS) {
RandomAccessFile rf = new RandomAccessFile(f, "r");
if (seekOffset > 0) {
rf.seek(seekOffset);
}
return new FileInputStream(rf.getFD());
return rf.getFD();
} else {
// Use Windows native interface to create a FileInputStream that
// shares delete permission on the file opened, and set it to the
@ -797,7 +769,7 @@ public class NativeIO {
NativeIO.Windows.OPEN_EXISTING);
if (seekOffset > 0)
NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN);
return new FileInputStream(fd);
return fd;
}
}
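Since the helper now returns a bare FileDescriptor rather than a FileInputStream, a caller that still needs a stream wraps the descriptor itself. The following is a minimal, hedged sketch of that call pattern; the class and variable names are illustrative only, and in this commit the wrapping presumably happens inside FileIoProvider's stream wrappers, which are not shown here.

import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.io.nativeio.NativeIO;

class ShareDeleteStreamExample {
  static FileInputStream openShared(File blockFile, long seekOffset)
      throws IOException {
    FileDescriptor fd =
        NativeIO.getShareDeleteFileDescriptor(blockFile, seekOffset);
    // The caller owns the stream and must close it; closing the stream
    // releases the underlying descriptor.
    return new FileInputStream(fd);
  }
}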

View File

@ -31,7 +31,6 @@ import java.nio.channels.FileChannel;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
@ -79,18 +78,15 @@ public class BlockMetadataHeader {
/**
* Read the checksum header from the meta file.
* inputStream must be closed by the caller.
* @return the data checksum obtained from the header.
*/
public static DataChecksum readDataChecksum(File metaFile, int bufSize)
public static DataChecksum readDataChecksum(
FileInputStream inputStream, int bufSize, File metaFile)
throws IOException {
DataInputStream in = null;
try {
in = new DataInputStream(new BufferedInputStream(
new FileInputStream(metaFile), bufSize));
return readDataChecksum(in, metaFile);
} finally {
IOUtils.closeStream(in);
}
DataInputStream in = new DataInputStream(new BufferedInputStream(
inputStream, bufSize));
return readDataChecksum(in, metaFile);
}
/**
@ -111,6 +107,7 @@ public class BlockMetadataHeader {
/**
* Read the header without changing the position of the FileChannel.
* This is used by the client for short-circuit reads.
*
* @param fc The FileChannel to read.
* @return the Metadata Header.
@ -144,18 +141,16 @@ public class BlockMetadataHeader {
/**
* Reads header at the top of metadata file and returns the header.
* Closes the input stream after reading the header.
*
* @return metadata header for the block
* @throws IOException
*/
public static BlockMetadataHeader readHeader(File file) throws IOException {
DataInputStream in = null;
try {
in = new DataInputStream(new BufferedInputStream(
new FileInputStream(file)));
public static BlockMetadataHeader readHeader(
FileInputStream fis) throws IOException {
try (DataInputStream in = new DataInputStream(
new BufferedInputStream(fis))) {
return readHeader(in);
} finally {
IOUtils.closeStream(in);
}
}

View File

@ -73,6 +73,33 @@
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
<!--
This class exposes stream constructors. The newly created streams are not
supposed to be closed in the constructor. Ignore the OBL warning.
-->
<Match>
<Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedFileOutputStream" />
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
<!--
This class exposes stream constructors. The newly created streams are not
supposed to be closed in the constructor. Ignore the OBL warning.
-->
<Match>
<Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedFileInputStream" />
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
<!--
This class exposes stream constructors. The newly created streams are not
supposed to be closed in the constructor. Ignore the OBL warning.
-->
<Match>
<Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedRandomAccessFile" />
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
<!--
lastAppliedTxid is carefully unsynchronized in the BackupNode in a couple spots.
See the comments in BackupImage for justification.

View File

@ -687,6 +687,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
public static final String DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy";
public static final String DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY =
"dfs.datanode.fileio.events.class";
public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold";
public static final long DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT = 1024L * 1024L * 1024L * 10L; // 10 GB
public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction";
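The new dfs.datanode.fileio.events.class key selects which FileIoEvents implementation the DataNode's FileIoProvider uses. Below is a hedged sketch of pointing the key at the CountingFileIoEvents class added by this commit; the reflective loading behavior is assumed from the key name and the FileIoProvider(conf) constructor, and the helper class name is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.CountingFileIoEvents;

class FileIoEventsConfigExample {
  // Returns a Configuration that, when handed to a DataNode, is assumed to
  // make its FileIoProvider instantiate CountingFileIoEvents instead of the
  // no-op DefaultFileIoEvents.
  static Configuration withCountingFileIoEvents() {
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
        CountingFileIoEvents.class.getName());
    return conf;
  }
}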

View File

@ -244,8 +244,7 @@ class BlockReceiver implements Closeable {
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
streams = replicaInfo.createStreams(isCreate, requestedChecksum,
datanodeSlowLogThresholdMs);
streams = replicaInfo.createStreams(isCreate, requestedChecksum);
assert streams != null : "null streams!";
// read checksum meta information
@ -400,9 +399,8 @@ class BlockReceiver implements Closeable {
checksumOut.flush();
long flushEndNanos = System.nanoTime();
if (isSync) {
long fsyncStartNanos = flushEndNanos;
streams.syncChecksumOut();
datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
datanode.metrics.addFsyncNanos(System.nanoTime() - flushEndNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;
}
@ -703,8 +701,10 @@ class BlockReceiver implements Closeable {
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
// Write data to disk.
long duration = streams.writeToDisk(dataBuf.array(),
long begin = Time.monotonicNow();
streams.writeDataToDisk(dataBuf.array(),
startByteToDisk, numBytesToDisk);
long duration = Time.monotonicNow() - begin;
if (duration > maxWriteToDiskMs) {
maxWriteToDiskMs = duration;
@ -1029,9 +1029,7 @@ class BlockReceiver implements Closeable {
* will be overwritten.
*/
private void adjustCrcFilePosition() throws IOException {
if (streams.getDataOut() != null) {
streams.flushDataOut();
}
streams.flushDataOut();
if (checksumOut != null) {
checksumOut.flush();
}

View File

@ -166,6 +166,7 @@ class BlockSender implements java.io.Closeable {
private final boolean dropCacheBehindAllReads;
private long lastCacheDropOffset;
private final FileIoProvider fileIoProvider;
@VisibleForTesting
static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
@ -197,6 +198,7 @@ class BlockSender implements java.io.Closeable {
InputStream blockIn = null;
DataInputStream checksumIn = null;
FsVolumeReference volumeRef = null;
this.fileIoProvider = datanode.getFileIoProvider();
try {
this.block = block;
this.corruptChecksumOk = corruptChecksumOk;
@ -401,7 +403,8 @@ class BlockSender implements java.io.Closeable {
DataNode.LOG.debug("replica=" + replica);
}
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef);
ris = new ReplicaInputStreams(
blockIn, checksumIn, volumeRef, fileIoProvider);
} catch (IOException ioe) {
IOUtils.closeStream(this);
throw ioe;
@ -568,8 +571,9 @@ class BlockSender implements java.io.Closeable {
FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
sockOut.transferToFully(fileCh, blockInPosition, dataLen,
waitTime, transferTime);
fileIoProvider.transferToSocketFully(
ris.getVolumeRef().getVolume(), sockOut, fileCh, blockInPosition,
dataLen, waitTime, transferTime);
datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
blockInPosition += dataLen;

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* {@link FileIoEvents} that simply counts the number of operations.
* Not meant to be used outside of testing.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class CountingFileIoEvents implements FileIoEvents {
private final Map<OPERATION, Counts> counts;
private static class Counts {
private final AtomicLong successes = new AtomicLong(0);
private final AtomicLong failures = new AtomicLong(0);
@JsonProperty("Successes")
public long getSuccesses() {
return successes.get();
}
@JsonProperty("Failures")
public long getFailures() {
return failures.get();
}
}
public CountingFileIoEvents() {
counts = new HashMap<>();
for (OPERATION op : OPERATION.values()) {
counts.put(op, new Counts());
}
}
@Override
public long beforeMetadataOp(
@Nullable FsVolumeSpi volume, OPERATION op) {
return 0;
}
@Override
public void afterMetadataOp(
@Nullable FsVolumeSpi volume, OPERATION op, long begin) {
counts.get(op).successes.incrementAndGet();
}
@Override
public long beforeFileIo(
@Nullable FsVolumeSpi volume, OPERATION op, long len) {
return 0;
}
@Override
public void afterFileIo(
@Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
counts.get(op).successes.incrementAndGet();
}
@Override
public void onFailure(
@Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
counts.get(op).failures.incrementAndGet();
}
@Override
public String getStatistics() {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(counts);
} catch (JsonProcessingException e) {
// Failed to serialize. Don't log the exception call stack.
FileIoProvider.LOG.error("Failed to serialize statistics: " + e);
return null;
}
}
}

View File

@ -299,6 +299,7 @@ public class DataNode extends ReconfigurableBase
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
private static final String DATANODE_HTRACE_PREFIX = "datanode.htrace.";
private final FileIoProvider fileIoProvider;
/**
* Use {@link NetUtils#createSocketAddr(String)} instead.
@ -411,6 +412,7 @@ public class DataNode extends ReconfigurableBase
this.tracer = createTracer(conf);
this.tracerConfigurationManager =
new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
this.fileIoProvider = new FileIoProvider(conf);
this.fileDescriptorPassingDisabledReason = null;
this.maxNumberOfBlocksToLog = 0;
this.confVersion = null;
@ -437,6 +439,7 @@ public class DataNode extends ReconfigurableBase
this.tracer = createTracer(conf);
this.tracerConfigurationManager =
new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
this.fileIoProvider = new FileIoProvider(conf);
this.blockScanner = new BlockScanner(this);
this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@ -617,6 +620,10 @@ public class DataNode extends ReconfigurableBase
PipelineAck.ECN.SUPPORTED;
}
public FileIoProvider getFileIoProvider() {
return fileIoProvider;
}
/**
* Contains the StorageLocations for changed data volumes.
*/
@ -3008,6 +3015,11 @@ public class DataNode extends ReconfigurableBase
}
}
@Override // DataNodeMXBean
public String getFileIoProviderStatistics() {
return fileIoProvider.getStatistics();
}
public void refreshNamenodes(Configuration conf) throws IOException {
blockPoolManager.refreshNamenodes(conf);
}

View File

@ -120,4 +120,9 @@ public interface DataNodeMXBean {
* @return DiskBalancer Status
*/
String getDiskBalancerStatus();
/**
* Gets the {@link FileIoProvider} statistics.
*/
String getFileIoProviderStatistics();
}

View File

@ -1356,6 +1356,12 @@ public class DataStorage extends Storage {
bpStorageMap.remove(bpId);
}
/**
* Prefer FileIoProvider#fullyDelete.
* @param dir the directory to delete.
* @return true if the directory and its contents were deleted.
*/
@Deprecated
public static boolean fullyDelete(final File dir) {
boolean result = FileUtil.fullyDelete(dir);
return result;

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
/** Provide utility methods for Datanode. */
@ -55,15 +56,17 @@ public class DatanodeUtil {
* @throws IOException
* if the file already exists or if the file cannot be created.
*/
public static File createTmpFile(Block b, File f) throws IOException {
if (f.exists()) {
public static File createFileWithExistsCheck(
FsVolumeSpi volume, Block b, File f,
FileIoProvider fileIoProvider) throws IOException {
if (fileIoProvider.exists(volume, f)) {
throw new IOException("Failed to create temporary file for " + b
+ ". File " + f + " should not be present, but is.");
}
// Create the zero-length temp file
final boolean fileCreated;
try {
fileCreated = f.createNewFile();
fileCreated = fileIoProvider.createFile(volume, f);
} catch (IOException ioe) {
throw new IOException(DISK_ERROR + "Failed to create " + f, ioe);
}
@ -92,13 +95,17 @@ public class DatanodeUtil {
* @return true if there are no files
* @throws IOException if unable to list subdirectories
*/
public static boolean dirNoFilesRecursive(File dir) throws IOException {
File[] contents = dir.listFiles();
public static boolean dirNoFilesRecursive(
FsVolumeSpi volume, File dir,
FileIoProvider fileIoProvider) throws IOException {
File[] contents = fileIoProvider.listFiles(volume, dir);
if (contents == null) {
throw new IOException("Cannot list contents of " + dir);
}
for (File f : contents) {
if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) {
if (!f.isDirectory() ||
(f.isDirectory() && !dirNoFilesRecursive(
volume, f, fileIoProvider))) {
return false;
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import javax.annotation.Nullable;
/**
* The default implementation of {@link FileIoEvents} that does nothing.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class DefaultFileIoEvents implements FileIoEvents {
@Override
public long beforeMetadataOp(
@Nullable FsVolumeSpi volume, OPERATION op) {
return 0;
}
@Override
public void afterMetadataOp(
@Nullable FsVolumeSpi volume, OPERATION op, long begin) {
}
@Override
public long beforeFileIo(
@Nullable FsVolumeSpi volume, OPERATION op, long len) {
return 0;
}
@Override
public void afterFileIo(
@Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
}
@Override
public void onFailure(
@Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
}
@Override
public @Nullable String getStatistics() {
// null is valid JSON.
return null;
}
}

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import javax.annotation.Nullable;
/**
* The following hooks can be implemented for instrumentation/fault
* injection.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface FileIoEvents {
/**
* Invoked before a filesystem metadata operation.
*
* @param volume target volume for the operation. Null if unavailable.
* @param op type of operation.
* @return timestamp at which the operation was started. 0 if
* unavailable.
*/
long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
/**
* Invoked after a filesystem metadata operation has completed.
*
* @param volume target volume for the operation. Null if unavailable.
* @param op type of operation.
* @param begin timestamp at which the operation was started. 0
* if unavailable.
*/
void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, long begin);
/**
* Invoked before a read/write/flush/channel transfer operation.
*
* @param volume target volume for the operation. Null if unavailable.
* @param op type of operation.
* @param len length of the file IO. 0 for flush.
* @return timestamp at which the operation was started. 0 if
* unavailable.
*/
long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, long len);
/**
* Invoked after a read/write/flush/channel transfer operation
* has completed.
*
* @param volume target volume for the operation. Null if unavailable.
* @param op type of operation.
* @param begin timestamp at which the operation was started. 0 if
*              unavailable.
* @param len length of the file IO. 0 for flush.
*/
void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
long begin, long len);
/**
* Invoked if an operation fails with an exception.
* @param volume target volume for the operation. Null if unavailable.
* @param op type of operation.
* @param e Exception encountered during the operation.
* @param begin time at which the operation was started.
*/
void onFailure(
@Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin);
/**
* Return statistics as a JSON string.
* @return statistics as a JSON string, or null if none are available.
*/
@Nullable String getStatistics();
}
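To make the hook lifecycle concrete, here is a hedged sketch of a custom implementation that warns when any instrumented operation runs longer than a fixed threshold. The class name and threshold are hypothetical; only the interface methods above, the Time utility used elsewhere in this commit, and FileIoProvider.LOG (whose package-level accessibility is assumed from its use in CountingFileIoEvents) are taken from the source.

package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.Time;

import javax.annotation.Nullable;

public class SlowFileIoWarningEvents implements FileIoEvents {
  // Hypothetical threshold; a real implementation would read it from conf.
  private static final long SLOW_THRESHOLD_MS = 300;

  @Override
  public long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op) {
    // The returned timestamp is handed back to afterMetadataOp/onFailure.
    return Time.monotonicNow();
  }

  @Override
  public void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op,
      long begin) {
    warnIfSlow(op, begin);
  }

  @Override
  public long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
      long len) {
    return Time.monotonicNow();
  }

  @Override
  public void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
      long begin, long len) {
    warnIfSlow(op, begin);
  }

  @Override
  public void onFailure(@Nullable FsVolumeSpi volume, OPERATION op,
      Exception e, long begin) {
    FileIoProvider.LOG.warn("Operation " + op + " failed after " +
        (Time.monotonicNow() - begin) + " ms", e);
  }

  @Override
  public @Nullable String getStatistics() {
    return null;   // null is valid JSON, mirroring DefaultFileIoEvents.
  }

  private void warnIfSlow(OPERATION op, long begin) {
    final long elapsedMs = Time.monotonicNow() - begin;
    if (elapsedMs > SLOW_THRESHOLD_MS) {
      FileIoProvider.LOG.warn("Slow " + op + " took " + elapsedMs + " ms");
    }
  }
}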

View File

@ -29,17 +29,13 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
@ -187,20 +183,23 @@ abstract public class LocalReplica extends ReplicaInfo {
* be recovered (especially on Windows) on datanode restart.
*/
private void breakHardlinks(File file, Block b) throws IOException {
File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
try (FileInputStream in = new FileInputStream(file)) {
try (FileOutputStream out = new FileOutputStream(tmpFile)){
copyBytes(in, out, 16 * 1024);
final FileIoProvider fileIoProvider = getFileIoProvider();
final File tmpFile = DatanodeUtil.createFileWithExistsCheck(
getVolume(), b, DatanodeUtil.getUnlinkTmpFile(file), fileIoProvider);
try (FileInputStream in = fileIoProvider.getFileInputStream(
getVolume(), file)) {
try (FileOutputStream out = fileIoProvider.getFileOutputStream(
getVolume(), tmpFile)) {
IOUtils.copyBytes(in, out, 16 * 1024);
}
if (file.length() != tmpFile.length()) {
throw new IOException("Copy of file " + file + " size " + file.length()+
" into file " + tmpFile +
" resulted in a size of " + tmpFile.length());
}
replaceFile(tmpFile, file);
fileIoProvider.replaceFile(getVolume(), tmpFile, file);
} catch (IOException e) {
boolean done = tmpFile.delete();
if (!done) {
if (!fileIoProvider.delete(getVolume(), tmpFile)) {
DataNode.LOG.info("detachFile failed to delete temporary file " +
tmpFile);
}
@ -226,19 +225,20 @@ abstract public class LocalReplica extends ReplicaInfo {
* @throws IOException
*/
public boolean breakHardLinksIfNeeded() throws IOException {
File file = getBlockFile();
final File file = getBlockFile();
final FileIoProvider fileIoProvider = getFileIoProvider();
if (file == null || getVolume() == null) {
throw new IOException("detachBlock:Block not found. " + this);
}
File meta = getMetaFile();
int linkCount = getHardLinkCount(file);
int linkCount = fileIoProvider.getHardLinkCount(getVolume(), file);
if (linkCount > 1) {
DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
"block " + this);
breakHardlinks(file, this);
}
if (getHardLinkCount(meta) > 1) {
if (fileIoProvider.getHardLinkCount(getVolume(), meta) > 1) {
breakHardlinks(meta, this);
}
return true;
@ -256,17 +256,18 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public OutputStream getDataOutputStream(boolean append) throws IOException {
return new FileOutputStream(getBlockFile(), append);
return getFileIoProvider().getFileOutputStream(
getVolume(), getBlockFile(), append);
}
@Override
public boolean blockDataExists() {
return getBlockFile().exists();
return getFileIoProvider().exists(getVolume(), getBlockFile());
}
@Override
public boolean deleteBlockData() {
return fullyDelete(getBlockFile());
return getFileIoProvider().fullyDelete(getVolume(), getBlockFile());
}
@Override
@ -282,9 +283,10 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public LengthInputStream getMetadataInputStream(long offset)
throws IOException {
File meta = getMetaFile();
final File meta = getMetaFile();
return new LengthInputStream(
FsDatasetUtil.openAndSeek(meta, offset), meta.length());
getFileIoProvider().openAndSeek(getVolume(), meta, offset),
meta.length());
}
@Override
@ -295,12 +297,12 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public boolean metadataExists() {
return getMetaFile().exists();
return getFileIoProvider().exists(getVolume(), getMetaFile());
}
@Override
public boolean deleteMetadata() {
return fullyDelete(getMetaFile());
return getFileIoProvider().fullyDelete(getVolume(), getMetaFile());
}
@Override
@ -320,7 +322,7 @@ abstract public class LocalReplica extends ReplicaInfo {
private boolean renameFile(File srcfile, File destfile) throws IOException {
try {
rename(srcfile, destfile);
getFileIoProvider().rename(getVolume(), srcfile, destfile);
return true;
} catch (IOException e) {
throw new IOException("Failed to move block file for " + this
@ -360,9 +362,9 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public void bumpReplicaGS(long newGS) throws IOException {
long oldGS = getGenerationStamp();
File oldmeta = getMetaFile();
final File oldmeta = getMetaFile();
setGenerationStamp(newGS);
File newmeta = getMetaFile();
final File newmeta = getMetaFile();
// rename meta file to new GS
if (LOG.isDebugEnabled()) {
@ -370,7 +372,7 @@ abstract public class LocalReplica extends ReplicaInfo {
}
try {
// calling renameMeta on the ReplicaInfo doesn't work here
rename(oldmeta, newmeta);
getFileIoProvider().rename(getVolume(), oldmeta, newmeta);
} catch (IOException e) {
setGenerationStamp(oldGS); // restore old GS
throw new IOException("Block " + this + " reopen failed. " +
@ -381,7 +383,8 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public void truncateBlock(long newLength) throws IOException {
truncateBlock(getBlockFile(), getMetaFile(), getNumBytes(), newLength);
truncateBlock(getVolume(), getBlockFile(), getMetaFile(),
getNumBytes(), newLength, getFileIoProvider());
}
@Override
@ -392,32 +395,15 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public void copyMetadata(URI destination) throws IOException {
//for local replicas, we assume the destination URI is file
nativeCopyFileUnbuffered(getMetaFile(), new File(destination), true);
getFileIoProvider().nativeCopyFileUnbuffered(
getVolume(), getMetaFile(), new File(destination), true);
}
@Override
public void copyBlockdata(URI destination) throws IOException {
//for local replicas, we assume the destination URI is file
nativeCopyFileUnbuffered(getBlockFile(), new File(destination), true);
}
public void renameMeta(File newMetaFile) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming " + getMetaFile() + " to " + newMetaFile);
}
renameFile(getMetaFile(), newMetaFile);
}
public void renameBlock(File newBlockFile) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming " + getBlockFile() + " to " + newBlockFile
+ ", file length=" + getBlockFile().length());
}
renameFile(getBlockFile(), newBlockFile);
}
public static void rename(File from, File to) throws IOException {
Storage.rename(from, to);
getFileIoProvider().nativeCopyFileUnbuffered(
getVolume(), getBlockFile(), new File(destination), true);
}
/**
@ -430,11 +416,13 @@ abstract public class LocalReplica extends ReplicaInfo {
private FileInputStream getDataInputStream(File f, long seekOffset)
throws IOException {
FileInputStream fis;
final FileIoProvider fileIoProvider = getFileIoProvider();
if (NativeIO.isAvailable()) {
fis = NativeIO.getShareDeleteFileInputStream(f, seekOffset);
fis = fileIoProvider.getShareDeleteFileInputStream(
getVolume(), f, seekOffset);
} else {
try {
fis = FsDatasetUtil.openAndSeek(f, seekOffset);
fis = fileIoProvider.openAndSeek(getVolume(), f, seekOffset);
} catch (FileNotFoundException fnfe) {
throw new IOException("Expected block file at " + f +
" does not exist.");
@ -443,30 +431,6 @@ abstract public class LocalReplica extends ReplicaInfo {
return fis;
}
private void nativeCopyFileUnbuffered(File srcFile, File destFile,
boolean preserveFileDate) throws IOException {
Storage.nativeCopyFileUnbuffered(srcFile, destFile, preserveFileDate);
}
private void copyBytes(InputStream in, OutputStream out, int
buffSize) throws IOException{
IOUtils.copyBytes(in, out, buffSize);
}
private void replaceFile(File src, File target) throws IOException {
FileUtil.replaceFile(src, target);
}
public static boolean fullyDelete(final File dir) {
boolean result = DataStorage.fullyDelete(dir);
return result;
}
public static int getHardLinkCount(File fileName) throws IOException {
int linkCount = HardLink.getLinkCount(fileName);
return linkCount;
}
/**
* Get pin status of a file by checking the sticky bit.
* @param localFS local file system
@ -495,8 +459,10 @@ abstract public class LocalReplica extends ReplicaInfo {
localFS.setPermission(path, permission);
}
public static void truncateBlock(File blockFile, File metaFile,
long oldlen, long newlen) throws IOException {
public static void truncateBlock(
FsVolumeSpi volume, File blockFile, File metaFile,
long oldlen, long newlen, FileIoProvider fileIoProvider)
throws IOException {
LOG.info("truncateBlock: blockFile=" + blockFile
+ ", metaFile=" + metaFile
+ ", oldlen=" + oldlen
@ -510,7 +476,10 @@ abstract public class LocalReplica extends ReplicaInfo {
+ ") to newlen (=" + newlen + ")");
}
DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
// fis is closed by BlockMetadataHeader.readHeader.
final FileInputStream fis = fileIoProvider.getFileInputStream(
volume, metaFile);
DataChecksum dcs = BlockMetadataHeader.readHeader(fis).getChecksum();
int checksumsize = dcs.getChecksumSize();
int bpc = dcs.getBytesPerChecksum();
long n = (newlen - 1)/bpc + 1;
@ -519,16 +488,14 @@ abstract public class LocalReplica extends ReplicaInfo {
int lastchunksize = (int)(newlen - lastchunkoffset);
byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
try {
try (RandomAccessFile blockRAF = fileIoProvider.getRandomAccessFile(
volume, blockFile, "rw")) {
//truncate blockFile
blockRAF.setLength(newlen);
//read last chunk
blockRAF.seek(lastchunkoffset);
blockRAF.readFully(b, 0, lastchunksize);
} finally {
blockRAF.close();
}
//compute checksum
@ -536,13 +503,11 @@ abstract public class LocalReplica extends ReplicaInfo {
dcs.writeValue(b, 0, false);
//update metaFile
RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
try {
try (RandomAccessFile metaRAF = fileIoProvider.getRandomAccessFile(
volume, metaFile, "rw")) {
metaRAF.setLength(newmetalen);
metaRAF.seek(newmetalen - checksumsize);
metaRAF.write(b, 0, checksumsize);
} finally {
metaRAF.close();
}
}
}

View File

@ -245,10 +245,9 @@ public class LocalReplicaInPipeline extends LocalReplica
@Override // ReplicaInPipeline
public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum, long slowLogThresholdMs)
throws IOException {
File blockFile = getBlockFile();
File metaFile = getMetaFile();
DataChecksum requestedChecksum) throws IOException {
final File blockFile = getBlockFile();
final File metaFile = getMetaFile();
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("writeTo blockfile is " + blockFile +
" of size " + blockFile.length());
@ -262,14 +261,16 @@ public class LocalReplicaInPipeline extends LocalReplica
// may differ from requestedChecksum for appends.
final DataChecksum checksum;
RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
final RandomAccessFile metaRAF =
getFileIoProvider().getRandomAccessFile(getVolume(), metaFile, "rw");
if (!isCreate) {
// For append or recovery, we must enforce the existing checksum.
// Also, verify that the file has correct lengths, etc.
boolean checkedMeta = false;
try {
BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
BlockMetadataHeader header =
BlockMetadataHeader.readHeader(metaRAF);
checksum = header.getChecksum();
if (checksum.getBytesPerChecksum() !=
@ -302,20 +303,24 @@ public class LocalReplicaInPipeline extends LocalReplica
checksum = requestedChecksum;
}
final FileIoProvider fileIoProvider = getFileIoProvider();
FileOutputStream blockOut = null;
FileOutputStream crcOut = null;
try {
blockOut = new FileOutputStream(
new RandomAccessFile(blockFile, "rw").getFD());
crcOut = new FileOutputStream(metaRAF.getFD());
blockOut = fileIoProvider.getFileOutputStream(
getVolume(),
fileIoProvider.getRandomAccessFile(getVolume(), blockFile, "rw")
.getFD());
crcOut = fileIoProvider.getFileOutputStream(getVolume(), metaRAF.getFD());
if (!isCreate) {
blockOut.getChannel().position(blockDiskSize);
crcOut.getChannel().position(crcDiskSize);
}
return new ReplicaOutputStreams(blockOut, crcOut, checksum,
getVolume().isTransientStorage(), slowLogThresholdMs);
getVolume(), fileIoProvider);
} catch (IOException e) {
IOUtils.closeStream(blockOut);
IOUtils.closeStream(crcOut);
IOUtils.closeStream(metaRAF);
throw e;
}
@ -326,11 +331,11 @@ public class LocalReplicaInPipeline extends LocalReplica
File blockFile = getBlockFile();
File restartMeta = new File(blockFile.getParent() +
File.pathSeparator + "." + blockFile.getName() + ".restart");
if (restartMeta.exists() && !restartMeta.delete()) {
if (!getFileIoProvider().deleteWithExistsCheck(getVolume(), restartMeta)) {
DataNode.LOG.warn("Failed to delete restart meta file: " +
restartMeta.getPath());
}
return new FileOutputStream(restartMeta);
return getFileIoProvider().getFileOutputStream(getVolume(), restartMeta);
}
@Override
@ -373,12 +378,14 @@ public class LocalReplicaInPipeline extends LocalReplica
+ " should be derived from LocalReplica");
}
LocalReplica oldReplica = (LocalReplica) oldReplicaInfo;
File oldmeta = oldReplica.getMetaFile();
File newmeta = getMetaFile();
final LocalReplica oldReplica = (LocalReplica) oldReplicaInfo;
final File oldBlockFile = oldReplica.getBlockFile();
final File oldmeta = oldReplica.getMetaFile();
final File newmeta = getMetaFile();
final FileIoProvider fileIoProvider = getFileIoProvider();
try {
oldReplica.renameMeta(newmeta);
fileIoProvider.rename(getVolume(), oldmeta, newmeta);
} catch (IOException e) {
throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
" Unable to move meta file " + oldmeta +
@ -386,10 +393,10 @@ public class LocalReplicaInPipeline extends LocalReplica
}
try {
oldReplica.renameBlock(newBlkFile);
fileIoProvider.rename(getVolume(), oldBlockFile, newBlkFile);
} catch (IOException e) {
try {
renameMeta(oldmeta);
fileIoProvider.rename(getVolume(), newmeta, oldmeta);
} catch (IOException ex) {
LOG.warn("Cannot move meta file " + newmeta +
"back to the finalized directory " + oldmeta, ex);

View File

@ -69,13 +69,11 @@ public interface ReplicaInPipeline extends Replica {
*
* @param isCreate if it is for creation
* @param requestedChecksum the checksum the writer would prefer to use
* @param slowLogThresholdMs slow io threshold for logging
* @return output streams for writing
* @throws IOException if any error occurs
*/
public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum, long slowLogThresholdMs)
throws IOException;
DataChecksum requestedChecksum) throws IOException;
/**
* Create an output stream to write restart metadata in case of datanode

View File

@ -45,6 +45,10 @@ abstract public class ReplicaInfo extends Block
/** volume where the replica belongs. */
private FsVolumeSpi volume;
/** This is used by some tests and FsDatasetUtil#computeChecksum. */
private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER =
new FileIoProvider(null);
/**
* Constructor
* @param vol volume where replica is located
@ -64,7 +68,18 @@ abstract public class ReplicaInfo extends Block
public FsVolumeSpi getVolume() {
return volume;
}
/**
* Get the {@link FileIoProvider} for disk IO operations.
*/
public FileIoProvider getFileIoProvider() {
// In tests and when invoked via FsDatasetUtil#computeChecksum, the
// target volume for this replica may be unknown and hence null.
// Use the DEFAULT_FILE_IO_PROVIDER with no-op hooks.
return (volume != null) ? volume.getFileIoProvider()
: DEFAULT_FILE_IO_PROVIDER;
}
/**
* Set the volume where this replica is located on disk.
*/

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
@ -418,4 +419,6 @@ public interface FsVolumeSpi
*/
class VolumeCheckContext {
}
FileIoProvider getFileIoProvider();
}

View File

@ -24,8 +24,8 @@ import java.io.InputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.slf4j.Logger;
@ -38,12 +38,15 @@ public class ReplicaInputStreams implements Closeable {
private InputStream dataIn;
private InputStream checksumIn;
private FsVolumeReference volumeRef;
private final FileIoProvider fileIoProvider;
private FileDescriptor dataInFd = null;
/** Create an object with a data input stream and a checksum input stream. */
public ReplicaInputStreams(InputStream dataStream,
InputStream checksumStream, FsVolumeReference volumeRef) {
public ReplicaInputStreams(
InputStream dataStream, InputStream checksumStream,
FsVolumeReference volumeRef, FileIoProvider fileIoProvider) {
this.volumeRef = volumeRef;
this.fileIoProvider = fileIoProvider;
this.dataIn = dataStream;
this.checksumIn = checksumStream;
if (dataIn instanceof FileInputStream) {
@ -103,7 +106,7 @@ public class ReplicaInputStreams implements Closeable {
public void dropCacheBehindReads(String identifier, long offset, long len,
int flags) throws NativeIOException {
assert this.dataInFd != null : "null dataInFd!";
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
fileIoProvider.posixFadvise(getVolumeRef().getVolume(),
identifier, dataInFd, offset, len, flags);
}

View File

@ -24,11 +24,10 @@ import java.io.OutputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
/**
@ -43,21 +42,22 @@ public class ReplicaOutputStreams implements Closeable {
/** Stream to checksum. */
private final OutputStream checksumOut;
private final DataChecksum checksum;
private final boolean isTransientStorage;
private final long slowLogThresholdMs;
private final FsVolumeSpi volume;
private final FileIoProvider fileIoProvider;
/**
* Create an object with a data output stream, a checksum output stream
* and a checksum.
*/
public ReplicaOutputStreams(OutputStream dataOut,
OutputStream checksumOut, DataChecksum checksum,
boolean isTransientStorage, long slowLogThresholdMs) {
public ReplicaOutputStreams(
OutputStream dataOut, OutputStream checksumOut, DataChecksum checksum,
FsVolumeSpi volume, FileIoProvider fileIoProvider) {
this.dataOut = dataOut;
this.checksum = checksum;
this.slowLogThresholdMs = slowLogThresholdMs;
this.isTransientStorage = isTransientStorage;
this.checksumOut = checksumOut;
this.volume = volume;
this.fileIoProvider = fileIoProvider;
try {
if (this.dataOut instanceof FileOutputStream) {
@ -93,7 +93,7 @@ public class ReplicaOutputStreams implements Closeable {
/** @return is writing to a transient storage? */
public boolean isTransientStorage() {
return isTransientStorage;
return volume.isTransientStorage();
}
@Override
@ -112,7 +112,7 @@ public class ReplicaOutputStreams implements Closeable {
*/
public void syncDataOut() throws IOException {
if (dataOut instanceof FileOutputStream) {
sync((FileOutputStream)dataOut);
fileIoProvider.sync(volume, (FileOutputStream) dataOut);
}
}
@ -121,7 +121,7 @@ public class ReplicaOutputStreams implements Closeable {
*/
public void syncChecksumOut() throws IOException {
if (checksumOut instanceof FileOutputStream) {
sync((FileOutputStream)checksumOut);
fileIoProvider.sync(volume, (FileOutputStream) checksumOut);
}
}
@ -129,60 +129,34 @@ public class ReplicaOutputStreams implements Closeable {
* Flush the data stream if it supports it.
*/
public void flushDataOut() throws IOException {
flush(dataOut);
if (dataOut != null) {
fileIoProvider.flush(volume, dataOut);
}
}
/**
* Flush the checksum stream if it supports it.
*/
public void flushChecksumOut() throws IOException {
flush(checksumOut);
}
private void flush(OutputStream dos) throws IOException {
long begin = Time.monotonicNow();
dos.flush();
long duration = Time.monotonicNow() - begin;
LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration);
if (duration > slowLogThresholdMs) {
LOG.warn("Slow flush took {} ms (threshold={} ms)", duration,
slowLogThresholdMs);
if (checksumOut != null) {
fileIoProvider.flush(volume, checksumOut);
}
}
private void sync(FileOutputStream fos) throws IOException {
long begin = Time.monotonicNow();
fos.getChannel().force(true);
long duration = Time.monotonicNow() - begin;
LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration);
if (duration > slowLogThresholdMs) {
LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration,
slowLogThresholdMs);
}
}
public long writeToDisk(byte[] b, int off, int len) throws IOException {
long begin = Time.monotonicNow();
public void writeDataToDisk(byte[] b, int off, int len)
throws IOException {
dataOut.write(b, off, len);
long duration = Time.monotonicNow() - begin;
LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration);
if (duration > slowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " +
"(threshold={} ms)", duration, slowLogThresholdMs);
}
return duration;
}
public void syncFileRangeIfPossible(long offset, long nbytes,
int flags) throws NativeIOException {
assert this.outFd != null : "null outFd!";
NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags);
fileIoProvider.syncFileRange(
volume, outFd, offset, nbytes, flags);
}
public void dropCacheBehindWrites(String identifier,
long offset, long len, int flags) throws NativeIOException {
assert this.outFd != null : "null outFd!";
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
identifier, outFd, offset, len, flags);
fileIoProvider.posixFadvise(
volume, identifier, outFd, offset, len, flags);
}
}

View File

@ -32,13 +32,11 @@ import java.util.Iterator;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
@ -46,10 +44,10 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
@ -64,7 +62,6 @@ import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
/**
* A block pool slice represents a portion of a block pool stored on a volume.
@ -96,6 +93,7 @@ class BlockPoolSlice {
private final long cachedDfsUsedCheckTime;
private final Timer timer;
private final int maxDataLength;
private final FileIoProvider fileIoProvider;
// TODO:FEDERATION scalability issue - a thread per DU is needed
private final GetSpaceUsed dfsUsage;
@ -113,6 +111,7 @@ class BlockPoolSlice {
Configuration conf, Timer timer) throws IOException {
this.bpid = bpid;
this.volume = volume;
this.fileIoProvider = volume.getFileIoProvider();
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
this.finalizedDir = new File(
currentDir, DataStorage.STORAGE_DIR_FINALIZED);
@ -147,19 +146,14 @@ class BlockPoolSlice {
//
this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
if (tmpDir.exists()) {
DataStorage.fullyDelete(tmpDir);
fileIoProvider.fullyDelete(volume, tmpDir);
}
this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
if (!rbwDir.mkdirs()) { // create rbw directory if not exist
if (!rbwDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + rbwDir.toString());
}
}
if (!tmpDir.mkdirs()) {
if (!tmpDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + tmpDir.toString());
}
}
// create the rbw and tmp directories if they don't exist.
fileIoProvider.mkdirs(volume, rbwDir);
fileIoProvider.mkdirs(volume, tmpDir);
// Use cached value initially if available. Or the following call will
// block until the initial du command completes.
this.dfsUsage = new CachingGetSpaceUsed.Builder().setPath(bpDir)
@ -266,7 +260,7 @@ class BlockPoolSlice {
*/
void saveDfsUsed() {
File outFile = new File(currentDir, DU_CACHE_FILE);
if (outFile.exists() && !outFile.delete()) {
if (!fileIoProvider.deleteWithExistsCheck(volume, outFile)) {
FsDatasetImpl.LOG.warn("Failed to delete old dfsUsed file in " +
outFile.getParent());
}
@ -277,7 +271,7 @@ class BlockPoolSlice {
new FileOutputStream(outFile), "UTF-8")) {
// mtime is written last, so that truncated writes won't be valid.
out.write(Long.toString(used) + " " + Long.toString(timer.now()));
out.flush();
fileIoProvider.flush(volume, out);
}
} catch (IOException ioe) {
// If write failed, the volume might be bad. Since the cache file is
@ -292,7 +286,8 @@ class BlockPoolSlice {
*/
File createTmpFile(Block b) throws IOException {
File f = new File(tmpDir, b.getBlockName());
File tmpFile = DatanodeUtil.createTmpFile(b, f);
File tmpFile = DatanodeUtil.createFileWithExistsCheck(
volume, b, f, fileIoProvider);
// If any exception occurs during creation, it is expected that the counter
// will not be incremented, so there is no need to decrement it.
incrNumBlocks();
@ -305,7 +300,8 @@ class BlockPoolSlice {
*/
File createRbwFile(Block b) throws IOException {
File f = new File(rbwDir, b.getBlockName());
File rbwFile = DatanodeUtil.createTmpFile(b, f);
File rbwFile = DatanodeUtil.createFileWithExistsCheck(
volume, b, f, fileIoProvider);
// If any exception occurs during creation, it is expected that the counter
// will not be incremented, so there is no need to decrement it.
incrNumBlocks();
@ -314,11 +310,7 @@ class BlockPoolSlice {
File addFinalizedBlock(Block b, ReplicaInfo replicaInfo) throws IOException {
File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
if (!blockDir.exists()) {
if (!blockDir.mkdirs()) {
throw new IOException("Failed to mkdirs " + blockDir);
}
}
fileIoProvider.mkdirsWithExistsCheck(volume, blockDir);
File blockFile = FsDatasetImpl.moveBlockFiles(b, replicaInfo, blockDir);
File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
if (dfsUsage instanceof CachingGetSpaceUsed) {
@ -340,9 +332,9 @@ class BlockPoolSlice {
final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
final File targetBlockFile = new File(blockDir, blockFile.getName());
final File targetMetaFile = new File(blockDir, metaFile.getName());
FileUtils.moveFile(blockFile, targetBlockFile);
fileIoProvider.moveFile(volume, blockFile, targetBlockFile);
FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
FileUtils.moveFile(metaFile, targetMetaFile);
fileIoProvider.moveFile(volume, metaFile, targetMetaFile);
FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
ReplicaInfo newReplicaInfo =
@ -394,16 +386,13 @@ class BlockPoolSlice {
File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
if (blockFile.exists()) {
// If the original block file still exists, then no recovery is needed.
if (!unlinkedTmp.delete()) {
if (!fileIoProvider.delete(volume, unlinkedTmp)) {
throw new IOException("Unable to cleanup unlinked tmp file " +
unlinkedTmp);
}
return null;
} else {
if (!unlinkedTmp.renameTo(blockFile)) {
throw new IOException("Unable to rename unlinked tmp file " +
unlinkedTmp);
}
fileIoProvider.rename(volume, unlinkedTmp, blockFile);
return blockFile;
}
}
@ -416,7 +405,7 @@ class BlockPoolSlice {
*/
private int moveLazyPersistReplicasToFinalized(File source)
throws IOException {
File files[] = FileUtil.listFiles(source);
File[] files = fileIoProvider.listFiles(volume, source);
int numRecovered = 0;
for (File file : files) {
if (file.isDirectory()) {
@ -431,24 +420,25 @@ class BlockPoolSlice {
if (blockFile.exists()) {
if (!targetDir.exists() && !targetDir.mkdirs()) {
try {
fileIoProvider.mkdirsWithExistsCheck(volume, targetDir);
} catch(IOException ioe) {
LOG.warn("Failed to mkdirs " + targetDir);
continue;
}
final File targetMetaFile = new File(targetDir, metaFile.getName());
try {
LocalReplica.rename(metaFile, targetMetaFile);
fileIoProvider.rename(volume, metaFile, targetMetaFile);
} catch (IOException e) {
LOG.warn("Failed to move meta file from "
+ metaFile + " to " + targetMetaFile, e);
continue;
}
final File targetBlockFile = new File(targetDir, blockFile.getName());
try {
LocalReplica.rename(blockFile, targetBlockFile);
fileIoProvider.rename(volume, blockFile, targetBlockFile);
} catch (IOException e) {
LOG.warn("Failed to move block file from "
+ blockFile + " to " + targetBlockFile, e);
@ -465,7 +455,7 @@ class BlockPoolSlice {
}
}
FileUtil.fullyDelete(source);
fileIoProvider.fullyDelete(volume, source);
return numRecovered;
}
@ -508,7 +498,7 @@ class BlockPoolSlice {
loadRwr = false;
}
sc.close();
if (!restartMeta.delete()) {
if (!fileIoProvider.delete(volume, restartMeta)) {
FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
restartMeta.getPath());
}
@ -568,7 +558,7 @@ class BlockPoolSlice {
final RamDiskReplicaTracker lazyWriteReplicaMap,
boolean isFinalized)
throws IOException {
File files[] = FileUtil.listFiles(dir);
File[] files = fileIoProvider.listFiles(volume, dir);
for (File file : files) {
if (file.isDirectory()) {
addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
@ -581,8 +571,9 @@ class BlockPoolSlice {
continue;
}
}
if (!Block.isBlockFilename(file))
if (!Block.isBlockFilename(file)) {
continue;
}
long genStamp = FsDatasetUtil.getGenerationStampFromFile(
files, file);
@ -700,7 +691,8 @@ class BlockPoolSlice {
return 0;
}
try (DataInputStream checksumIn = new DataInputStream(
new BufferedInputStream(new FileInputStream(metaFile),
new BufferedInputStream(
fileIoProvider.getFileInputStream(volume, metaFile),
ioFileBufferSize))) {
// read and handle the common header here. For now just a version
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
@ -713,9 +705,10 @@ class BlockPoolSlice {
if (numChunks == 0) {
return 0;
}
try (InputStream blockIn = new FileInputStream(blockFile);
try (InputStream blockIn = fileIoProvider.getFileInputStream(
volume, blockFile);
ReplicaInputStreams ris = new ReplicaInputStreams(blockIn,
checksumIn, volume.obtainReference())) {
checksumIn, volume.obtainReference(), fileIoProvider)) {
ris.skipChecksumFully((numChunks - 1) * checksumSize);
long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;
ris.skipDataFully(lastChunkStartPos);
@ -734,7 +727,8 @@ class BlockPoolSlice {
// truncate if extra bytes are present without CRC
if (blockFile.length() > validFileLength) {
try (RandomAccessFile blockRAF =
new RandomAccessFile(blockFile, "rw")) {
fileIoProvider.getRandomAccessFile(
volume, blockFile, "rw")) {
// truncate blockFile
blockRAF.setLength(validFileLength);
}
@ -786,12 +780,14 @@ class BlockPoolSlice {
}
FileInputStream inputStream = null;
try {
inputStream = new FileInputStream(replicaFile);
inputStream = fileIoProvider.getFileInputStream(volume, replicaFile);
BlockListAsLongs blocksList =
BlockListAsLongs.readFrom(inputStream, maxDataLength);
Iterator<BlockReportReplica> iterator = blocksList.iterator();
while (iterator.hasNext()) {
BlockReportReplica replica = iterator.next();
if (blocksList == null) {
return false;
}
for (BlockReportReplica replica : blocksList) {
switch (replica.getState()) {
case FINALIZED:
addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, true);
@ -828,7 +824,7 @@ class BlockPoolSlice {
return false;
}
finally {
if (!replicaFile.delete()) {
if (!fileIoProvider.delete(volume, replicaFile)) {
LOG.info("Failed to delete replica cache file: " +
replicaFile.getPath());
}
@ -842,41 +838,29 @@ class BlockPoolSlice {
blocksListToPersist.getNumberOfBlocks()== 0) {
return;
}
File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
if (tmpFile.exists() && !tmpFile.delete()) {
LOG.warn("Failed to delete tmp replicas file in " +
tmpFile.getPath());
return;
}
File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
LOG.warn("Failed to delete replicas file in " +
replicaCacheFile.getPath());
final File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
final File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
if (!fileIoProvider.deleteWithExistsCheck(volume, tmpFile) ||
!fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile)) {
return;
}
FileOutputStream out = null;
try {
out = new FileOutputStream(tmpFile);
out = fileIoProvider.getFileOutputStream(volume, tmpFile);
blocksListToPersist.writeTo(out);
out.close();
// Renaming the tmp file to replicas
Files.move(tmpFile, replicaCacheFile);
fileIoProvider.moveFile(volume, tmpFile, replicaCacheFile);
} catch (Exception e) {
// If write failed, the volume might be bad. Since the cache file is
// not critical, log the error, delete both the files (tmp and cache)
// and continue.
LOG.warn("Failed to write replicas to cache ", e);
if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
LOG.warn("Failed to delete replicas file: " +
replicaCacheFile.getPath());
}
fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile);
} finally {
IOUtils.closeStream(out);
if (tmpFile.exists() && !tmpFile.delete()) {
LOG.warn("Failed to delete tmp file in " +
tmpFile.getPath());
}
fileIoProvider.deleteWithExistsCheck(volume, tmpFile);
}
}
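
saveReplicas now delegates its delete and move steps to the provider, but the overall shape stays the same: clear stale files, write a .tmp sibling, then rename it into place, cleaning up the .tmp on failure. A minimal sketch of that write-then-rename pattern with plain java.nio (file names are illustrative):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class WriteThenRename {
  /** Persist bytes by writing a sibling .tmp file and renaming it into place. */
  static void persist(Path target, byte[] payload) throws IOException {
    Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
    // Mirror deleteWithExistsCheck: clear out any stale files first.
    Files.deleteIfExists(tmp);
    Files.deleteIfExists(target);
    try (OutputStream out = Files.newOutputStream(tmp)) {
      out.write(payload);
    } catch (IOException e) {
      Files.deleteIfExists(tmp);   // the cache is not critical; just clean up
      throw e;
    }
    // Readers see either no file or a complete one, never a partial write.
    Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("replica-cache");
    Path target = dir.resolve("replicas.cache");
    persist(target, "block list".getBytes(StandardCharsets.UTF_8));
    System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8));
  }
}
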


@ -272,8 +272,10 @@ class FsDatasetAsyncDiskService {
}
File trashDirFile = new File(trashDirectory);
if (!trashDirFile.exists() && !trashDirFile.mkdirs()) {
LOG.error("Failed to create trash directory " + trashDirectory);
try {
volume.getFileIoProvider().mkdirsWithExistsCheck(
volume, trashDirFile);
} catch (IOException e) {
return false;
}
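
The exists()/mkdirs() pair above is collapsed into a single mkdirsWithExistsCheck call that throws on failure. The provider's actual implementation is not shown in this hunk; a plausible stand-alone version of such a helper, written against plain JDK APIs, could look like this:

import java.io.File;
import java.io.IOException;

public class MkdirsHelper {
  /**
   * Create dir (and parents) if needed. Succeeds when the directory already
   * exists; throws when it cannot be created.
   */
  static void mkdirsWithExistsCheck(File dir) throws IOException {
    if (!dir.mkdirs() && !dir.isDirectory()) {
      throw new IOException("Failed to create directory " + dir);
    }
  }

  public static void main(String[] args) throws IOException {
    File trashDir = new File(System.getProperty("java.io.tmpdir"), "trash-demo");
    mkdirsWithExistsCheck(trashDir);   // first call creates it
    mkdirsWithExistsCheck(trashDir);   // second call is a no-op
    System.out.println(trashDir + " exists: " + trashDir.isDirectory());
  }
}
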


@ -21,6 +21,7 @@ import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
@ -57,6 +58,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -418,6 +420,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
.setDataset(this)
.setStorageID(sd.getStorageUuid())
.setStorageDirectory(sd)
.setFileIoProvider(datanode.getFileIoProvider())
.setConf(this.conf)
.build();
FsVolumeReference ref = fsVolume.obtainReference();
@ -437,6 +440,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
.setDataset(this)
.setStorageID(storageUuid)
.setStorageDirectory(sd)
.setFileIoProvider(datanode.getFileIoProvider())
.setConf(conf)
.build();
}
@ -818,7 +822,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
InputStream blockInStream = info.getDataInputStream(blkOffset);
try {
InputStream metaInStream = info.getMetadataInputStream(metaOffset);
return new ReplicaInputStreams(blockInStream, metaInStream, ref);
return new ReplicaInputStreams(
blockInStream, metaInStream, ref, datanode.getFileIoProvider());
} catch (IOException e) {
IOUtils.cleanup(null, blockInStream);
throw e;
@ -1027,9 +1032,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
static void computeChecksum(ReplicaInfo srcReplica, File dstMeta,
int smallBufferSize, final Configuration conf)
throws IOException {
File srcMeta = new File(srcReplica.getMetadataURI());
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta,
DFSUtilClient.getIoFileBufferSize(conf));
final File srcMeta = new File(srcReplica.getMetadataURI());
DataChecksum checksum;
try (FileInputStream fis =
srcReplica.getFileIoProvider().getFileInputStream(
srcReplica.getVolume(), srcMeta)) {
checksum = BlockMetadataHeader.readDataChecksum(
fis, DFSUtilClient.getIoFileBufferSize(conf), srcMeta);
}
final byte[] data = new byte[1 << 16];
final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
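
computeChecksum now opens the metadata stream through the replica's FileIoProvider and closes it in a try-with-resources block. The point of routing opens through a provider is that timing and failure hooks can run around every disk operation. A rough sketch of that idea, using hypothetical names rather than the actual FileIoProvider API:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;

/** Toy provider that wraps file opens with before/after instrumentation. */
public class InstrumentedIo {
  /** Hypothetical hook interface; the real change drives similar callbacks. */
  interface IoHooks {
    void beforeOp(String op);
    void afterOp(String op, long elapsedNanos);
    void onFailure(String op, Exception e);
  }

  private final IoHooks hooks;

  InstrumentedIo(IoHooks hooks) {
    this.hooks = hooks;
  }

  FileInputStream getFileInputStream(File f) throws FileNotFoundException {
    final String op = "open:" + f.getName();
    hooks.beforeOp(op);
    long start = System.nanoTime();
    try {
      FileInputStream in = new FileInputStream(f);
      hooks.afterOp(op, System.nanoTime() - start);
      return in;
    } catch (FileNotFoundException e) {
      hooks.onFailure(op, e);   // e.g. feed metrics or a disk-error checker
      throw e;
    }
  }

  public static void main(String[] args) throws Exception {
    IoHooks printing = new IoHooks() {
      public void beforeOp(String op) { System.out.println("start " + op); }
      public void afterOp(String op, long ns) { System.out.println(op + " took " + ns + " ns"); }
      public void onFailure(String op, Exception e) { System.err.println(op + " failed: " + e); }
    };
    File f = File.createTempFile("meta_", ".meta");
    new InstrumentedIo(printing).getFileInputStream(f).close();
    f.deleteOnExit();
  }
}
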
@ -2161,16 +2173,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return;
}
final long diskGS = diskMetaFile != null && diskMetaFile.exists() ?
Block.getGenerationStamp(diskMetaFile.getName()) :
HdfsConstants.GRANDFATHER_GENERATION_STAMP;
final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
final boolean diskMetaFileExists = diskMetaFile != null &&
fileIoProvider.exists(vol, diskMetaFile);
final boolean diskFileExists = diskFile != null &&
fileIoProvider.exists(vol, diskFile);
if (diskFile == null || !diskFile.exists()) {
final long diskGS = diskMetaFileExists ?
Block.getGenerationStamp(diskMetaFile.getName()) :
HdfsConstants.GRANDFATHER_GENERATION_STAMP;
if (!diskFileExists) {
if (memBlockInfo == null) {
// Block file does not exist and block does not exist in memory
// If metadata file exists then delete it
if (diskMetaFile != null && diskMetaFile.exists()
&& diskMetaFile.delete()) {
if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) {
LOG.warn("Deleted a metadata file without a block "
+ diskMetaFile.getAbsolutePath());
}
@ -2186,8 +2203,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.warn("Removed block " + blockId
+ " from memory with missing block file on the disk");
// Finally remove the metadata file
if (diskMetaFile != null && diskMetaFile.exists()
&& diskMetaFile.delete()) {
if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) {
LOG.warn("Deleted a metadata file for the deleted block "
+ diskMetaFile.getAbsolutePath());
}
@ -2223,7 +2239,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Compare block files
if (memBlockInfo.blockDataExists()) {
if (memBlockInfo.getBlockURI().compareTo(diskFile.toURI()) != 0) {
if (diskMetaFile.exists()) {
if (diskMetaFileExists) {
if (memBlockInfo.metadataExists()) {
// We have two sets of block+meta files. Decide which one to
// keep.
@ -2239,7 +2255,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
memBlockInfo, diskBlockInfo, volumeMap);
}
} else {
if (!diskFile.delete()) {
if (!fileIoProvider.delete(vol, diskFile)) {
LOG.warn("Failed to delete " + diskFile);
}
}
@ -2278,8 +2294,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// as the block file, then use the generation stamp from it
try {
File memFile = new File(memBlockInfo.getBlockURI());
long gs = diskMetaFile != null && diskMetaFile.exists()
&& diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
long gs = diskMetaFileExists &&
diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
: HdfsConstants.GRANDFATHER_GENERATION_STAMP;
LOG.warn("Updating generation stamp for block " + blockId


@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
@ -80,7 +81,7 @@ public class FsDatasetUtil {
return matches[0];
}
public static FileInputStream openAndSeek(File file, long offset)
public static FileDescriptor openAndSeek(File file, long offset)
throws IOException {
RandomAccessFile raf = null;
try {
@ -88,7 +89,7 @@ public class FsDatasetUtil {
if (offset > 0) {
raf.seek(offset);
}
return new FileInputStream(raf.getFD());
return raf.getFD();
} catch(IOException ioe) {
IOUtils.cleanup(null, raf);
throw ioe;
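
openAndSeek now hands back the raw FileDescriptor instead of a FileInputStream, leaving it to the caller to decide how to wrap and manage it. A small stand-alone sketch of reading from a seeked descriptor (file contents and offset are illustrative):

import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;

public class SeekedFdExample {
  /** Open the file read-only, position it at offset, and return its descriptor. */
  static FileDescriptor openAndSeek(File file, long offset) throws IOException {
    RandomAccessFile raf = new RandomAccessFile(file, "r");
    try {
      if (offset > 0) {
        raf.seek(offset);
      }
      return raf.getFD();
    } catch (IOException ioe) {
      raf.close();
      throw ioe;
    }
  }

  public static void main(String[] args) throws IOException {
    File f = File.createTempFile("blk_", ".data");
    try (FileOutputStream out = new FileOutputStream(f)) {
      out.write("0123456789".getBytes("US-ASCII"));
    }
    // Wrapping the descriptor: reads start at the seeked position, and closing
    // the stream closes the shared descriptor.
    try (FileInputStream in = new FileInputStream(openAndSeek(f, 4))) {
      System.out.println((char) in.read()); // prints '4'
    }
    f.deleteOnExit();
  }
}
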


@ -19,14 +19,12 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
@ -46,8 +44,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@ -75,7 +73,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.StringUtils;
@ -132,6 +129,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
// limit the visible capacity for tests. If negative, then we just
// query from the filesystem.
protected volatile long configuredCapacity;
private final FileIoProvider fileIoProvider;
/**
* Per-volume worker pool that processes new blocks to cache.
@ -141,8 +139,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
*/
protected ThreadPoolExecutor cacheExecutor;
FsVolumeImpl(FsDatasetImpl dataset, String storageID, StorageDirectory sd,
Configuration conf) throws IOException {
FsVolumeImpl(
FsDatasetImpl dataset, String storageID, StorageDirectory sd,
FileIoProvider fileIoProvider, Configuration conf) throws IOException {
if (sd.getStorageLocation() == null) {
throw new IOException("StorageLocation specified for storage directory " +
@ -162,6 +161,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT));
this.configuredCapacity = -1;
this.conf = conf;
this.fileIoProvider = fileIoProvider;
cacheExecutor = initializeCacheExecutor(parent);
}
@ -664,8 +664,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
*/
private String getNextSubDir(String prev, File dir)
throws IOException {
List<String> children =
IOUtils.listDirectory(dir, SubdirFilter.INSTANCE);
List<String> children = fileIoProvider.listDirectory(
FsVolumeImpl.this, dir, SubdirFilter.INSTANCE);
cache = null;
cacheMs = 0;
if (children.size() == 0) {
@ -718,8 +718,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
File dir = Paths.get(bpidDir.getAbsolutePath(), "current", "finalized",
state.curFinalizedDir, state.curFinalizedSubDir).toFile();
List<String> entries =
IOUtils.listDirectory(dir, BlockFileFilter.INSTANCE);
List<String> entries = fileIoProvider.listDirectory(
FsVolumeImpl.this, dir, BlockFileFilter.INSTANCE);
if (entries.size() == 0) {
entries = null;
} else {
@ -839,19 +839,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
public void save() throws IOException {
state.lastSavedMs = Time.now();
boolean success = false;
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
new FileOutputStream(getTempSaveFile(), false), "UTF-8"))) {
try (BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(fileIoProvider.getFileOutputStream(
FsVolumeImpl.this, getTempSaveFile()), "UTF-8"))) {
WRITER.writeValue(writer, state);
success = true;
} finally {
if (!success) {
if (getTempSaveFile().delete()) {
LOG.debug("save({}, {}): error deleting temporary file.",
storageID, bpid);
}
fileIoProvider.delete(FsVolumeImpl.this, getTempSaveFile());
}
}
Files.move(getTempSaveFile().toPath(), getSaveFile().toPath(),
fileIoProvider.move(FsVolumeImpl.this,
getTempSaveFile().toPath(), getSaveFile().toPath(),
StandardCopyOption.ATOMIC_MOVE);
if (LOG.isTraceEnabled()) {
LOG.trace("save({}, {}): saved {}", storageID, bpid,
@ -1042,11 +1041,12 @@ public class FsVolumeImpl implements FsVolumeSpi {
File finalizedDir = new File(bpCurrentDir,
DataStorage.STORAGE_DIR_FINALIZED);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive(
finalizedDir)) {
if (fileIoProvider.exists(this, finalizedDir) &&
!DatanodeUtil.dirNoFilesRecursive(this, finalizedDir, fileIoProvider)) {
return false;
}
if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
if (fileIoProvider.exists(this, rbwDir) &&
fileIoProvider.list(this, rbwDir).length != 0) {
return false;
}
return true;
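
These checks now go through fileIoProvider.exists and fileIoProvider.list, but the logic is unchanged: the block pool directory is only considered removable when its finalized and rbw subdirectories hold no files. A compact JDK-only sketch of the "directory exists and is non-empty" test (paths are illustrative):

import java.io.File;
import java.io.IOException;

public class EmptyDirCheck {
  /** True if dir exists and contains at least one entry. */
  static boolean existsAndNonEmpty(File dir) throws IOException {
    if (!dir.exists()) {
      return false;
    }
    String[] entries = dir.list();
    if (entries == null) {
      // list() returns null on IO errors or when dir is not a directory.
      throw new IOException("Failed to list " + dir);
    }
    return entries.length != 0;
  }

  public static void main(String[] args) throws IOException {
    File rbwDir = new File(System.getProperty("java.io.tmpdir"), "rbw-demo");
    System.out.println(existsAndNonEmpty(rbwDir)); // false until the dir is populated
  }
}
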
@ -1067,35 +1067,38 @@ public class FsVolumeImpl implements FsVolumeSpi {
DataStorage.STORAGE_DIR_LAZY_PERSIST);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
if (force) {
DataStorage.fullyDelete(bpDir);
fileIoProvider.fullyDelete(this, bpDir);
} else {
if (!rbwDir.delete()) {
if (!fileIoProvider.delete(this, rbwDir)) {
throw new IOException("Failed to delete " + rbwDir);
}
if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) ||
!FileUtil.fullyDelete(finalizedDir)) {
if (!DatanodeUtil.dirNoFilesRecursive(
this, finalizedDir, fileIoProvider) ||
!fileIoProvider.fullyDelete(
this, finalizedDir)) {
throw new IOException("Failed to delete " + finalizedDir);
}
if (lazypersistDir.exists() &&
((!DatanodeUtil.dirNoFilesRecursive(lazypersistDir) ||
!FileUtil.fullyDelete(lazypersistDir)))) {
((!DatanodeUtil.dirNoFilesRecursive(
this, lazypersistDir, fileIoProvider) ||
!fileIoProvider.fullyDelete(this, lazypersistDir)))) {
throw new IOException("Failed to delete " + lazypersistDir);
}
DataStorage.fullyDelete(tmpDir);
for (File f : FileUtil.listFiles(bpCurrentDir)) {
if (!f.delete()) {
fileIoProvider.fullyDelete(this, tmpDir);
for (File f : fileIoProvider.listFiles(this, bpCurrentDir)) {
if (!fileIoProvider.delete(this, f)) {
throw new IOException("Failed to delete " + f);
}
}
if (!bpCurrentDir.delete()) {
if (!fileIoProvider.delete(this, bpCurrentDir)) {
throw new IOException("Failed to delete " + bpCurrentDir);
}
for (File f : FileUtil.listFiles(bpDir)) {
if (!f.delete()) {
for (File f : fileIoProvider.listFiles(this, bpDir)) {
if (!fileIoProvider.delete(this, f)) {
throw new IOException("Failed to delete " + f);
}
}
if (!bpDir.delete()) {
if (!fileIoProvider.delete(this, bpDir)) {
throw new IOException("Failed to delete " + bpDir);
}
}
@ -1118,7 +1121,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
private byte[] loadLastPartialChunkChecksum(
File blockFile, File metaFile) throws IOException {
DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
// readHeader closes the temporary FileInputStream.
DataChecksum dcs = BlockMetadataHeader
.readHeader(fileIoProvider.getFileInputStream(this, metaFile))
.getChecksum();
final int checksumSize = dcs.getChecksumSize();
final long onDiskLen = blockFile.length();
final int bytesPerChecksum = dcs.getBytesPerChecksum();
@ -1132,7 +1138,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
(int)(onDiskLen / bytesPerChecksum * checksumSize);
byte[] lastChecksum = new byte[checksumSize];
try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
try (RandomAccessFile raf = fileIoProvider.getRandomAccessFile(
this, metaFile, "r")) {
raf.seek(offsetInChecksum);
raf.read(lastChecksum, 0, checksumSize);
}
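
The offset arithmetic above locates the checksum of the last partial chunk: skip the metadata header, then one checksum entry per full chunk already on disk. A worked example, assuming 512-byte chunks, 4-byte CRC32 checksums, and an illustrative header size (HDFS would use BlockMetadataHeader.getHeaderSize() here):

public class LastChunkChecksumOffset {
  /** Offset of the checksum covering the last (possibly partial) chunk. */
  static long offsetOfLastChecksum(long onDiskLen, int bytesPerChecksum,
      int checksumSize, int headerSize) {
    long fullChunks = onDiskLen / bytesPerChecksum;
    return headerSize + fullChunks * checksumSize;
  }

  public static void main(String[] args) {
    // Illustrative numbers: a 1300-byte replica, 512-byte chunks, CRC32 (4 bytes),
    // and a 7-byte header.
    long offset = offsetOfLastChecksum(1300, 512, 4, 7);
    // 1300 / 512 = 2 full chunks, so 7 + 2 * 4 = 15.
    System.out.println(offset); // 15
  }
}
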
@ -1246,8 +1253,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId);
File blockFile = copiedReplicaFiles[1];
File metaFile = copiedReplicaFiles[0];
LocalReplica.truncateBlock(blockFile, metaFile,
rur.getNumBytes(), newlength);
LocalReplica.truncateBlock(rur.getVolume(), blockFile, metaFile,
rur.getNumBytes(), newlength, fileIoProvider);
LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
.setBlockId(newBlockId)
@ -1283,6 +1290,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
getFinalizedDir(bpid), report, reportCompiler);
}
@Override
public FileIoProvider getFileIoProvider() {
return fileIoProvider;
}
private LinkedList<ScanInfo> compileReport(File bpFinalizedDir,
File dir, LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException {
@ -1291,7 +1303,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
List <String> fileNames;
try {
fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
fileNames = fileIoProvider.listDirectory(
this, dir, BlockDirFilter.INSTANCE);
} catch (IOException ioe) {
LOG.warn("Exception occured while compiling report: ", ioe);
// Initiate a check on disk failure.


@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
/**
* This class is to be used as a builder for {@link FsVolumeImpl} objects.
@ -31,6 +32,7 @@ public class FsVolumeImplBuilder {
private String storageID;
private StorageDirectory sd;
private Configuration conf;
private FileIoProvider fileIoProvider;
public FsVolumeImplBuilder() {
dataset = null;
@ -59,7 +61,15 @@ public class FsVolumeImplBuilder {
return this;
}
FsVolumeImplBuilder setFileIoProvider(FileIoProvider fileIoProvider) {
this.fileIoProvider = fileIoProvider;
return this;
}
FsVolumeImpl build() throws IOException {
return new FsVolumeImpl(dataset, storageID, sd, conf);
return new FsVolumeImpl(
dataset, storageID, sd,
fileIoProvider != null ? fileIoProvider : new FileIoProvider(null),
conf);
}
}
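
FsVolumeImplBuilder falls back to new FileIoProvider(null) when no provider is set, so call sites and tests that predate setFileIoProvider keep working unchanged. A generic sketch of that optional-dependency builder pattern, with hypothetical classes rather than the HDFS ones:

/** Toy builder showing a dependency that defaults when not supplied. */
public class VolumeBuilderExample {
  interface IoProvider {
    String name();
  }

  static final class DefaultIoProvider implements IoProvider {
    @Override public String name() { return "default"; }
  }

  static final class Volume {
    private final IoProvider io;
    Volume(IoProvider io) { this.io = io; }
    IoProvider io() { return io; }
  }

  static final class Builder {
    private IoProvider io;

    Builder setIoProvider(IoProvider io) {
      this.io = io;
      return this;
    }

    Volume build() {
      // Fall back to a default provider when the caller did not set one.
      return new Volume(io != null ? io : new DefaultIoProvider());
    }
  }

  public static void main(String[] args) {
    Volume v = new Builder().build();   // no provider set
    System.out.println(v.io().name());  // prints "default"
  }
}
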


@ -701,7 +701,7 @@ public class TestFileAppend{
ReplicaBeingWritten rbw =
(ReplicaBeingWritten)replicaHandler.getReplica();
ReplicaOutputStreams
outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300);
outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM);
OutputStream dataOutput = outputStreams.getDataOut();
byte[] appendBytes = new byte[1];


@ -122,6 +122,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
static final byte[] nullCrcFileData;
private final AutoCloseableLock datasetLock;
private final FileIoProvider fileIoProvider;
static {
DataChecksum checksum = DataChecksum.newDataChecksum(
@ -260,7 +261,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum, long slowLogThresholdMs)
DataChecksum requestedChecksum)
throws IOException {
if (finalized) {
throw new IOException("Trying to write to a finalized replica "
@ -268,7 +269,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} else {
SimulatedOutputStream crcStream = new SimulatedOutputStream();
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
volume.isTransientStorage(), slowLogThresholdMs);
volume, fileIoProvider);
}
}
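
createStreams no longer takes a slowLogThresholdMs argument; ReplicaOutputStreams is constructed with the volume and a FileIoProvider instead, which suggests slow-IO detection now lives behind the provider rather than in every caller. As a rough, hypothetical illustration of threshold-based slow-write reporting in a stream wrapper (not the HDFS classes):

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Wraps an OutputStream and reports writes slower than a threshold. */
public class SlowWriteDetectingStream extends FilterOutputStream {
  private final long thresholdMs;

  public SlowWriteDetectingStream(OutputStream out, long thresholdMs) {
    super(out);
    this.thresholdMs = thresholdMs;
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    long start = System.nanoTime();
    out.write(b, off, len);
    long elapsedMs = (System.nanoTime() - start) / 1_000_000;
    if (elapsedMs > thresholdMs) {
      // Real code would go through a logger or metrics sink instead.
      System.err.println("Slow write of " + len + " bytes took " + elapsedMs + " ms");
    }
  }
}

Wrapping the data or checksum output in something like this is one way a provider can surface slow volumes without threading a threshold value through every createStreams call.
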
@ -474,9 +475,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
static class SimulatedVolume implements FsVolumeSpi {
private final SimulatedStorage storage;
private final FileIoProvider fileIoProvider;
SimulatedVolume(final SimulatedStorage storage) {
SimulatedVolume(final SimulatedStorage storage,
final FileIoProvider fileIoProvider) {
this.storage = storage;
this.fileIoProvider = fileIoProvider;
}
@Override
@ -559,6 +563,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
return null;
}
@Override
public FileIoProvider getFileIoProvider() {
return fileIoProvider;
}
@Override
public VolumeCheckResult check(VolumeCheckContext context)
throws Exception {
@ -590,10 +599,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
registerMBean(datanodeUuid);
this.fileIoProvider = new FileIoProvider(conf);
this.storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
this.volume = new SimulatedVolume(this.storage);
this.volume = new SimulatedVolume(this.storage, this.fileIoProvider);
this.datasetLock = new AutoCloseableLock();
}


@ -673,7 +673,7 @@ public class TestBlockRecovery {
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
streams.getChecksumOut().write('a');
dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =


@ -905,6 +905,11 @@ public class TestDirectoryScanner {
return null;
}
@Override
public FileIoProvider getFileIoProvider() {
return null;
}
@Override
public VolumeCheckResult check(VolumeCheckContext context)


@ -83,7 +83,7 @@ public class TestSimulatedFSDataset {
ReplicaInPipeline bInfo = fsdataset.createRbw(
StorageType.DEFAULT, b, false).getReplica();
ReplicaOutputStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
try {
OutputStream dataOut = out.getDataOut();
assertEquals(0, fsdataset.getLength(b));


@ -134,7 +134,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
@Override
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
long ckoff) throws IOException {
return new ReplicaInputStreams(null, null, null);
return new ReplicaInputStreams(null, null, null, null);
}
@Override


@ -58,10 +58,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipeline {
@Override
public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum, long slowLogThresholdMs)
DataChecksum requestedChecksum)
throws IOException {
return new ReplicaOutputStreams(null, null, requestedChecksum, false,
slowLogThresholdMs);
return new ReplicaOutputStreams(null, null, requestedChecksum,
null, null);
}
@Override


@ -26,6 +26,7 @@ import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
@ -114,6 +115,11 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
return null;
}
@Override
public FileIoProvider getFileIoProvider() {
return null;
}
@Override
public VolumeCheckResult check(VolumeCheckContext context)
throws Exception {


@ -99,6 +99,8 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
.add(DFSConfigKeys.DFS_DATANODE_STARTUP_KEY);
configurationPropsToSkipCompare
.add(DFSConfigKeys.DFS_NAMENODE_STARTUP_KEY);
configurationPropsToSkipCompare
.add(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY);
// Allocate
xmlPropsToSkipCompare = new HashSet<String>();
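
The new key is added to the cross-check skip list, which suggests it has no entry in hdfs-default.xml. An operator would set it to the fully qualified name of a class implementing the file IO events hook introduced by this change; a hedged sketch, where the implementation class name is a placeholder and the hook's exact contract is not shown in these hunks:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class FileIoEventsConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical implementation class; it would need to implement the
    // events hook type added by this change for the DataNode to load it.
    conf.set(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
        "org.example.MetricsEmittingFileIoEvents");
    System.out.println(conf.get(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY));
  }
}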