From 2407c9b93aabb021b76c802b19c928fb6cbb0a85 Mon Sep 17 00:00:00 2001
From: Virajith Jalaparti
Date: Mon, 7 Aug 2017 14:31:15 -0700
Subject: [PATCH] HDFS-12093. [READ] Share remoteFS between ProvidedReplica
 instances.

---
 .../datanode/FinalizedProvidedReplica.java      |  6 +++--
 .../hdfs/server/datanode/ProvidedReplica.java   | 25 +++++++++++--------
 .../hdfs/server/datanode/ReplicaBuilder.java    | 11 ++++++--
 .../fsdataset/impl/ProvidedVolumeImpl.java      | 17 +++++++++----
 .../datanode/TestProvidedReplicaImpl.java       |  2 +-
 5 files changed, 40 insertions(+), 21 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
index 722d573f3cc..e23d6be6622 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@@ -31,8 +32,9 @@ public class FinalizedProvidedReplica extends ProvidedReplica {
 
   public FinalizedProvidedReplica(long blockId, URI fileURI,
       long fileOffset, long blockLen, long genStamp,
-      FsVolumeSpi volume, Configuration conf) {
-    super(blockId, fileURI, fileOffset, blockLen, genStamp, volume, conf);
+      FsVolumeSpi volume, Configuration conf, FileSystem remoteFS) {
+    super(blockId, fileURI, fileOffset, blockLen, genStamp, volume, conf,
+        remoteFS);
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
index 946ab5aa8b8..2b3bd133f53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
@@ -65,16 +65,23 @@ public abstract class ProvidedReplica extends ReplicaInfo {
    * @param volume the volume this block belongs to
    */
   public ProvidedReplica(long blockId, URI fileURI, long fileOffset,
-      long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf) {
+      long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf,
+      FileSystem remoteFS) {
     super(volume, blockId, blockLen, genStamp);
     this.fileURI = fileURI;
     this.fileOffset = fileOffset;
     this.conf = conf;
-    try {
-      this.remoteFS = FileSystem.get(fileURI, this.conf);
-    } catch (IOException e) {
-      LOG.warn("Failed to obtain filesystem for " + fileURI);
-      this.remoteFS = null;
+    if (remoteFS != null) {
+      this.remoteFS = remoteFS;
+    } else {
+      LOG.warn(
+          "Creating a reference to the remote FS for provided block " + this);
+      try {
+        this.remoteFS = FileSystem.get(fileURI, this.conf);
+      } catch (IOException e) {
+        LOG.warn("Failed to obtain filesystem for " + fileURI);
+        this.remoteFS = null;
+      }
     }
   }
 
@@ -83,11 +90,7 @@ public abstract class ProvidedReplica extends ReplicaInfo {
     this.fileURI = r.fileURI;
     this.fileOffset = r.fileOffset;
     this.conf = r.conf;
-    try {
-      this.remoteFS = FileSystem.newInstance(fileURI, this.conf);
-    } catch (IOException e) {
-      this.remoteFS = null;
-    }
+    this.remoteFS = r.remoteFS;
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
index 639467f1bd2..c5cb6a56a65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import java.io.File;
 import java.net.URI;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
@@ -50,6 +51,7 @@ public class ReplicaBuilder {
   private long offset;
   private Configuration conf;
   private FileRegion fileRegion;
+  private FileSystem remoteFS;
 
   public ReplicaBuilder(ReplicaState state) {
     volume = null;
@@ -138,6 +140,11 @@ public class ReplicaBuilder {
     return this;
   }
 
+  public ReplicaBuilder setRemoteFS(FileSystem remoteFS) {
+    this.remoteFS = remoteFS;
+    return this;
+  }
+
   public LocalReplicaInPipeline buildLocalReplicaInPipeline()
       throws IllegalArgumentException {
     LocalReplicaInPipeline info = null;
@@ -275,14 +282,14 @@ public class ReplicaBuilder {
     }
     if (fileRegion == null) {
       info = new FinalizedProvidedReplica(blockId, uri, offset,
-          length, genStamp, volume, conf);
+          length, genStamp, volume, conf, remoteFS);
     } else {
       info = new FinalizedProvidedReplica(fileRegion.getBlock().getBlockId(),
           fileRegion.getPath().toUri(),
           fileRegion.getOffset(),
           fileRegion.getBlock().getNumBytes(),
           fileRegion.getBlock().getGenerationStamp(),
-          volume, conf);
+          volume, conf, remoteFS);
     }
     return info;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
index 5cd28c767a5..d1a7015912a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -96,7 +97,8 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
     }
 
     public void getVolumeMap(ReplicaMap volumeMap,
-        RamDiskReplicaTracker ramDiskReplicaMap) throws IOException {
+        RamDiskReplicaTracker ramDiskReplicaMap, FileSystem remoteFS)
+        throws IOException {
       Iterator<FileRegion> iter = provider.iterator();
       while (iter.hasNext()) {
         FileRegion region = iter.next();
@@ -112,9 +114,10 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
             .setGenerationStamp(region.getBlock().getGenerationStamp())
             .setFsVolume(providedVolume)
             .setConf(conf)
+            .setRemoteFS(remoteFS)
             .build();
-        // check if the replica already exists
-        ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
+        ReplicaInfo oldReplica =
+            volumeMap.get(bpid, newReplica.getBlockId());
         if (oldReplica == null) {
           volumeMap.add(bpid, newReplica);
           bpVolumeMap.add(bpid, newReplica);
@@ -163,6 +166,8 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
       new ConcurrentHashMap<String, ProvidedBlockPoolSlice>();
 
   private ProvidedVolumeDF df;
+  // the remote FileSystem to which this ProvidedVolume points.
+  private FileSystem remoteFS;
 
   ProvidedVolumeImpl(FsDatasetImpl dataset, String storageID,
       StorageDirectory sd, FileIoProvider fileIoProvider,
@@ -176,6 +181,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
         conf.getClass(DFSConfigKeys.DFS_PROVIDER_DF_CLASS,
             DefaultProvidedVolumeDF.class, ProvidedVolumeDF.class);
     df = ReflectionUtils.newInstance(dfClass, conf);
+    remoteFS = FileSystem.get(baseURI, conf);
   }
 
   @Override
@@ -397,7 +403,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
       throws IOException {
     LOG.info("Creating volumemap for provided volume " + this);
     for(ProvidedBlockPoolSlice s : bpSlices.values()) {
-      s.getVolumeMap(volumeMap, ramDiskReplicaMap);
+      s.getVolumeMap(volumeMap, ramDiskReplicaMap, remoteFS);
     }
   }
 
@@ -414,7 +420,8 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
   void getVolumeMap(String bpid, ReplicaMap volumeMap,
       final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
-    getProvidedBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
+    getProvidedBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap,
+        remoteFS);
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestProvidedReplicaImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestProvidedReplicaImpl.java
index 8258c21cffd..967e94d61d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestProvidedReplicaImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestProvidedReplicaImpl.java
@@ -87,7 +87,7 @@ public class TestProvidedReplicaImpl {
           FILE_LEN >= (i+1)*BLK_LEN ? BLK_LEN : FILE_LEN - i*BLK_LEN;
       replicas.add(
           new FinalizedProvidedReplica(i, providedFile.toURI(), i*BLK_LEN,
-          currentReplicaLength, 0, null, conf));
+          currentReplicaLength, 0, null, conf, null));
     }
   }
 
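Reviewer note (sketch, not part of the patch): with this change, ProvidedVolumeImpl resolves the remote FileSystem once in its constructor and threads it through ReplicaBuilder.setRemoteFS() into every ProvidedReplica, rather than each replica calling FileSystem.get() (or FileSystem.newInstance() in the copy constructor) for the same fileURI. The sketch below shows the new constructor contract under illustrative assumptions: the URI, block offsets/lengths, and the null volume (mirroring TestProvidedReplicaImpl above) are hypothetical, not values from this patch.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.server.datanode.FinalizedProvidedReplica;

public class SharedRemoteFSSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Resolve the remote FS once, as ProvidedVolumeImpl now does with its
    // baseURI; the URI here is illustrative.
    URI base = URI.create("hdfs://remote-nn:8020/data");
    FileSystem remoteFS = FileSystem.get(base, conf);

    // Both replicas reference the same FileSystem object instead of
    // resolving one apiece.
    FinalizedProvidedReplica r0 = new FinalizedProvidedReplica(
        0L, base, 0L, 1024L, 0L, null, conf, remoteFS);
    FinalizedProvidedReplica r1 = new FinalizedProvidedReplica(
        1L, base, 1024L, 1024L, 0L, null, conf, remoteFS);

    // Passing null for remoteFS keeps the old behavior: ProvidedReplica
    // falls back to FileSystem.get(fileURI, conf) and logs a warning,
    // which is the path TestProvidedReplicaImpl exercises.
    FinalizedProvidedReplica r2 = new FinalizedProvidedReplica(
        2L, base, 2048L, 1024L, 0L, null, conf, null);
  }
}

The copy constructor previously used FileSystem.newInstance(), which creates a fresh FileSystem per call; copying the shared reference instead avoids that per-replica object churn on volumes that map many blocks.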