HDFS-12777. [READ] Reduce memory and CPU footprint for PROVIDED volumes.

This commit is contained in:
Virajith Jalaparti 2017-11-10 10:19:33 -08:00 committed by Chris Douglas
parent 6cd80b2521
commit e1a28f95b8
6 changed files with 196 additions and 36 deletions

View File

@ -530,6 +530,10 @@ public class DirectoryScanner implements Runnable {
new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
for (int i = 0; i < volumes.size(); i++) {
if (volumes.get(i).getStorageType() == StorageType.PROVIDED) {
// Disable scanning PROVIDED volumes to keep overhead low
continue;
}
ReportCompiler reportCompiler =
new ReportCompiler(datanode, volumes.get(i));
Future<ScanInfoPerBlockPool> result =

View File

@ -21,6 +21,7 @@ import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@ -37,6 +38,13 @@ public class FinalizedProvidedReplica extends ProvidedReplica {
remoteFS);
}
public FinalizedProvidedReplica(long blockId, Path pathPrefix,
String pathSuffix, long fileOffset, long blockLen, long genStamp,
FsVolumeSpi volume, Configuration conf, FileSystem remoteFS) {
super(blockId, pathPrefix, pathSuffix, fileOffset, blockLen,
genStamp, volume, conf, remoteFS);
}
@Override
public ReplicaState getState() {
return ReplicaState.FINALIZED;

View File

@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@ -51,18 +52,23 @@ public abstract class ProvidedReplica extends ReplicaInfo {
static final byte[] NULL_CHECKSUM_ARRAY =
FsDatasetUtil.createNullChecksumByteArray();
private URI fileURI;
private Path pathPrefix;
private String pathSuffix;
private long fileOffset;
private Configuration conf;
private FileSystem remoteFS;
/**
* Constructor.
*
* @param blockId block id
* @param fileURI remote URI this block is to be read from
* @param fileOffset the offset in the remote URI
* @param blockLen the length of the block
* @param genStamp the generation stamp of the block
* @param volume the volume this block belongs to
* @param conf the configuration
* @param remoteFS reference to the remote filesystem to use for this replica.
*/
public ProvidedReplica(long blockId, URI fileURI, long fileOffset,
long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf,
@ -85,23 +91,86 @@ public abstract class ProvidedReplica extends ReplicaInfo {
}
}
/**
* Constructor.
*
* @param blockId block id
* @param pathPrefix A prefix of the {@link Path} associated with this replica
* on the remote {@link FileSystem}.
* @param pathSuffix A suffix of the {@link Path} associated with this replica
* on the remote {@link FileSystem}. Resolving the {@code pathSuffix}
* against the {@code pathPrefix} should provide the exact
* {@link Path} of the data associated with this replica on the
* remote {@link FileSystem}.
* @param fileOffset the offset in the remote URI
* @param blockLen the length of the block
* @param genStamp the generation stamp of the block
* @param volume the volume this block belongs to
* @param conf the configuration
* @param remoteFS reference to the remote filesystem to use for this replica.
*/
public ProvidedReplica(long blockId, Path pathPrefix, String pathSuffix,
long fileOffset, long blockLen, long genStamp, FsVolumeSpi volume,
Configuration conf, FileSystem remoteFS) {
super(volume, blockId, blockLen, genStamp);
this.fileURI = null;
this.pathPrefix = pathPrefix;
this.pathSuffix = pathSuffix;
this.fileOffset = fileOffset;
this.conf = conf;
if (remoteFS != null) {
this.remoteFS = remoteFS;
} else {
LOG.warn(
"Creating an reference to the remote FS for provided block " + this);
try {
this.remoteFS = FileSystem.get(pathPrefix.toUri(), this.conf);
} catch (IOException e) {
LOG.warn("Failed to obtain filesystem for " + pathPrefix);
this.remoteFS = null;
}
}
}
public ProvidedReplica(ProvidedReplica r) {
super(r);
this.fileURI = r.fileURI;
this.fileOffset = r.fileOffset;
this.conf = r.conf;
this.remoteFS = r.remoteFS;
this.pathPrefix = r.pathPrefix;
this.pathSuffix = r.pathSuffix;
}
@Override
public URI getBlockURI() {
return this.fileURI;
return getRemoteURI();
}
@VisibleForTesting
public String getPathSuffix() {
return pathSuffix;
}
@VisibleForTesting
public Path getPathPrefix() {
return pathPrefix;
}
private URI getRemoteURI() {
if (fileURI != null) {
return fileURI;
} else if (pathPrefix == null) {
return new Path(pathSuffix).toUri();
} else {
return new Path(pathPrefix, pathSuffix).toUri();
}
}
@Override
public InputStream getDataInputStream(long seekOffset) throws IOException {
if (remoteFS != null) {
FSDataInputStream ins = remoteFS.open(new Path(fileURI));
FSDataInputStream ins = remoteFS.open(new Path(getRemoteURI()));
ins.seek(fileOffset + seekOffset);
return new BoundedInputStream(
new FSDataInputStream(ins), getBlockDataLength());
@ -132,7 +201,7 @@ public abstract class ProvidedReplica extends ReplicaInfo {
public boolean blockDataExists() {
if(remoteFS != null) {
try {
return remoteFS.exists(new Path(fileURI));
return remoteFS.exists(new Path(getRemoteURI()));
} catch (IOException e) {
return false;
}
@ -220,7 +289,7 @@ public abstract class ProvidedReplica extends ReplicaInfo {
public int compareWith(ScanInfo info) {
//local scanning cannot find any provided blocks.
if (info.getFileRegion().equals(
new FileRegion(this.getBlockId(), new Path(fileURI),
new FileRegion(this.getBlockId(), new Path(getRemoteURI()),
fileOffset, this.getNumBytes(), this.getGenerationStamp()))) {
return 0;
} else {

View File

@ -21,6 +21,7 @@ import java.io.File;
import java.net.URI;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
@ -52,6 +53,8 @@ public class ReplicaBuilder {
private Configuration conf;
private FileRegion fileRegion;
private FileSystem remoteFS;
private String pathSuffix;
private Path pathPrefix;
public ReplicaBuilder(ReplicaState state) {
volume = null;
@ -145,6 +148,28 @@ public class ReplicaBuilder {
return this;
}
/**
* Set the suffix of the {@link Path} associated with the replica.
* Intended to be use only for {@link ProvidedReplica}s.
* @param suffix the path suffix.
* @return the builder with the path suffix set.
*/
public ReplicaBuilder setPathSuffix(String suffix) {
this.pathSuffix = suffix;
return this;
}
/**
* Set the prefix of the {@link Path} associated with the replica.
* Intended to be use only for {@link ProvidedReplica}s.
* @param prefix the path prefix.
* @return the builder with the path prefix set.
*/
public ReplicaBuilder setPathPrefix(Path prefix) {
this.pathPrefix = prefix;
return this;
}
public LocalReplicaInPipeline buildLocalReplicaInPipeline()
throws IllegalArgumentException {
LocalReplicaInPipeline info = null;
@ -275,14 +300,20 @@ public class ReplicaBuilder {
throw new IllegalArgumentException("Finalized PROVIDED replica " +
"cannot be constructed from another replica");
}
if (fileRegion == null && uri == null) {
if (fileRegion == null && uri == null &&
(pathPrefix == null || pathSuffix == null)) {
throw new IllegalArgumentException(
"Trying to construct a provided replica on " + volume +
" without enough information");
}
if (fileRegion == null) {
info = new FinalizedProvidedReplica(blockId, uri, offset,
length, genStamp, volume, conf, remoteFS);
if (uri != null) {
info = new FinalizedProvidedReplica(blockId, uri, offset,
length, genStamp, volume, conf, remoteFS);
} else {
info = new FinalizedProvidedReplica(blockId, pathPrefix, pathSuffix,
offset, length, genStamp, volume, conf, remoteFS);
}
} else {
info = new FinalizedProvidedReplica(fileRegion.getBlock().getBlockId(),
fileRegion.getPath().toUri(),

View File

@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
@ -65,6 +66,29 @@ import org.apache.hadoop.util.Time;
*/
public class ProvidedVolumeImpl extends FsVolumeImpl {
/**
* Get a suffix of the full path, excluding the given prefix.
*
* @param prefix a prefix of the path.
* @param fullPath the full path whose suffix is needed.
* @return the suffix of the path, which when resolved against {@code prefix}
* gets back the {@code fullPath}.
*/
@VisibleForTesting
protected static String getSuffix(final Path prefix, final Path fullPath) {
String prefixStr = prefix.toString();
String pathStr = fullPath.toString();
if (!pathStr.startsWith(prefixStr)) {
LOG.debug("Path {} is not a prefix of the path {}", prefix, fullPath);
return pathStr;
}
String suffix = pathStr.replaceFirst("^" + prefixStr, "");
if (suffix.startsWith("/")) {
suffix = suffix.substring(1);
}
return suffix;
}
static class ProvidedBlockPoolSlice {
private ProvidedVolumeImpl providedVolume;
@ -106,15 +130,19 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
return;
}
Iterator<FileRegion> iter = reader.iterator();
Path blockPrefixPath = new Path(providedVolume.getBaseURI());
while (iter.hasNext()) {
FileRegion region = iter.next();
if (region.getBlockPoolId() != null
&& region.getBlockPoolId().equals(bpid)
&& containsBlock(providedVolume.baseURI,
region.getPath().toUri())) {
String blockSuffix =
getSuffix(blockPrefixPath, new Path(region.getPath().toUri()));
ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
.setBlockId(region.getBlock().getBlockId())
.setURI(region.getPath().toUri())
.setPathPrefix(blockPrefixPath)
.setPathSuffix(blockSuffix)
.setOffset(region.getOffset())
.setLength(region.getBlock().getNumBytes())
.setGenerationStamp(region.getBlock().getGenerationStamp())

View File

@ -62,7 +62,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
import org.apache.hadoop.hdfs.server.datanode.ProvidedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@ -509,33 +509,6 @@ public class TestProvidedImpl {
}
}
@Test
public void testRefresh() throws IOException {
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
for (int i = 0; i < providedVolumes.size(); i++) {
ProvidedVolumeImpl vol = (ProvidedVolumeImpl) providedVolumes.get(i);
TestFileRegionBlockAliasMap testBlockFormat =
(TestFileRegionBlockAliasMap) vol
.getBlockFormat(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
//equivalent to two new blocks appearing
testBlockFormat.setBlockCount(NUM_PROVIDED_BLKS + 2);
//equivalent to deleting the first block
testBlockFormat.setMinBlkId(MIN_BLK_ID + 1);
DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
scanner.reconcile();
ReplicaInfo info = dataset.getBlockReplica(
BLOCK_POOL_IDS[CHOSEN_BP_ID], NUM_PROVIDED_BLKS + 1);
//new replica should be added to the dataset
assertTrue(info != null);
try {
info = dataset.getBlockReplica(BLOCK_POOL_IDS[CHOSEN_BP_ID], 0);
} catch(Exception ex) {
LOG.info("Exception expected: " + ex);
}
}
}
private int getBlocksInProvidedVolumes(String basePath, int numBlocks,
int minBlockId) throws IOException {
TestFileRegionIterator fileRegionIterator =
@ -621,4 +594,51 @@ public class TestProvidedImpl {
ProvidedVolumeImpl.containsBlock(new URI("/bucket1/dir1/"),
new URI("s3a:/bucket1/dir1/temp.txt")));
}
@Test
public void testProvidedReplicaSuffixExtraction() {
assertEquals("B.txt", ProvidedVolumeImpl.getSuffix(
new Path("file:///A/"), new Path("file:///A/B.txt")));
assertEquals("B/C.txt", ProvidedVolumeImpl.getSuffix(
new Path("file:///A/"), new Path("file:///A/B/C.txt")));
assertEquals("B/C/D.txt", ProvidedVolumeImpl.getSuffix(
new Path("file:///A/"), new Path("file:///A/B/C/D.txt")));
assertEquals("D.txt", ProvidedVolumeImpl.getSuffix(
new Path("file:///A/B/C/"), new Path("file:///A/B/C/D.txt")));
assertEquals("file:/A/B/C/D.txt", ProvidedVolumeImpl.getSuffix(
new Path("file:///X/B/C/"), new Path("file:///A/B/C/D.txt")));
assertEquals("D.txt", ProvidedVolumeImpl.getSuffix(
new Path("/A/B/C"), new Path("/A/B/C/D.txt")));
assertEquals("D.txt", ProvidedVolumeImpl.getSuffix(
new Path("/A/B/C/"), new Path("/A/B/C/D.txt")));
assertEquals("data/current.csv", ProvidedVolumeImpl.getSuffix(
new Path("wasb:///users/alice/"),
new Path("wasb:///users/alice/data/current.csv")));
assertEquals("current.csv", ProvidedVolumeImpl.getSuffix(
new Path("wasb:///users/alice/data"),
new Path("wasb:///users/alice/data/current.csv")));
assertEquals("wasb:/users/alice/data/current.csv",
ProvidedVolumeImpl.getSuffix(
new Path("wasb:///users/bob/"),
new Path("wasb:///users/alice/data/current.csv")));
}
@Test
public void testProvidedReplicaPrefix() throws Exception {
for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i);
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
vol.getVolumeMap(volumeMap, null);
Path expectedPrefix = new Path(
StorageLocation.normalizeFileURI(new File(providedBasePath).toURI()));
for (ReplicaInfo info : volumeMap
.replicas(BLOCK_POOL_IDS[CHOSEN_BP_ID])) {
ProvidedReplica pInfo = (ProvidedReplica) info;
assertEquals(expectedPrefix, pInfo.getPathPrefix());
}
}
}
}