From fccace6116713c85cd59a808c565ea39fb5d6944 Mon Sep 17 00:00:00 2001 From: Aaron Myers Date: Fri, 17 Aug 2012 16:52:07 +0000 Subject: [PATCH] HDFS-3672. Expose disk-location information for blocks to enable better scheduling. Contributed by Andrew Wang. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1374355 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/fs/BlockStorageLocation.java | 51 +++ .../apache/hadoop/fs/HdfsBlockLocation.java | 48 +++ .../org/apache/hadoop/fs/HdfsVolumeId.java | 74 ++++ .../java/org/apache/hadoop/fs/VolumeId.java | 49 +++ .../hadoop/hdfs/BlockStorageLocationUtil.java | 337 ++++++++++++++++++ .../org/apache/hadoop/hdfs/DFSClient.java | 98 ++++- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 +- .../java/org/apache/hadoop/hdfs/DFSUtil.java | 16 +- .../hadoop/hdfs/DistributedFileSystem.java | 34 ++ .../hdfs/protocol/ClientDatanodeProtocol.java | 18 + .../hdfs/protocol/HdfsBlocksMetadata.java | 94 +++++ ...atanodeProtocolServerSideTranslatorPB.java | 46 +++ .../ClientDatanodeProtocolTranslatorPB.java | 50 ++- .../hadoop/hdfs/server/datanode/DataNode.java | 24 ++ .../datanode/fsdataset/FsDatasetSpi.java | 13 + .../fsdataset/impl/FsDatasetImpl.java | 38 ++ .../main/proto/ClientDatanodeProtocol.proto | 27 ++ .../src/main/resources/hdfs-default.xml | 26 +- .../hdfs/TestDistributedFileSystem.java | 94 +++++ .../server/datanode/SimulatedFSDataset.java | 7 + 21 files changed, 1146 insertions(+), 9 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 89bd85cd07a..701bc5801a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -402,6 +402,9 @@ Branch-2 ( Unreleased changes ) HDFS-2963. Console Output is confusing while executing metasave (dfsadmin command). (Andrew Wang via eli) + HDFS-3672. Expose disk-location information for blocks to enable better + scheduling. (Andrew Wang via atm) + OPTIMIZATIONS HDFS-2982. Startup performance suffers when there are many edit log diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java new file mode 100644 index 00000000000..abf3e388c82 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Wrapper for {@link BlockLocation} that also adds {@link VolumeId} volume + * location information for each replica. + */ +@InterfaceStability.Unstable +@InterfaceAudience.Public +public class BlockStorageLocation extends BlockLocation { + + private final VolumeId[] volumeIds; + + public BlockStorageLocation(BlockLocation loc, VolumeId[] volumeIds) + throws IOException { + // Initialize with data from passed in BlockLocation + super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(), loc + .getOffset(), loc.getLength(), loc.isCorrupt()); + this.volumeIds = volumeIds; + } + + /** + * Gets the list of {@link VolumeId} corresponding to the block's replicas. + * + * @return volumeIds list of VolumeId for the block's replicas + */ + public VolumeId[] getVolumeIds() { + return volumeIds; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java new file mode 100644 index 00000000000..f736d9637eb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; + +/** + * Wrapper for {@link BlockLocation} that also includes a {@link LocatedBlock}, + * allowing more detailed queries to the datanode about a block. 
+ * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class HdfsBlockLocation extends BlockLocation { + + private final LocatedBlock block; + + public HdfsBlockLocation(BlockLocation loc, LocatedBlock block) + throws IOException { + // Initialize with data from passed in BlockLocation + super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(), + loc.getOffset(), loc.getLength(), loc.isCorrupt()); + this.block = block; + } + + public LocatedBlock getLocatedBlock() { + return block; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java new file mode 100644 index 00000000000..e283e526271 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * HDFS-specific volume identifier which implements {@link VolumeId}. Can be + * used to differentiate between the data directories on a single datanode. This + * identifier is only unique on a per-datanode basis. + */ +@InterfaceStability.Unstable +@InterfaceAudience.Public +public class HdfsVolumeId implements VolumeId { + + private final byte id; + private final boolean isValid; + + public HdfsVolumeId(byte id, boolean isValid) { + this.id = id; + this.isValid = isValid; + } + + @Override + public boolean isValid() { + return isValid; + } + + @Override + public int compareTo(VolumeId arg0) { + return hashCode() - arg0.hashCode(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(id).toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + if (obj == this) { + return true; + } + + HdfsVolumeId that = (HdfsVolumeId) obj; + return new EqualsBuilder().append(this.id, that.id).isEquals(); + } + + @Override + public String toString() { + return Byte.toString(id); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java new file mode 100644 index 00000000000..f24ed66d009 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Opaque interface that identifies a disk location. Subclasses + * should implement {@link Comparable} and override both equals and hashCode. + */ +@InterfaceStability.Unstable +@InterfaceAudience.Public +public interface VolumeId extends Comparable { + + /** + * Indicates if the disk identifier is valid. Invalid identifiers indicate + * that the block was not present, or the location could otherwise not be + * determined. + * + * @return true if the disk identifier is valid + */ + public boolean isValid(); + + @Override + abstract public int compareTo(VolumeId arg0); + + @Override + abstract public int hashCode(); + + @Override + abstract public boolean equals(Object obj); + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java new file mode 100644 index 00000000000..e5888649881 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java @@ -0,0 +1,337 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.BlockStorageLocation; +import org.apache.hadoop.fs.HdfsVolumeId; +import org.apache.hadoop.fs.VolumeId; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.token.Token; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +class BlockStorageLocationUtil { + + private static final Log LOG = LogFactory + .getLog(BlockStorageLocationUtil.class); + + /** + * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set + * of datanodes and blocks. + * + * @param datanodeBlocks + * Map of datanodes to block replicas at each datanode + * @return callables Used to query each datanode for location information on + * the block replicas at the datanode + */ + private static List createVolumeBlockLocationCallables( + Configuration conf, Map> datanodeBlocks, + int timeout, boolean connectToDnViaHostname) { + // Construct the callables, one per datanode + List callables = + new ArrayList(); + for (Map.Entry> entry : datanodeBlocks + .entrySet()) { + // Construct RPC parameters + DatanodeInfo datanode = entry.getKey(); + List locatedBlocks = entry.getValue(); + List extendedBlocks = + new ArrayList(locatedBlocks.size()); + List> dnTokens = + new ArrayList>( + locatedBlocks.size()); + for (LocatedBlock b : locatedBlocks) { + extendedBlocks.add(b.getBlock()); + dnTokens.add(b.getBlockToken()); + } + VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable( + conf, datanode, extendedBlocks, dnTokens, timeout, + connectToDnViaHostname); + callables.add(callable); + } + return callables; + } + + /** + * Queries datanodes for the blocks specified in datanodeBlocks, + * making one RPC to each datanode. These RPCs are made in parallel using a + * threadpool. 
+ * + * @param datanodeBlocks + * Map of datanodes to the blocks present on the DN + * @return metadatas List of block metadata for each datanode, specifying + * volume locations for each block + * @throws InvalidBlockTokenException + * if client does not have read access on a requested block + */ + static List queryDatanodesForHdfsBlocksMetadata( + Configuration conf, Map> datanodeBlocks, + int poolsize, int timeout, boolean connectToDnViaHostname) + throws InvalidBlockTokenException { + + List callables = + createVolumeBlockLocationCallables(conf, datanodeBlocks, timeout, + connectToDnViaHostname); + + // Use a thread pool to execute the Callables in parallel + List> futures = + new ArrayList>(); + ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize); + try { + futures = executor.invokeAll(callables, timeout, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Swallow the exception here, because we can return partial results + } + executor.shutdown(); + + // Initialize metadatas list with nulls + // This is used to later indicate if we didn't get a response from a DN + List metadatas = new ArrayList(); + for (int i = 0; i < futures.size(); i++) { + metadatas.add(null); + } + // Fill in metadatas with results from DN RPCs, where possible + for (int i = 0; i < futures.size(); i++) { + Future future = futures.get(i); + try { + HdfsBlocksMetadata metadata = future.get(); + metadatas.set(i, metadata); + } catch (ExecutionException e) { + VolumeBlockLocationCallable callable = callables.get(i); + DatanodeInfo datanode = callable.getDatanodeInfo(); + Throwable t = e.getCause(); + if (t instanceof InvalidBlockTokenException) { + LOG.warn("Invalid access token when trying to retrieve " + + "information from datanode " + datanode.getIpcAddr(false)); + throw (InvalidBlockTokenException) t; + } + else if (t instanceof UnsupportedOperationException) { + LOG.info("Datanode " + datanode.getIpcAddr(false) + " does not support" + + " required #getHdfsBlocksMetadata() API"); + throw (UnsupportedOperationException) t; + } else { + LOG.info("Failed to connect to datanode " + + datanode.getIpcAddr(false)); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Could not fetch information from datanode", t); + } + } catch (InterruptedException e) { + // Shouldn't happen, because invokeAll waits for all Futures to be ready + LOG.info("Interrupted while fetching HdfsBlocksMetadata"); + } + } + + return metadatas; + } + + /** + * Group the per-replica {@link VolumeId} info returned from + * {@link DFSClient#queryDatanodesForHdfsBlocksMetadata(Map)} to be associated + * with the corresponding {@link LocatedBlock}. + * + * @param blocks + * Original LocatedBlock array + * @param datanodeBlocks + * Mapping from datanodes to the list of replicas on each datanode + * @param metadatas + * VolumeId information for the replicas on each datanode + * @return blockVolumeIds per-replica VolumeId information associated with the + * parent LocatedBlock + */ + static Map> associateVolumeIdsWithBlocks( + List blocks, Map> datanodeBlocks, List metadatas) { + + // Initialize mapping of ExtendedBlock to LocatedBlock. 
+ // Used to associate results from DN RPCs to the parent LocatedBlock + Map extBlockToLocBlock = + new HashMap(); + for (LocatedBlock b : blocks) { + extBlockToLocBlock.put(b.getBlock(), b); + } + + // Initialize the mapping of blocks -> list of VolumeIds, one per replica + // This is filled out with real values from the DN RPCs + Map> blockVolumeIds = + new HashMap>(); + for (LocatedBlock b : blocks) { + ArrayList l = new ArrayList(b.getLocations().length); + // Start off all IDs as invalid, fill it in later with results from RPCs + for (int i = 0; i < b.getLocations().length; i++) { + l.add(new HdfsVolumeId((byte)-1, false)); + } + blockVolumeIds.put(b, l); + } + + // Iterate through the list of metadatas (one per datanode). + // For each metadata, if it's valid, insert its volume location information + // into the Map returned to the caller + Iterator metadatasIter = metadatas.iterator(); + Iterator datanodeIter = datanodeBlocks.keySet().iterator(); + while (metadatasIter.hasNext()) { + HdfsBlocksMetadata metadata = metadatasIter.next(); + DatanodeInfo datanode = datanodeIter.next(); + // Check if metadata is valid + if (metadata == null) { + continue; + } + ExtendedBlock[] metaBlocks = metadata.getBlocks(); + List metaVolumeIds = metadata.getVolumeIds(); + List metaVolumeIndexes = metadata.getVolumeIndexes(); + // Add VolumeId for each replica in the HdfsBlocksMetadata + for (int j = 0; j < metaBlocks.length; j++) { + int volumeIndex = metaVolumeIndexes.get(j); + ExtendedBlock extBlock = metaBlocks[j]; + // Skip if block wasn't found, or not a valid index into metaVolumeIds + // Also skip if the DN responded with a block we didn't ask for + if (volumeIndex == Integer.MAX_VALUE + || volumeIndex >= metaVolumeIds.size() + || !extBlockToLocBlock.containsKey(extBlock)) { + continue; + } + // Get the VolumeId by indexing into the list of VolumeIds + // provided by the datanode + HdfsVolumeId id = new HdfsVolumeId(metaVolumeIds.get(volumeIndex)[0], + true); + // Find out which index we are in the LocatedBlock's replicas + LocatedBlock locBlock = extBlockToLocBlock.get(extBlock); + DatanodeInfo[] dnInfos = locBlock.getLocations(); + int index = -1; + for (int k = 0; k < dnInfos.length; k++) { + if (dnInfos[k].equals(datanode)) { + index = k; + break; + } + } + if (index < 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Datanode responded with a block volume id we did" + + " not request, omitting."); + } + continue; + } + // Place VolumeId at the same index as the DN's index in the list of + // replicas + List VolumeIds = blockVolumeIds.get(locBlock); + VolumeIds.set(index, id); + } + } + return blockVolumeIds; + } + + /** + * Helper method to combine a list of {@link LocatedBlock} with associated + * {@link VolumeId} information to form a list of {@link BlockStorageLocation} + * . 
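+ *
+ * As a rough, illustrative sketch, these package-private helpers are meant to
+ * be chained in the order used by DFSClient#getBlockStorageLocations
+ * (variable names below are placeholders for the caller's own state):
+ * <pre>{@code
+ * List<HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil
+ *     .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, poolSize,
+ *         timeout, connectToDnViaHostname);
+ * Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
+ *     .associateVolumeIdsWithBlocks(blocks, datanodeBlocks, metadatas);
+ * BlockStorageLocation[] locs = BlockStorageLocationUtil
+ *     .convertToVolumeBlockLocations(blocks, blockVolumeIds);
+ * }</pre>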
+ */ + static BlockStorageLocation[] convertToVolumeBlockLocations( + List blocks, + Map> blockVolumeIds) throws IOException { + // Construct the final return value of VolumeBlockLocation[] + BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks); + List volumeBlockLocs = + new ArrayList(locations.length); + for (int i = 0; i < locations.length; i++) { + LocatedBlock locBlock = blocks.get(i); + List volumeIds = blockVolumeIds.get(locBlock); + BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i], + volumeIds.toArray(new VolumeId[0])); + volumeBlockLocs.add(bsLoc); + } + return volumeBlockLocs.toArray(new BlockStorageLocation[] {}); + } + + /** + * Callable that sets up an RPC proxy to a datanode and queries it for + * volume location information for a list of ExtendedBlocks. + */ + private static class VolumeBlockLocationCallable implements + Callable { + + private Configuration configuration; + private int timeout; + private DatanodeInfo datanode; + private List extendedBlocks; + private List> dnTokens; + private boolean connectToDnViaHostname; + + VolumeBlockLocationCallable(Configuration configuration, + DatanodeInfo datanode, List extendedBlocks, + List> dnTokens, int timeout, + boolean connectToDnViaHostname) { + this.configuration = configuration; + this.timeout = timeout; + this.datanode = datanode; + this.extendedBlocks = extendedBlocks; + this.dnTokens = dnTokens; + this.connectToDnViaHostname = connectToDnViaHostname; + } + + public DatanodeInfo getDatanodeInfo() { + return datanode; + } + + @Override + public HdfsBlocksMetadata call() throws Exception { + HdfsBlocksMetadata metadata = null; + // Create the RPC proxy and make the RPC + ClientDatanodeProtocol cdp = null; + try { + cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration, + timeout, connectToDnViaHostname); + metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens); + } catch (IOException e) { + // Bubble this up to the caller, handle with the Future + throw e; + } finally { + if (cdp != null) { + RPC.stopProxy(cdp); + } + } + return metadata; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 59f9a3cd560..467b612620f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -45,8 +45,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKRE import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; @@ -69,6 +67,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -80,6 +79,7 @@ import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.BlockStorageLocation; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -87,12 +87,14 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.HdfsBlockLocation; import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; @@ -102,6 +104,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; @@ -120,8 +123,9 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; @@ -216,6 +220,9 @@ public class DFSClient implements java.io.Closeable { final FsPermission uMask; final boolean useLegacyBlockReader; final boolean connectToDnViaHostname; + final boolean getHdfsBlocksMetadataEnabled; + final int getFileBlockStorageLocationsNumThreads; + final int getFileBlockStorageLocationsTimeout; Conf(Configuration conf) { maxFailoverAttempts = conf.getInt( @@ -268,6 +275,15 @@ public class DFSClient implements java.io.Closeable { DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT); connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); + getHdfsBlocksMetadataEnabled = conf.getBoolean( + DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, + DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); + getFileBlockStorageLocationsNumThreads = conf.getInt( + DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS, + DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT); + getFileBlockStorageLocationsTimeout = conf.getInt( + DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT, 
+ DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT); } private DataChecksum.Type getChecksumType(Configuration conf) { @@ -942,7 +958,81 @@ public class DFSClient implements java.io.Closeable { public BlockLocation[] getBlockLocations(String src, long start, long length) throws IOException, UnresolvedLinkException { LocatedBlocks blocks = getLocatedBlocks(src, start, length); - return DFSUtil.locatedBlocks2Locations(blocks); + BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks); + HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length]; + for (int i = 0; i < locations.length; i++) { + hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i)); + } + return hdfsLocations; + } + + /** + * Get block location information about a list of {@link HdfsBlockLocation}. + * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to + * get {@link BlockStorageLocation}s for blocks returned by + * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)} + * . + * + * This is done by making a round of RPCs to the associated datanodes, asking + * the volume of each block replica. The returned array of + * {@link BlockStorageLocation} expose this information as a + * {@link VolumeId}. + * + * @param blockLocations + * target blocks on which to query volume location information + * @return volumeBlockLocations original block array augmented with additional + * volume location information for each replica. + */ + public BlockStorageLocation[] getBlockStorageLocations( + List blockLocations) throws IOException, + UnsupportedOperationException, InvalidBlockTokenException { + if (!getConf().getHdfsBlocksMetadataEnabled) { + throw new UnsupportedOperationException("Datanode-side support for " + + "getVolumeBlockLocations() must also be enabled in the client " + + "configuration."); + } + // Downcast blockLocations and fetch out required LocatedBlock(s) + List blocks = new ArrayList(); + for (BlockLocation loc : blockLocations) { + if (!(loc instanceof HdfsBlockLocation)) { + throw new ClassCastException("DFSClient#getVolumeBlockLocations " + + "expected to be passed HdfsBlockLocations"); + } + HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc; + blocks.add(hdfsLoc.getLocatedBlock()); + } + + // Re-group the LocatedBlocks to be grouped by datanodes, with the values + // a list of the LocatedBlocks on the datanode. 
+ Map> datanodeBlocks = + new LinkedHashMap>(); + for (LocatedBlock b : blocks) { + for (DatanodeInfo info : b.getLocations()) { + if (!datanodeBlocks.containsKey(info)) { + datanodeBlocks.put(info, new ArrayList()); + } + List l = datanodeBlocks.get(info); + l.add(b); + } + } + + // Make RPCs to the datanodes to get volume locations for its replicas + List metadatas = BlockStorageLocationUtil + .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, + getConf().getFileBlockStorageLocationsNumThreads, + getConf().getFileBlockStorageLocationsTimeout, + getConf().connectToDnViaHostname); + + // Regroup the returned VolumeId metadata to again be grouped by + // LocatedBlock rather than by datanode + Map> blockVolumeIds = BlockStorageLocationUtil + .associateVolumeIdsWithBlocks(blocks, datanodeBlocks, metadatas); + + // Combine original BlockLocations with new VolumeId information + BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil + .convertToVolumeBlockLocations(blocks, blockVolumeIds); + + return volumeBlockLocations; } public DFSInputStream open(String src) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 222511eba6c..3cc917e004c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -54,6 +54,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16; public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname"; public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false; + public static final String DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled"; + public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false; + public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads"; + public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = 10; + public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT = "dfs.client.file-block-storage-locations.timeout"; + public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT = 60; // HA related configuration public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider"; @@ -245,7 +251,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATANODE_DU_RESERVED_KEY = "dfs.datanode.du.reserved"; public static final long DFS_DATANODE_DU_RESERVED_DEFAULT = 0; public static final String DFS_DATANODE_HANDLER_COUNT_KEY = "dfs.datanode.handler.count"; - public static final int DFS_DATANODE_HANDLER_COUNT_DEFAULT = 3; + public static final int DFS_DATANODE_HANDLER_COUNT_DEFAULT = 10; public static final String DFS_DATANODE_HTTP_ADDRESS_KEY = "dfs.datanode.http.address"; public static final int DFS_DATANODE_HTTP_DEFAULT_PORT = 50075; public static final String DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index d263acd5906..4cc6935c108 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -282,13 +282,25 @@ public class DFSUtil { if (blocks == null) { return new BlockLocation[0]; } - int nrBlocks = blocks.locatedBlockCount(); + return locatedBlocks2Locations(blocks.getLocatedBlocks()); + } + + /** + * Convert a List to BlockLocation[] + * @param blocks A List to be converted + * @return converted array of BlockLocation + */ + public static BlockLocation[] locatedBlocks2Locations(List blocks) { + if (blocks == null) { + return new BlockLocation[0]; + } + int nrBlocks = blocks.size(); BlockLocation[] blkLocations = new BlockLocation[nrBlocks]; if (nrBlocks == 0) { return blkLocations; } int idx = 0; - for (LocatedBlock blk : blocks.getLocatedBlocks()) { + for (LocatedBlock blk : blocks) { assert idx < nrBlocks : "Incorrect index"; DatanodeInfo[] locations = blk.getLocations(); String[] hosts = new String[locations.length]; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index f57f535cb32..f207c8cd8b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -24,6 +24,7 @@ import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.EnumSet; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -31,6 +32,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.BlockStorageLocation; +import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -55,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -188,6 +192,36 @@ public class DistributedFileSystem extends FileSystem { } + /** + * Used to query storage location information for a list of blocks. This list + * of blocks is normally constructed via a series of calls to + * {@link DistributedFileSystem#getFileBlockLocations(Path, long, long)} to + * get the blocks for ranges of a file. + * + * The returned array of {@link BlockStorageLocation} augments + * {@link BlockLocation} with a {@link VolumeId} per block replica. The + * VolumeId specifies the volume on the datanode on which the replica resides. + * The VolumeId has to be checked via {@link VolumeId#isValid()} before being + * used because volume information can be unavailable if the corresponding + * datanode is down or if the requested block is not found. + * + * This API is unstable, and datanode-side support is disabled by default. 
It + * can be enabled by setting "dfs.datanode.hdfs-blocks-metadata.enabled" to + * true. + * + * @param blocks + * List of target BlockLocations to query volume location information + * @return volumeBlockLocations Augmented array of + * {@link BlockStorageLocation}s containing additional volume location + * information for each replica of each block. + */ + @InterfaceStability.Unstable + public BlockStorageLocation[] getFileBlockStorageLocations( + List blocks) throws IOException, + UnsupportedOperationException, InvalidBlockTokenException { + return dfs.getBlockStorageLocations(blocks); + } + @Override public void setVerifyChecksum(boolean verifyChecksum) { this.verifyChecksum = verifyChecksum; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index 38b13db3f5a..f2b8cc308a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.protocol; import java.io.IOException; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -106,4 +107,21 @@ public interface ClientDatanodeProtocol { */ BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token token) throws IOException; + + /** + * Retrieves volume location information about a list of blocks on a datanode. + * This is in the form of an opaque {@link VolumeId} for each configured + * data directory, which is not guaranteed to be the same across DN restarts. + * + * @param blocks + * list of blocks on the local datanode + * @param tokens + * block access tokens corresponding to the requested blocks + * @return an HdfsBlocksMetadata that associates {@link ExtendedBlock}s with + * data directories + * @throws IOException + * if datanode is unreachable, or replica is not found on datanode + */ + HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks, + List> tokens) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java new file mode 100644 index 00000000000..5836f3da21c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocol; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Augments an array of blocks on a datanode with additional information about + * where the block is stored. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class HdfsBlocksMetadata { + + /** + * List of blocks + */ + private final ExtendedBlock[] blocks; + + /** + * List of volumes + */ + private final List volumeIds; + + /** + * List of indexes into volumeIds, one per block in + * blocks. A value of Integer.MAX_VALUE indicates that the + * block was not found. + */ + private final List volumeIndexes; + + /** + * Constructs HdfsBlocksMetadata. + * + * @param blocks + * List of blocks described + * @param volumeIds + * List of potential volume identifiers, specifying volumes where + * blocks may be stored + * @param volumeIndexes + * Indexes into the list of volume identifiers, one per block + */ + public HdfsBlocksMetadata(ExtendedBlock[] blocks, List volumeIds, + List volumeIndexes) { + this.blocks = blocks; + this.volumeIds = volumeIds; + this.volumeIndexes = volumeIndexes; + } + + /** + * Get the array of blocks. + * + * @return array of blocks + */ + public ExtendedBlock[] getBlocks() { + return blocks; + } + + /** + * Get the list of volume identifiers in raw byte form. + * + * @return list of ids + */ + public List getVolumeIds() { + return volumeIds; + } + + /** + * Get a list of indexes into the array of {@link VolumeId}s, one per block. + * + * @return list of indexes + */ + public List getVolumeIndexes() { + return volumeIndexes; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index 627512a7fa6..cf447ce18ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -18,19 +18,31 @@ package org.apache.hadoop.hdfs.protocolPB; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder; import 
org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; +import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -106,4 +118,38 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements .setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath()) .build(); } + + @Override + public GetHdfsBlockLocationsResponseProto getHdfsBlockLocations( + RpcController controller, GetHdfsBlockLocationsRequestProto request) + throws ServiceException { + HdfsBlocksMetadata resp; + try { + // Construct the Lists to make the actual call + List blocks = + new ArrayList(request.getBlocksCount()); + for (ExtendedBlockProto b : request.getBlocksList()) { + blocks.add(PBHelper.convert(b)); + } + List> tokens = + new ArrayList>(request.getTokensCount()); + for (BlockTokenIdentifierProto b : request.getTokensList()) { + tokens.add(PBHelper.convert(b)); + } + // Call the real implementation + resp = impl.getHdfsBlocksMetadata(blocks, tokens); + } catch (IOException e) { + throw new ServiceException(e); + } + List volumeIdsByteStrings = + new ArrayList(resp.getVolumeIds().size()); + for (byte[] b : resp.getVolumeIds()) { + volumeIdsByteStrings.add(ByteString.copyFrom(b)); + } + // Build and return the response + Builder builder = GetHdfsBlockLocationsResponseProto.newBuilder(); + builder.addAllVolumeIds(volumeIdsByteStrings); + builder.addAllVolumeIndexes(resp.getVolumeIndexes()); + return builder.build(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index 1f6c9b113ae..c7c8b08555e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocolPB; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; import javax.net.SocketFactory; @@ -33,12 +35,17 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto; import 
org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -50,6 +57,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -207,4 +215,44 @@ public class ClientDatanodeProtocolTranslatorPB implements public Object getUnderlyingProxyObject() { return rpcProxy; } -} \ No newline at end of file + + @Override + public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks, + List> tokens) throws IOException { + // Convert to proto objects + List blocksProtos = + new ArrayList(blocks.size()); + List tokensProtos = + new ArrayList(tokens.size()); + for (ExtendedBlock b : blocks) { + blocksProtos.add(PBHelper.convert(b)); + } + for (Token t : tokens) { + tokensProtos.add(PBHelper.convert(t)); + } + // Build the request + GetHdfsBlockLocationsRequestProto request = + GetHdfsBlockLocationsRequestProto.newBuilder() + .addAllBlocks(blocksProtos) + .addAllTokens(tokensProtos) + .build(); + // Send the RPC + GetHdfsBlockLocationsResponseProto response; + try { + response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + // List of volumes in the response + List volumeIdsByteStrings = response.getVolumeIdsList(); + List volumeIds = new ArrayList(volumeIdsByteStrings.size()); + for (ByteString bs : volumeIdsByteStrings) { + volumeIds.add(bs.toByteArray()); + } + // Array of indexes into the list of volumes, one per block + List volumeIndexes = response.getVolumeIndexesList(); + // Parsed HdfsVolumeId values, one per block + return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), + volumeIds, volumeIndexes); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 83219cca48b..ee849a7c543 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -96,6 +96,7 @@ import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import 
org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; @@ -279,6 +280,7 @@ public class DataNode extends Configured private final String userWithLocalPathAccess; private boolean connectToDnViaHostname; ReadaheadPool readaheadPool; + private final boolean getHdfsBlockLocationsEnabled; /** * Create the DataNode given a configuration and an array of dataDirs. @@ -303,6 +305,9 @@ public class DataNode extends Configured this.connectToDnViaHostname = conf.getBoolean( DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT); + this.getHdfsBlockLocationsEnabled = conf.getBoolean( + DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, + DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); try { hostName = getHostName(conf); LOG.info("Configured hostname is " + hostName); @@ -1033,6 +1038,25 @@ public class DataNode extends Configured metrics.incrBlocksGetLocalPathInfo(); return info; } + + @Override + public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks, + List> tokens) throws IOException, + UnsupportedOperationException { + if (!getHdfsBlockLocationsEnabled) { + throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata " + + " is not enabled in datanode config"); + } + if (blocks.size() != tokens.size()) { + throw new IOException("Differing number of blocks and tokens"); + } + // Check access for each block + for (int i = 0; i < blocks.size(); i++) { + checkBlockToken(blocks.get(i), tokens.get(i), + BlockTokenSecretManager.AccessMode.READ); + } + return data.getHdfsBlocksMetadata(blocks); + } private void checkBlockToken(ExtendedBlock block, Token token, AccessMode accessMode) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index c0fe3906c9b..13ef752750b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.Replica; @@ -373,4 +374,16 @@ public interface FsDatasetSpi extends FSDatasetMBean { */ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b ) throws IOException; + + /** + * Get a {@link HdfsBlocksMetadata} corresponding to the list of blocks in + * blocks. 
+ * + * @param blocks List of blocks for which to return metadata + * @return metadata Metadata for the list of blocks + * @throws IOException + */ + public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) + throws IOException; + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index d037d764535..d3f23c756dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -1667,6 +1668,43 @@ class FsDatasetImpl implements FsDatasetSpi { datafile.getAbsolutePath(), metafile.getAbsolutePath()); return info; } + + @Override // FsDatasetSpi + public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) + throws IOException { + // List of VolumeIds, one per volume on the datanode + List blocksVolumeIds = new ArrayList(volumes.volumes.size()); + // List of indexes into the list of VolumeIds, pointing at the VolumeId of + // the volume that the block is on + List blocksVolumendexes = new ArrayList(blocks.size()); + // Initialize the list of VolumeIds simply by enumerating the volumes + for (int i = 0; i < volumes.volumes.size(); i++) { + blocksVolumeIds.add(new byte[] { (byte) i }); + } + // Determine the index of the VolumeId of each block's volume, by comparing + // the block's volume against the enumerated volumes + for (int i = 0; i < blocks.size(); i++) { + ExtendedBlock block = blocks.get(i); + FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume(); + boolean isValid = false; + int volumeIndex = 0; + for (FsVolumeImpl volume : volumes.volumes) { + // This comparison of references should be safe + if (blockVolume == volume) { + isValid = true; + break; + } + volumeIndex++; + } + // Indicates that the block is not present, or not found in a data dir + if (!isValid) { + volumeIndex = Integer.MAX_VALUE; + } + blocksVolumendexes.add(volumeIndex); + } + return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), + blocksVolumeIds, blocksVolumendexes); + } @Override public RollingLogs createRollingLogs(String bpid, String prefix diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto index a74d7a28055..99dcd146e5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto @@ -89,6 +89,26 @@ message GetBlockLocalPathInfoResponseProto { required string localMetaPath = 3; } +/** + * blocks - list of ExtendedBlocks on which we are querying additional info + * tokens - list of access tokens corresponding to list of ExtendedBlocks + */ +message GetHdfsBlockLocationsRequestProto { 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
index a74d7a28055..99dcd146e5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
@@ -89,6 +89,26 @@ message GetBlockLocalPathInfoResponseProto {
   required string localMetaPath = 3;
 }
 
+/**
+ * blocks - list of ExtendedBlocks on which we are querying additional info
+ * tokens - list of access tokens corresponding to list of ExtendedBlocks
+ */
+message GetHdfsBlockLocationsRequestProto {
+  repeated ExtendedBlockProto blocks = 1;
+  repeated BlockTokenIdentifierProto tokens = 2;
+}
+
+/**
+ * volumeIds - id of each volume, potentially multiple bytes
+ * volumeIndexes - for each block, an index into volumeIds specifying the volume
+ *                 on which it is located. If the block is not present on any
+ *                 volume, the index is set to MAX_INT.
+ */
+message GetHdfsBlockLocationsResponseProto {
+  repeated bytes volumeIds = 1;
+  repeated uint32 volumeIndexes = 2;
+}
+
 /**
  * Protocol used from client to the Datanode.
  * See the request and response for details of rpc call.
@@ -119,4 +139,11 @@ service ClientDatanodeProtocolService {
    */
   rpc getBlockLocalPathInfo(GetBlockLocalPathInfoRequestProto)
       returns(GetBlockLocalPathInfoResponseProto);
+
+  /**
+   * Retrieve additional HDFS-specific metadata about a set of blocks stored
+   * on the local file system.
+   */
+  rpc getHdfsBlockLocations(GetHdfsBlockLocationsRequestProto)
+      returns(GetHdfsBlockLocationsResponseProto);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 16510168445..3721125c8fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -78,7 +78,7 @@
 
 <property>
   <name>dfs.datanode.handler.count</name>
-  <value>3</value>
+  <value>10</value>
   <description>The number of server threads for the datanode.</description>
 </property>
@@ -1051,4 +1051,28 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
+  <value>false</value>
+  <description>
+    Boolean which enables backend datanode-side support for the experimental DistributedFileSystem#getFileBlockStorageLocations API.
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.file-block-storage-locations.num-threads</name>
+  <value>10</value>
+  <description>
+    Number of threads used for making parallel RPCs in DistributedFileSystem#getFileBlockStorageLocations().
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.file-block-storage-locations.timeout</name>
+  <value>60</value>
+  <description>
+    Timeout (in seconds) for the parallel RPCs made in DistributedFileSystem#getFileBlockStorageLocations().
+  </description>
+</property>
+
 </configuration>
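Taken together, the three new keys gate the feature: the boolean is read by the datanodes, so it must be set in their configuration, while the thread count and timeout only shape the parallel RPCs issued by the client. A minimal sketch of a Configuration that turns the feature on, using the key names and default values documented above (DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED is the constant used for the boolean key in the tests below):

import org.apache.hadoop.conf.Configuration;

public class BlockStorageLocationConf {
  /** Illustrative only: enable and tune the experimental API. */
  public static Configuration create() {
    Configuration conf = new Configuration();
    // Datanode-side switch; off by default.
    conf.setBoolean("dfs.datanode.hdfs-blocks-metadata.enabled", true);
    // Client-side knobs for the parallel per-datanode RPCs.
    conf.setInt("dfs.client.file-block-storage-locations.num-threads", 10);
    conf.setInt("dfs.client.file-block-storage-locations.timeout", 60);
    return conf;
  }
}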
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 825348d11cb..76263082b46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -27,10 +27,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.Random;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStorageLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -38,6 +42,7 @@ import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -570,4 +575,93 @@ public class TestDistributedFileSystem {
     testDFSClient();
     testFileChecksum();
   }
+
+  /**
+   * Tests the normal path of batching up BlockLocation[]s to be passed to a
+   * single
+   * {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)}
+   * call
+   */
+  @Test
+  public void testGetFileBlockStorageLocationsBatching() throws Exception {
+    final Configuration conf = getTestConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+        true);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    // Create two files
+    Path tmpFile1 = new Path("/tmpfile1.dat");
+    Path tmpFile2 = new Path("/tmpfile2.dat");
+    DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl);
+    DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl);
+    // Get locations of blocks of both files and concat together
+    BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024);
+    BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024);
+    BlockLocation[] blockLocs = (BlockLocation[]) ArrayUtils.addAll(blockLocs1,
+        blockLocs2);
+    // Fetch VolumeBlockLocations in batch
+    BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
+        .asList(blockLocs));
+    int counter = 0;
+    // Print out the list of ids received for each block
+    for (BlockStorageLocation l : locs) {
+      for (int i = 0; i < l.getVolumeIds().length; i++) {
+        VolumeId id = l.getVolumeIds()[i];
+        String name = l.getNames()[i];
+        if (id != null) {
+          System.out.println("Datanode " + name + " has block " + counter
+              + " on volume id " + id.toString());
+        }
+      }
+      counter++;
+    }
+    assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2,
+        locs.length);
+    for (BlockStorageLocation l : locs) {
+      assertEquals("Expected two replicas for each block", 2,
+          l.getVolumeIds().length);
+      for (int i = 0; i < l.getVolumeIds().length; i++) {
+        VolumeId id = l.getVolumeIds()[i];
+        String name = l.getNames()[i];
+        assertTrue("Expected block to be valid on datanode " + name,
+            id.isValid());
+      }
+    }
+  }
+
+  /**
+   * Tests error paths for
+   * {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)}
+   */
+  @Test
+  public void testGetFileBlockStorageLocationsError() throws Exception {
+    final Configuration conf = getTestConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+        true);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+    cluster.getDataNodes();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    // Create a file
+    Path tmpFile = new Path("/tmpfile1.dat");
+    DFSTestUtil.createFile(fs, tmpFile, 1024, (short) 2, 0xDEADDEADl);
+    // Get locations of blocks of the file
+    BlockLocation[] blockLocs = fs.getFileBlockLocations(tmpFile, 0, 1024);
+    // Stop a datanode to simulate a failure
+    cluster.stopDataNode(0);
+    // Fetch VolumeBlockLocations
+    BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
+        .asList(blockLocs));
+
+    assertEquals("Expected one HdfsBlockLocation for one 1-block file", 1,
+        locs.length);
+
+    for (BlockStorageLocation l : locs) {
+      assertEquals("Expected two replicas for each block", 2,
+          l.getVolumeIds().length);
+      assertTrue("Expected one valid and one invalid replica",
+          (l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid()));
+    }
+  }
 }
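The batching test above also shows the intended consumption pattern: getNames() and getVolumeIds() are parallel arrays with one entry per replica, and isValid() guards against replicas whose volume could not be determined. A hypothetical scheduler-side helper (not part of this patch) might group block indexes by datanode and volume roughly as follows:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.VolumeId;

/** Illustrative only: spread work across disks, not just across hosts. */
public class VolumeAwareGrouping {
  public static Map<String, List<Integer>> groupByVolume(
      BlockStorageLocation[] locs) throws IOException {
    Map<String, List<Integer>> byVolume = new HashMap<String, List<Integer>>();
    for (int b = 0; b < locs.length; b++) {
      String[] names = locs[b].getNames();         // one entry per replica
      VolumeId[] volumes = locs[b].getVolumeIds(); // parallel to names
      for (int r = 0; r < volumes.length; r++) {
        VolumeId id = volumes[r];
        if (id == null || !id.isValid()) {
          continue; // datanode was down, or volume info was unavailable
        }
        String key = names[r] + "/" + id.toString();
        List<Integer> blocks = byVolume.get(key);
        if (blocks == null) {
          blocks = new ArrayList<Integer>();
          byVolume.put(key, blocks);
        }
        blocks.add(b);
      }
    }
    return byVolume;
  }
}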
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index d2bf5050708..26926be0f36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -961,6 +962,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
 
   @Override
   public String[] getBlockPoolList() {
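SimulatedFSDataset simply opts out of the new FsDatasetSpi method by throwing UnsupportedOperationException. A test dataset that wanted to participate could satisfy the contract with very little code, for example by reporting a single synthetic volume. The sketch below is illustrative only; it reuses the HdfsBlocksMetadata constructor in the same way FsDatasetImpl does above:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;

/** Illustrative only: pretend every block lives on one fake volume. */
public class SingleVolumeMetadataStub {
  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
      throws IOException {
    List<byte[]> volumeIds = new ArrayList<byte[]>(1);
    volumeIds.add(new byte[] { (byte) 0 });            // the one fake volume
    List<Integer> volumeIndexes = new ArrayList<Integer>(blocks.size());
    for (int i = 0; i < blocks.size(); i++) {
      volumeIndexes.add(0);                            // every block maps to it
    }
    return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}),
        volumeIds, volumeIndexes);
  }
}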