HDFS-6109 let sync_file_range() system call run in background (Liang Xie via stack)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1599351 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a84fd44ee4
commit
78d46e8508
|
@ -144,6 +144,9 @@ Release 2.5.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-6056. Clean up NFS config settings (brandonli)
|
HDFS-6056. Clean up NFS config settings (brandonli)
|
||||||
|
|
||||||
|
HDFS-6109 let sync_file_range() system call run in background
|
||||||
|
(Liang Xie via stack)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
||||||
|
|
|
@ -111,6 +111,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false;
|
public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false;
|
||||||
public static final String DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes";
|
public static final String DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes";
|
||||||
public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
|
public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
|
||||||
|
public static final String DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_KEY = "dfs.datanode.sync.behind.writes.in.background";
|
||||||
|
public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_DEFAULT = false;
|
||||||
public static final String DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
|
public static final String DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
|
||||||
public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
|
public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
|
||||||
public static final String DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname";
|
public static final String DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname";
|
||||||
|
|
|
@ -104,6 +104,7 @@ class BlockReceiver implements Closeable {
|
||||||
private boolean dropCacheBehindWrites;
|
private boolean dropCacheBehindWrites;
|
||||||
private long lastCacheManagementOffset = 0;
|
private long lastCacheManagementOffset = 0;
|
||||||
private boolean syncBehindWrites;
|
private boolean syncBehindWrites;
|
||||||
|
private boolean syncBehindWritesInBackground;
|
||||||
|
|
||||||
/** The client name. It is empty if a datanode is the client */
|
/** The client name. It is empty if a datanode is the client */
|
||||||
private final String clientname;
|
private final String clientname;
|
||||||
|
@ -207,6 +208,8 @@ class BlockReceiver implements Closeable {
|
||||||
datanode.getDnConf().dropCacheBehindWrites :
|
datanode.getDnConf().dropCacheBehindWrites :
|
||||||
cachingStrategy.getDropBehind();
|
cachingStrategy.getDropBehind();
|
||||||
this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
|
this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
|
||||||
|
this.syncBehindWritesInBackground = datanode.getDnConf().
|
||||||
|
syncBehindWritesInBackground;
|
||||||
|
|
||||||
final boolean isCreate = isDatanode || isTransfer
|
final boolean isCreate = isDatanode || isTransfer
|
||||||
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||||
|
@ -668,10 +671,17 @@ class BlockReceiver implements Closeable {
|
||||||
// of file
|
// of file
|
||||||
//
|
//
|
||||||
if (syncBehindWrites) {
|
if (syncBehindWrites) {
|
||||||
NativeIO.POSIX.syncFileRangeIfPossible(outFd,
|
if (syncBehindWritesInBackground) {
|
||||||
lastCacheManagementOffset,
|
this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest(
|
||||||
|
block, outFd, lastCacheManagementOffset,
|
||||||
offsetInBlock - lastCacheManagementOffset,
|
offsetInBlock - lastCacheManagementOffset,
|
||||||
NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
|
NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
|
||||||
|
} else {
|
||||||
|
NativeIO.POSIX.syncFileRangeIfPossible(outFd,
|
||||||
|
lastCacheManagementOffset, offsetInBlock
|
||||||
|
- lastCacheManagementOffset,
|
||||||
|
NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
//
|
//
|
||||||
// For POSIX_FADV_DONTNEED, we want to drop from the beginning
|
// For POSIX_FADV_DONTNEED, we want to drop from the beginning
|
||||||
|
|
|
@ -67,6 +67,7 @@ public class DNConf {
|
||||||
final boolean transferToAllowed;
|
final boolean transferToAllowed;
|
||||||
final boolean dropCacheBehindWrites;
|
final boolean dropCacheBehindWrites;
|
||||||
final boolean syncBehindWrites;
|
final boolean syncBehindWrites;
|
||||||
|
final boolean syncBehindWritesInBackground;
|
||||||
final boolean dropCacheBehindReads;
|
final boolean dropCacheBehindReads;
|
||||||
final boolean syncOnClose;
|
final boolean syncOnClose;
|
||||||
final boolean encryptDataTransfer;
|
final boolean encryptDataTransfer;
|
||||||
|
@ -119,6 +120,9 @@ public class DNConf {
|
||||||
syncBehindWrites = conf.getBoolean(
|
syncBehindWrites = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
|
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
|
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
|
||||||
|
syncBehindWritesInBackground = conf.getBoolean(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_DEFAULT);
|
||||||
dropCacheBehindReads = conf.getBoolean(
|
dropCacheBehindReads = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
|
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
|
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
|
||||||
|
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileDescriptor;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -431,5 +432,12 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
||||||
* @return true when trash is enabled
|
* @return true when trash is enabled
|
||||||
*/
|
*/
|
||||||
public boolean trashEnabled(String bpid);
|
public boolean trashEnabled(String bpid);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* submit a sync_file_range request to AsyncDiskService
|
||||||
|
*/
|
||||||
|
public void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
|
||||||
|
final FileDescriptor fd, final long offset, final long nbytes,
|
||||||
|
final int flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileDescriptor;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
@ -31,6 +32,8 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
||||||
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
|
import org.apache.hadoop.io.nativeio.NativeIOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is a container of multiple thread pools, each for a volume,
|
* This class is a container of multiple thread pools, each for a volume,
|
||||||
|
@ -42,6 +45,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
||||||
* can be slow, and we don't want to use a single thread pool because that
|
* can be slow, and we don't want to use a single thread pool because that
|
||||||
* is inefficient when we have more than 1 volume. AsyncDiskService is the
|
* is inefficient when we have more than 1 volume. AsyncDiskService is the
|
||||||
* solution for these.
|
* solution for these.
|
||||||
|
* Another example of async disk operation is requesting sync_file_range().
|
||||||
*
|
*
|
||||||
* This class and {@link org.apache.hadoop.util.AsyncDiskService} are similar.
|
* This class and {@link org.apache.hadoop.util.AsyncDiskService} are similar.
|
||||||
* They should be combined.
|
* They should be combined.
|
||||||
|
@ -148,6 +152,21 @@ class FsDatasetAsyncDiskService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void submitSyncFileRangeRequest(FsVolumeImpl volume,
|
||||||
|
final FileDescriptor fd, final long offset, final long nbytes,
|
||||||
|
final int flags) {
|
||||||
|
execute(volume.getCurrentDir(), new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
NativeIO.POSIX.syncFileRangeIfPossible(fd, offset, nbytes, flags);
|
||||||
|
} catch (NativeIOException e) {
|
||||||
|
LOG.warn("sync_file_range error", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete the block file and meta file from the disk asynchronously, adjust
|
* Delete the block file and meta file from the disk asynchronously, adjust
|
||||||
* dfsUsed statistics accordingly.
|
* dfsUsed statistics accordingly.
|
||||||
|
|
|
@ -1907,5 +1907,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
}
|
}
|
||||||
return new RollingLogsImpl(dir, prefix);
|
return new RollingLogsImpl(dir, prefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
|
||||||
|
FileDescriptor fd, long offset, long nbytes, int flags) {
|
||||||
|
FsVolumeImpl fsVolumeImpl = this.getVolume(block);
|
||||||
|
asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset,
|
||||||
|
nbytes, flags);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileDescriptor;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
@ -1112,5 +1113,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
public FsVolumeSpi getVolume(ExtendedBlock b) {
|
public FsVolumeSpi getVolume(ExtendedBlock b) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
|
||||||
|
FileDescriptor fd, long offset, long nbytes, int flags) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue