HDFS-8895. Remove deprecated BlockStorageLocation APIs.

Andrew Wang 2015-08-17 17:40:37 -07:00
parent a88f31ebf3
commit eee4d716b4
23 changed files with 2 additions and 1445 deletions


@ -101,32 +101,6 @@ message GetBlockLocalPathInfoResponseProto {
required string localMetaPath = 3;
}
/**
* Query for the disk locations of a number of blocks on this DN.
* blockPoolId - the pool to query
* blockIds - list of block IDs to query
* tokens - list of access tokens corresponding to list of block IDs
*/
message GetHdfsBlockLocationsRequestProto {
// Removed: HDFS-3969
// repeated ExtendedBlockProto blocks = 1;
repeated hadoop.common.TokenProto tokens = 2;
required string blockPoolId = 3;
repeated sfixed64 blockIds = 4 [ packed = true ];
}
/**
* volumeIds - id of each volume, potentially multiple bytes
* volumeIndexes - for each block, an index into volumeIds specifying the volume
* on which it is located. If block is not present on any volume,
* index is set to MAX_INT.
*/
message GetHdfsBlockLocationsResponseProto {
repeated bytes volumeIds = 1;
repeated uint32 volumeIndexes = 2 [ packed = true ];
}
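For reference, the removed response pairs each requested block id with an index into volumeIds, and Integer.MAX_VALUE marks a block the datanode did not find. A minimal decoding sketch (illustrative only, not part of this commit; it mirrors the HdfsBlocksMetadata fields removed further below, and the helper name is hypothetical):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side helper: map each block id to the opaque volume id
// of the data directory holding it, skipping blocks the datanode did not find.
static Map<Long, byte[]> mapBlocksToVolumes(long[] blockIds,
    List<byte[]> volumeIds, List<Integer> volumeIndexes) {
  Map<Long, byte[]> blockToVolume = new HashMap<>();
  for (int i = 0; i < blockIds.length; i++) {
    int index = volumeIndexes.get(i);
    if (index == Integer.MAX_VALUE || index >= volumeIds.size()) {
      continue; // block not present on this datanode
    }
    blockToVolume.put(blockIds[i], volumeIds.get(index));
  }
  return blockToVolume;
}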
/**
* forUpgrade - if true, clients are advised to wait for restart and quick
* upgrade restart is instrumented. Otherwise, datanode does
@ -219,13 +193,6 @@ service ClientDatanodeProtocolService {
rpc getBlockLocalPathInfo(GetBlockLocalPathInfoRequestProto)
returns(GetBlockLocalPathInfoResponseProto);
/**
* Retrieve additional HDFS-specific metadata about a set of blocks stored
* on the local file system.
*/
rpc getHdfsBlockLocations(GetHdfsBlockLocationsRequestProto)
returns(GetHdfsBlockLocationsResponseProto);
rpc shutdownDatanode(ShutdownDatanodeRequestProto)
returns(ShutdownDatanodeResponseProto);


@ -34,6 +34,8 @@ Trunk (Unreleased)
HDFS-8591. Remove support for deprecated configuration key
dfs.namenode.decommission.nodes.per.interval. (wang)
HDFS-8895. Remove deprecated BlockStorageLocation APIs. (wang)
NEW FEATURES
HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)


@ -1,52 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Wrapper for {@link BlockLocation} that also adds {@link VolumeId} volume
* location information for each replica.
*/
@InterfaceStability.Unstable
@InterfaceAudience.Public
@Deprecated
public class BlockStorageLocation extends BlockLocation {
private final VolumeId[] volumeIds;
public BlockStorageLocation(BlockLocation loc, VolumeId[] volumeIds)
throws IOException {
// Initialize with data from passed in BlockLocation
super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(), loc
.getOffset(), loc.getLength(), loc.isCorrupt());
this.volumeIds = volumeIds;
}
/**
* Gets the list of {@link VolumeId} corresponding to the block's replicas.
*
* @return volumeIds list of VolumeId for the block's replicas
*/
public VolumeId[] getVolumeIds() {
return volumeIds;
}
}
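Before this change, callers normally obtained BlockStorageLocation instances from DistributedFileSystem#getFileBlockStorageLocations; a rough usage sketch of the removed API (illustrative only, not part of the diff):

import java.util.Arrays;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Pre-HDFS-8895 client code: augment ordinary block locations with per-replica volume ids.
static void printVolumeIds(DistributedFileSystem fs, FileStatus status) throws Exception {
  BlockLocation[] locs = fs.getFileBlockLocations(status, 0, status.getLen());
  BlockStorageLocation[] storageLocs =
      fs.getFileBlockStorageLocations(Arrays.asList(locs));
  for (BlockStorageLocation loc : storageLocs) {
    // One VolumeId per replica; an entry may be null if the datanode could not be queried.
    System.out.println(loc + " -> " + Arrays.toString(loc.getVolumeIds()));
  }
}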


@ -1,73 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
/**
* HDFS-specific volume identifier which implements {@link VolumeId}. Can be
* used to differentiate between the data directories on a single datanode. This
* identifier is only unique on a per-datanode basis.
*/
@InterfaceStability.Unstable
@InterfaceAudience.Public
public class HdfsVolumeId implements VolumeId {
private final byte[] id;
public HdfsVolumeId(byte[] id) {
Preconditions.checkNotNull(id, "id cannot be null");
this.id = id;
}
@Override
public int compareTo(VolumeId arg0) {
if (arg0 == null) {
return 1;
}
return hashCode() - arg0.hashCode();
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(id).toHashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
if (obj == this) {
return true;
}
HdfsVolumeId that = (HdfsVolumeId) obj;
return new EqualsBuilder().append(this.id, that.id).isEquals();
}
@Override
public String toString() {
return StringUtils.byteToHexString(id);
}
}
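Since HdfsVolumeId implements equals and hashCode over the raw id bytes, instances can serve as map keys, for example to count how many replicas landed on each volume of one datanode; a small sketch (illustrative, not part of this commit):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.HdfsVolumeId;

// Count replicas per volume; ids are only meaningful within a single datanode.
static Map<HdfsVolumeId, Integer> countReplicasPerVolume(Iterable<HdfsVolumeId> replicaVolumes) {
  Map<HdfsVolumeId, Integer> counts = new HashMap<>();
  for (HdfsVolumeId id : replicaVolumes) {
    counts.merge(id, 1, Integer::sum);
  }
  return counts;
}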


@ -1,40 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Opaque interface that identifies a disk location. Subclasses
* should implement {@link Comparable} and override both equals and hashCode.
*/
@InterfaceStability.Unstable
@InterfaceAudience.Public
public interface VolumeId extends Comparable<VolumeId> {
@Override
abstract public int compareTo(VolumeId arg0);
@Override
abstract public int hashCode();
@Override
abstract public boolean equals(Object obj);
}


@ -1,368 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.HdfsVolumeId;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@InterfaceAudience.Private
@InterfaceStability.Unstable
class BlockStorageLocationUtil {
static final Log LOG = LogFactory
.getLog(BlockStorageLocationUtil.class);
/**
* Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
* of datanodes and blocks. The blocks must all correspond to the same
* block pool.
*
* @param datanodeBlocks
* Map of datanodes to block replicas at each datanode
* @return callables Used to query each datanode for location information on
* the block replicas at the datanode
*/
private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
int timeout, boolean connectToDnViaHostname, Span parent) {
if (datanodeBlocks.isEmpty()) {
return Lists.newArrayList();
}
// Construct the callables, one per datanode
List<VolumeBlockLocationCallable> callables =
new ArrayList<VolumeBlockLocationCallable>();
for (Map.Entry<DatanodeInfo, List<LocatedBlock>> entry : datanodeBlocks
.entrySet()) {
// Construct RPC parameters
DatanodeInfo datanode = entry.getKey();
List<LocatedBlock> locatedBlocks = entry.getValue();
if (locatedBlocks.isEmpty()) {
continue;
}
// Ensure that the blocks all are from the same block pool.
String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId();
for (LocatedBlock lb : locatedBlocks) {
if (!poolId.equals(lb.getBlock().getBlockPoolId())) {
throw new IllegalArgumentException(
"All blocks to be queried must be in the same block pool: " +
locatedBlocks.get(0).getBlock() + " and " + lb +
" are from different pools.");
}
}
long[] blockIds = new long[locatedBlocks.size()];
int i = 0;
List<Token<BlockTokenIdentifier>> dnTokens =
new ArrayList<Token<BlockTokenIdentifier>>(
locatedBlocks.size());
for (LocatedBlock b : locatedBlocks) {
blockIds[i++] = b.getBlock().getBlockId();
dnTokens.add(b.getBlockToken());
}
VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
conf, datanode, poolId, blockIds, dnTokens, timeout,
connectToDnViaHostname, parent);
callables.add(callable);
}
return callables;
}
/**
* Queries datanodes for the blocks specified in <code>datanodeBlocks</code>,
* making one RPC to each datanode. These RPCs are made in parallel using a
* threadpool.
*
* @param datanodeBlocks
* Map of datanodes to the blocks present on the DN
* @return metadatas Map of datanodes to block metadata of the DN
* @throws InvalidBlockTokenException
* if client does not have read access on a requested block
*/
static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
int poolsize, int timeoutMs, boolean connectToDnViaHostname)
throws InvalidBlockTokenException {
List<VolumeBlockLocationCallable> callables =
createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs,
connectToDnViaHostname, Trace.currentSpan());
// Use a thread pool to execute the Callables in parallel
List<Future<HdfsBlocksMetadata>> futures =
new ArrayList<Future<HdfsBlocksMetadata>>();
ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize);
try {
futures = executor.invokeAll(callables, timeoutMs,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Swallow the exception here, because we can return partial results
}
executor.shutdown();
Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
Maps.newHashMapWithExpectedSize(datanodeBlocks.size());
// Fill in metadatas with results from DN RPCs, where possible
for (int i = 0; i < futures.size(); i++) {
VolumeBlockLocationCallable callable = callables.get(i);
DatanodeInfo datanode = callable.getDatanodeInfo();
Future<HdfsBlocksMetadata> future = futures.get(i);
try {
HdfsBlocksMetadata metadata = future.get();
metadatas.put(callable.getDatanodeInfo(), metadata);
} catch (CancellationException e) {
LOG.info("Cancelled while waiting for datanode "
+ datanode.getIpcAddr(false) + ": " + e.toString());
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof InvalidBlockTokenException) {
LOG.warn("Invalid access token when trying to retrieve "
+ "information from datanode " + datanode.getIpcAddr(false));
throw (InvalidBlockTokenException) t;
}
else if (t instanceof UnsupportedOperationException) {
LOG.info("Datanode " + datanode.getIpcAddr(false) + " does not support"
+ " required #getHdfsBlocksMetadata() API");
throw (UnsupportedOperationException) t;
} else {
LOG.info("Failed to query block locations on datanode " +
datanode.getIpcAddr(false) + ": " + t);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Could not fetch information from datanode", t);
}
} catch (InterruptedException e) {
// Shouldn't happen, because invokeAll waits for all Futures to be ready
LOG.info("Interrupted while fetching HdfsBlocksMetadata");
}
}
return metadatas;
}
/**
* Group the per-replica {@link VolumeId} info returned from
* {@link DFSClient#queryDatanodesForHdfsBlocksMetadata(Map)} to be
* associated
* with the corresponding {@link LocatedBlock}.
*
* @param blocks
* Original LocatedBlock array
* @param metadatas
* VolumeId information for the replicas on each datanode
* @return blockVolumeIds per-replica VolumeId information associated with the
* parent LocatedBlock
*/
static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
List<LocatedBlock> blocks,
Map<DatanodeInfo, HdfsBlocksMetadata> metadatas) {
// Initialize mapping of ExtendedBlock to LocatedBlock.
// Used to associate results from DN RPCs to the parent LocatedBlock
Map<Long, LocatedBlock> blockIdToLocBlock =
new HashMap<Long, LocatedBlock>();
for (LocatedBlock b : blocks) {
blockIdToLocBlock.put(b.getBlock().getBlockId(), b);
}
// Initialize the mapping of blocks -> list of VolumeIds, one per replica
// This is filled out with real values from the DN RPCs
Map<LocatedBlock, List<VolumeId>> blockVolumeIds =
new HashMap<LocatedBlock, List<VolumeId>>();
for (LocatedBlock b : blocks) {
ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
for (int i = 0; i < b.getLocations().length; i++) {
l.add(null);
}
blockVolumeIds.put(b, l);
}
// Iterate through the list of metadatas (one per datanode).
// For each metadata, if it's valid, insert its volume location information
// into the Map returned to the caller
for (Map.Entry<DatanodeInfo, HdfsBlocksMetadata> entry : metadatas.entrySet()) {
DatanodeInfo datanode = entry.getKey();
HdfsBlocksMetadata metadata = entry.getValue();
// Check if metadata is valid
if (metadata == null) {
continue;
}
long[] metaBlockIds = metadata.getBlockIds();
List<byte[]> metaVolumeIds = metadata.getVolumeIds();
List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes();
// Add VolumeId for each replica in the HdfsBlocksMetadata
for (int j = 0; j < metaBlockIds.length; j++) {
int volumeIndex = metaVolumeIndexes.get(j);
long blockId = metaBlockIds[j];
// Skip if block wasn't found, or not a valid index into metaVolumeIds
// Also skip if the DN responded with a block we didn't ask for
if (volumeIndex == Integer.MAX_VALUE
|| volumeIndex >= metaVolumeIds.size()
|| !blockIdToLocBlock.containsKey(blockId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("No data for block " + blockId);
}
continue;
}
// Get the VolumeId by indexing into the list of VolumeIds
// provided by the datanode
byte[] volumeId = metaVolumeIds.get(volumeIndex);
HdfsVolumeId id = new HdfsVolumeId(volumeId);
// Find out which index we are in the LocatedBlock's replicas
LocatedBlock locBlock = blockIdToLocBlock.get(blockId);
DatanodeInfo[] dnInfos = locBlock.getLocations();
int index = -1;
for (int k = 0; k < dnInfos.length; k++) {
if (dnInfos[k].equals(datanode)) {
index = k;
break;
}
}
if (index < 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Datanode responded with a block volume id we did" +
" not request, omitting.");
}
continue;
}
// Place VolumeId at the same index as the DN's index in the list of
// replicas
List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
volumeIds.set(index, id);
}
}
return blockVolumeIds;
}
/**
* Helper method to combine a list of {@link LocatedBlock} with associated
* {@link VolumeId} information to form a list of {@link BlockStorageLocation}
* .
*/
static BlockStorageLocation[] convertToVolumeBlockLocations(
List<LocatedBlock> blocks,
Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
// Construct the final return value of VolumeBlockLocation[]
BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks);
List<BlockStorageLocation> volumeBlockLocs =
new ArrayList<BlockStorageLocation>(locations.length);
for (int i = 0; i < locations.length; i++) {
LocatedBlock locBlock = blocks.get(i);
List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i],
volumeIds.toArray(new VolumeId[0]));
volumeBlockLocs.add(bsLoc);
}
return volumeBlockLocs.toArray(new BlockStorageLocation[] {});
}
/**
* Callable that sets up an RPC proxy to a datanode and queries it for
* volume location information for a list of ExtendedBlocks.
*/
private static class VolumeBlockLocationCallable implements
Callable<HdfsBlocksMetadata> {
private final Configuration configuration;
private final int timeout;
private final DatanodeInfo datanode;
private final String poolId;
private final long[] blockIds;
private final List<Token<BlockTokenIdentifier>> dnTokens;
private final boolean connectToDnViaHostname;
private final Span parentSpan;
VolumeBlockLocationCallable(Configuration configuration,
DatanodeInfo datanode, String poolId, long []blockIds,
List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
boolean connectToDnViaHostname, Span parentSpan) {
this.configuration = configuration;
this.timeout = timeout;
this.datanode = datanode;
this.poolId = poolId;
this.blockIds = blockIds;
this.dnTokens = dnTokens;
this.connectToDnViaHostname = connectToDnViaHostname;
this.parentSpan = parentSpan;
}
public DatanodeInfo getDatanodeInfo() {
return datanode;
}
@Override
public HdfsBlocksMetadata call() throws Exception {
HdfsBlocksMetadata metadata = null;
// Create the RPC proxy and make the RPC
ClientDatanodeProtocol cdp = null;
TraceScope scope =
Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
try {
cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
timeout, connectToDnViaHostname);
metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
} catch (IOException e) {
// Bubble this up to the caller, handle with the Future
throw e;
} finally {
scope.close();
if (cdp != null) {
RPC.stopProxy(cdp);
}
}
return metadata;
}
}
}
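queryDatanodesForHdfsBlocksMetadata above is essentially a bounded fan-out: one callable per datanode, ExecutorService#invokeAll with an overall timeout, and partial results when some datanodes fail or time out. A standalone sketch of that pattern with the HDFS types stripped out (illustrative, not part of this commit):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Run one callable per key with a shared deadline; omit keys whose call failed or timed out.
static <K, V> Map<K, V> fanOut(Map<K, Callable<V>> calls, int poolSize, long timeoutMs)
    throws InterruptedException {
  ExecutorService executor = Executors.newFixedThreadPool(poolSize);
  List<K> keys = new ArrayList<>(calls.keySet());
  List<Callable<V>> callables = new ArrayList<>();
  for (K key : keys) {
    callables.add(calls.get(key));
  }
  List<Future<V>> futures =
      executor.invokeAll(callables, timeoutMs, TimeUnit.MILLISECONDS);
  executor.shutdown();
  Map<K, V> results = new LinkedHashMap<>();
  for (int i = 0; i < futures.size(); i++) {
    try {
      results.put(keys.get(i), futures.get(i).get());
    } catch (CancellationException | ExecutionException e) {
      // Timed-out or failed calls are skipped; the caller gets partial results.
    }
  }
  return results;
}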


@ -42,7 +42,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -70,7 +69,6 @@ import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@ -91,7 +89,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
@ -121,7 +118,6 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@ -923,87 +919,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
/**
* Get block location information about a list of {@link HdfsBlockLocation}.
* Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
* get {@link BlockStorageLocation}s for blocks returned by
* {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}
* .
*
* This is done by making a round of RPCs to the associated datanodes, asking
* the volume of each block replica. The returned array of
* {@link BlockStorageLocation} expose this information as a
* {@link VolumeId}.
*
* @param blockLocations
* target blocks on which to query volume location information
* @return volumeBlockLocations original block array augmented with additional
* volume location information for each replica.
*/
public BlockStorageLocation[] getBlockStorageLocations(
List<BlockLocation> blockLocations) throws IOException,
UnsupportedOperationException, InvalidBlockTokenException {
checkOpen();
if (!getConf().isHdfsBlocksMetadataEnabled()) {
throw new UnsupportedOperationException("Datanode-side support for " +
"getVolumeBlockLocations() must also be enabled in the client " +
"configuration.");
}
// Downcast blockLocations and fetch out required LocatedBlock(s)
List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
for (BlockLocation loc : blockLocations) {
if (!(loc instanceof HdfsBlockLocation)) {
throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
"expected to be passed HdfsBlockLocations");
}
HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
blocks.add(hdfsLoc.getLocatedBlock());
}
// Re-group the LocatedBlocks to be grouped by datanodes, with the values
// a list of the LocatedBlocks on the datanode.
Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
for (LocatedBlock b : blocks) {
for (DatanodeInfo info : b.getLocations()) {
if (!datanodeBlocks.containsKey(info)) {
datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
}
List<LocatedBlock> l = datanodeBlocks.get(info);
l.add(b);
}
}
// Make RPCs to the datanodes to get volume locations for its replicas
TraceScope scope =
Trace.startSpan("getBlockStorageLocations", traceSampler);
Map<DatanodeInfo, HdfsBlocksMetadata> metadatas;
try {
metadatas = BlockStorageLocationUtil.
queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
getConf().getFileBlockStorageLocationsNumThreads(),
getConf().getFileBlockStorageLocationsTimeoutMs(),
getConf().isConnectToDnViaHostname());
if (LOG.isTraceEnabled()) {
LOG.trace("metadata returned: "
+ Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
}
} finally {
scope.close();
}
// Regroup the returned VolumeId metadata to again be grouped by
// LocatedBlock rather than by datanode
Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
.associateVolumeIdsWithBlocks(blocks, metadatas);
// Combine original BlockLocations with new VolumeId information
BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
.convertToVolumeBlockLocations(blocks, blockVolumeIds);
return volumeBlockLocations;
}
/**
* Decrypts a EDEK by consulting the KeyProvider.
*/


@ -50,8 +50,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_USER_HOME_DIR_PREFIX_DEFAULT = "/user";
public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
public static final String DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled";
public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
public static final String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT =
HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT;
@ -985,10 +983,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_CLIENT_CONTEXT = "dfs.client.context";
public static final String DFS_CLIENT_CONTEXT_DEFAULT = "default";
public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads";
public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = 10;
public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS = "dfs.client.file-block-storage-locations.timeout.millis";
public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT = 1000;
public static final String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY = "dfs.client.datanode-restart.timeout";
public static final long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30;


@ -33,7 +33,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
@ -58,7 +57,6 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
@ -85,7 +83,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text;
@ -233,42 +230,6 @@ public class DistributedFileSystem extends FileSystem {
}.resolve(this, absF);
}
/**
* This API has been deprecated since the NameNode now tracks datanode
* storages separately. Storage IDs can be gotten from {@link
* BlockLocation#getStorageIds()}, which are functionally equivalent to
* the volume IDs returned here (although a String rather than a byte[]).
*
* Used to query storage location information for a list of blocks. This list
* of blocks is normally constructed via a series of calls to
* {@link DistributedFileSystem#getFileBlockLocations(Path, long, long)} to
* get the blocks for ranges of a file.
*
* The returned array of {@link BlockStorageLocation} augments
* {@link BlockLocation} with a {@link VolumeId} per block replica. The
* VolumeId specifies the volume on the datanode on which the replica resides.
* The VolumeId associated with a replica may be null because volume
* information can be unavailable if the corresponding datanode is down or
* if the requested block is not found.
*
* This API is unstable, and datanode-side support is disabled by default. It
* can be enabled by setting "dfs.datanode.hdfs-blocks-metadata.enabled" to
* true.
*
* @param blocks
* List of target BlockLocations to query volume location information
* @return volumeBlockLocations Augmented array of
* {@link BlockStorageLocation}s containing additional volume location
* information for each replica of each block.
*/
@InterfaceStability.Unstable
@Deprecated
public BlockStorageLocation[] getFileBlockStorageLocations(
List<BlockLocation> blocks) throws IOException,
UnsupportedOperationException, InvalidBlockTokenException {
return dfs.getBlockStorageLocations(blocks);
}
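The replacement hinted at in the deprecation note above needs no extra RPCs or configuration: storage IDs are now returned with the ordinary block locations. A sketch of the post-removal equivalent (illustrative, not part of the diff):

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;

// Post-HDFS-8895: read per-replica storage ids straight off BlockLocation.
static void printStorageIds(FileSystem fs, FileStatus status) throws Exception {
  for (BlockLocation loc : fs.getFileBlockLocations(status, 0, status.getLen())) {
    String[] hosts = loc.getHosts();
    String[] storageIds = loc.getStorageIds(); // parallel to getHosts(), one per replica
    for (int i = 0; i < hosts.length; i++) {
      System.out.println(hosts[i] + " -> " + storageIds[i]);
    }
  }
}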
@Override
public void setVerifyChecksum(boolean verifyChecksum) {
this.verifyChecksum = verifyChecksum;


@ -138,8 +138,6 @@ public class HdfsConfiguration extends Configuration {
DFSConfigKeys.DFS_NAMESERVICES),
new DeprecationDelta("dfs.federation.nameservice.id",
DFSConfigKeys.DFS_NAMESERVICE_ID),
new DeprecationDelta("dfs.client.file-block-storage-locations.timeout",
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS),
});
}


@ -88,9 +88,6 @@ public class DfsClientConf {
private final String taskId;
private final FsPermission uMask;
private final boolean connectToDnViaHostname;
private final boolean hdfsBlocksMetadataEnabled;
private final int fileBlockStorageLocationsNumThreads;
private final int fileBlockStorageLocationsTimeoutMs;
private final int retryTimesForGetLastBlockLength;
private final int retryIntervalForGetLastBlockLength;
private final long datanodeRestartTimeout;
@ -190,15 +187,6 @@ public class DfsClientConf {
uMask = FsPermission.getUMask(conf);
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
hdfsBlocksMetadataEnabled = conf.getBoolean(
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
fileBlockStorageLocationsNumThreads = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
fileBlockStorageLocationsTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT);
datanodeRestartTimeout = conf.getLong(
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
@ -428,27 +416,6 @@ public class DfsClientConf {
return connectToDnViaHostname;
}
/**
* @return the hdfsBlocksMetadataEnabled
*/
public boolean isHdfsBlocksMetadataEnabled() {
return hdfsBlocksMetadataEnabled;
}
/**
* @return the fileBlockStorageLocationsNumThreads
*/
public int getFileBlockStorageLocationsNumThreads() {
return fileBlockStorageLocationsNumThreads;
}
/**
* @return the getFileBlockStorageLocationsTimeoutMs
*/
public int getFileBlockStorageLocationsTimeoutMs() {
return fileBlockStorageLocationsTimeoutMs;
}
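For reference, the three settings read above were ordinary Configuration keys; a client that wanted the old behaviour set them roughly as follows (illustrative only; these keys are removed by this commit, and the datanode side had to enable the same boolean):

import org.apache.hadoop.conf.Configuration;

// Pre-removal settings for the BlockStorageLocation feature (10 and 1000 were the defaults).
Configuration conf = new Configuration();
conf.setBoolean("dfs.datanode.hdfs-blocks-metadata.enabled", true);
conf.setInt("dfs.client.file-block-storage-locations.num-threads", 10);
conf.setInt("dfs.client.file-block-storage-locations.timeout.millis", 1000);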
/**
* @return the retryTimesForGetLastBlockLength
*/


@ -110,25 +110,6 @@ public interface ClientDatanodeProtocol {
BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
Token<BlockTokenIdentifier> token) throws IOException;
/**
* Retrieves volume location information about a list of blocks on a datanode.
* This is in the form of an opaque {@link org.apache.hadoop.fs.VolumeId}
* for each configured data directory, which is not guaranteed to be
* the same across DN restarts.
*
* @param blockPoolId the pool to query
* @param blockIds
* list of blocks on the local datanode
* @param tokens
* block access tokens corresponding to the requested blocks
* @return an HdfsBlocksMetadata that associates {@link ExtendedBlock}s with
* data directories
* @throws IOException
* if datanode is unreachable, or replica is not found on datanode
*/
HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId,
long []blockIds, List<Token<BlockTokenIdentifier>> tokens) throws IOException;
/**
* Shuts down a datanode.
*


@ -1,111 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
/**
* Augments an array of blocks on a datanode with additional information about
* where the block is stored.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class HdfsBlocksMetadata {
/** The block pool that was queried */
private final String blockPoolId;
/**
* List of blocks
*/
private final long[] blockIds;
/**
* List of volumes
*/
private final List<byte[]> volumeIds;
/**
* List of indexes into <code>volumeIds</code>, one per block in
* <code>blocks</code>. A value of Integer.MAX_VALUE indicates that the
* block was not found.
*/
private final List<Integer> volumeIndexes;
/**
* Constructs HdfsBlocksMetadata.
*
* @param blockIds
* List of blocks described
* @param volumeIds
* List of potential volume identifiers, specifying volumes where
* blocks may be stored
* @param volumeIndexes
* Indexes into the list of volume identifiers, one per block
*/
public HdfsBlocksMetadata(String blockPoolId,
long[] blockIds, List<byte[]> volumeIds,
List<Integer> volumeIndexes) {
Preconditions.checkArgument(blockIds.length == volumeIndexes.size(),
"Argument lengths should match");
this.blockPoolId = blockPoolId;
this.blockIds = blockIds;
this.volumeIds = volumeIds;
this.volumeIndexes = volumeIndexes;
}
/**
* Get the array of blocks.
*
* @return array of blocks
*/
public long[] getBlockIds() {
return blockIds;
}
/**
* Get the list of volume identifiers in raw byte form.
*
* @return list of ids
*/
public List<byte[]> getVolumeIds() {
return volumeIds;
}
/**
* Get a list of indexes into the array of {@link VolumeId}s, one per block.
*
* @return list of indexes
*/
public List<Integer> getVolumeIndexes() {
return volumeIndexes;
}
@Override
public String toString() {
return "Metadata for " + blockIds.length + " blocks in " +
blockPoolId + ": " + Joiner.on(",").join(Longs.asList(blockIds));
}
}


@ -29,7 +29,6 @@ import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
@ -39,9 +38,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDat
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesRequestProto;
@ -54,12 +50,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartR
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import com.google.common.primitives.Longs;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -142,38 +133,6 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
.build();
}
@Override
public GetHdfsBlockLocationsResponseProto getHdfsBlockLocations(
RpcController controller, GetHdfsBlockLocationsRequestProto request)
throws ServiceException {
HdfsBlocksMetadata resp;
try {
String poolId = request.getBlockPoolId();
List<Token<BlockTokenIdentifier>> tokens =
new ArrayList<Token<BlockTokenIdentifier>>(request.getTokensCount());
for (TokenProto b : request.getTokensList()) {
tokens.add(PBHelper.convert(b));
}
long[] blockIds = Longs.toArray(request.getBlockIdsList());
// Call the real implementation
resp = impl.getHdfsBlocksMetadata(poolId, blockIds, tokens);
} catch (IOException e) {
throw new ServiceException(e);
}
List<ByteString> volumeIdsByteStrings =
new ArrayList<ByteString>(resp.getVolumeIds().size());
for (byte[] b : resp.getVolumeIds()) {
volumeIdsByteStrings.add(ByteString.copyFrom(b));
}
// Build and return the response
Builder builder = GetHdfsBlockLocationsResponseProto.newBuilder();
builder.addAllVolumeIds(volumeIdsByteStrings);
builder.addAllVolumeIndexes(resp.getVolumeIndexes());
return builder.build();
}
@Override
public ShutdownDatanodeResponseProto shutdownDatanode(
RpcController unused, ShutdownDatanodeRequestProto request)


@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -42,15 +41,12 @@ import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesResponseProto;
@ -70,11 +66,8 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import com.google.common.primitives.Longs;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -242,42 +235,6 @@ public class ClientDatanodeProtocolTranslatorPB implements
return rpcProxy;
}
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId,
long[] blockIds,
List<Token<BlockTokenIdentifier>> tokens) throws IOException {
List<TokenProto> tokensProtos =
new ArrayList<TokenProto>(tokens.size());
for (Token<BlockTokenIdentifier> t : tokens) {
tokensProtos.add(PBHelper.convert(t));
}
// Build the request
GetHdfsBlockLocationsRequestProto request =
GetHdfsBlockLocationsRequestProto.newBuilder()
.setBlockPoolId(blockPoolId)
.addAllBlockIds(Longs.asList(blockIds))
.addAllTokens(tokensProtos)
.build();
// Send the RPC
GetHdfsBlockLocationsResponseProto response;
try {
response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
// List of volumes in the response
List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
for (ByteString bs : volumeIdsByteStrings) {
volumeIds.add(bs.toByteArray());
}
// Array of indexes into the list of volumes, one per block
List<Integer> volumeIndexes = response.getVolumeIndexesList();
// Parsed HdfsVolumeId values, one per block
return new HdfsBlocksMetadata(blockPoolId, blockIds,
volumeIds, volumeIndexes);
}
@Override
public void shutdownDatanode(boolean forUpgrade) throws IOException {
ShutdownDatanodeRequestProto request = ShutdownDatanodeRequestProto


@ -115,7 +115,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
@ -346,7 +345,6 @@ public class DataNode extends ReconfigurableBase
ReadaheadPool readaheadPool;
SaslDataTransferClient saslClient;
SaslDataTransferServer saslServer;
private final boolean getHdfsBlockLocationsEnabled;
private ObjectName dataNodeInfoBeanName;
private Thread checkDiskErrorThread = null;
protected final int checkDiskErrorInterval = 5*1000;
@ -374,7 +372,6 @@ public class DataNode extends ReconfigurableBase
this.confVersion = null;
this.usersWithLocalPathAccess = null;
this.connectToDnViaHostname = false;
this.getHdfsBlockLocationsEnabled = false;
this.blockScanner = new BlockScanner(this, conf);
this.pipelineSupportECN = false;
}
@ -397,9 +394,6 @@ public class DataNode extends ReconfigurableBase
this.connectToDnViaHostname = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
this.getHdfsBlockLocationsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
this.isPermissionEnabled = conf.getBoolean(
@ -1624,29 +1618,6 @@ public class DataNode extends ReconfigurableBase
return fis;
}
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(
String bpId, long[] blockIds,
List<Token<BlockTokenIdentifier>> tokens) throws IOException,
UnsupportedOperationException {
if (!getHdfsBlockLocationsEnabled) {
throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata "
+ " is not enabled in datanode config");
}
if (blockIds.length != tokens.size()) {
throw new IOException("Differing number of blocks and tokens");
}
// Check access for each block
for (int i = 0; i < blockIds.length; i++) {
checkBlockToken(new ExtendedBlock(bpId, blockIds[i]),
tokens.get(i), BlockTokenIdentifier.AccessMode.READ);
}
DataNodeFaultInjector.get().getHdfsBlocksMetadata();
return data.getHdfsBlocksMetadata(bpId, blockIds);
}
private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
AccessMode accessMode) throws IOException {
if (isBlockTokenEnabled) {


@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
@ -560,18 +559,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b
) throws IOException;
/**
* Get a {@link HdfsBlocksMetadata} corresponding to the list of blocks in
* <code>blocks</code>.
*
* @param bpid pool to query
* @param blockIds List of block ids for which to return metadata
* @return metadata Metadata for the list of blocks
* @throws IOException
*/
HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid,
long[] blockIds) throws IOException;
/**
* Enable 'trash' for the given dataset. When trash is enabled, files are
* moved to a separate trash directory instead of being deleted immediately.


@ -28,7 +28,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
@ -60,12 +59,10 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -2658,48 +2655,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return info; return info;
} }
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
long[] blockIds) throws IOException {
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
// List of VolumeIds, one per volume on the datanode
List<byte[]> blocksVolumeIds = new ArrayList<>(curVolumes.size());
// List of indexes into the list of VolumeIds, pointing at the VolumeId of
// the volume that the block is on
List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length);
// Initialize the list of VolumeIds simply by enumerating the volumes
for (int i = 0; i < curVolumes.size(); i++) {
blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
}
// Determine the index of the VolumeId of each block's volume, by comparing
// the block's volume against the enumerated volumes
for (int i = 0; i < blockIds.length; i++) {
long blockId = blockIds[i];
boolean isValid = false;
ReplicaInfo info = volumeMap.get(poolId, blockId);
int volumeIndex = 0;
if (info != null) {
FsVolumeSpi blockVolume = info.getVolume();
for (FsVolumeImpl volume : curVolumes) {
// This comparison of references should be safe
if (blockVolume == volume) {
isValid = true;
break;
}
volumeIndex++;
}
}
// Indicates that the block is not present, or not found in a data dir
if (!isValid) {
volumeIndex = Integer.MAX_VALUE;
}
blocksVolumeIndexes.add(volumeIndex);
}
return new HdfsBlocksMetadata(poolId, blockIds,
blocksVolumeIds, blocksVolumeIndexes);
}
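
The removed FsDatasetImpl implementation above returns two parallel structures: one opaque byte[] per volume (here simply the 4-byte big-endian volume index) and, per block, an index into that list, with Integer.MAX_VALUE standing in for "block not found on any volume". Below is a small self-contained sketch of that encoding and how a consumer could decode it; the class and method names are invented for illustration.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class VolumeIndexEncodingSketch {
  static final int NOT_FOUND = Integer.MAX_VALUE;

  // One opaque id per volume, generated the same way as in the removed method.
  static List<byte[]> encodeVolumeIds(int volumeCount) {
    List<byte[]> ids = new ArrayList<>(volumeCount);
    for (int i = 0; i < volumeCount; i++) {
      ids.add(ByteBuffer.allocate(4).putInt(i).array());
    }
    return ids;
  }

  // Client-side view: resolve a block's index back to its volume id, or null.
  static byte[] resolve(List<byte[]> volumeIds, int volumeIndex) {
    if (volumeIndex == NOT_FOUND || volumeIndex >= volumeIds.size()) {
      return null;
    }
    return volumeIds.get(volumeIndex);
  }

  public static void main(String[] args) {
    List<byte[]> ids = encodeVolumeIds(3);
    System.out.println(resolve(ids, 1) != null);         // block found on volume 1
    System.out.println(resolve(ids, NOT_FOUND) == null); // block on no volume
  }
}
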
@Override
public void enableTrash(String bpid) {
dataStorage.enableTrash(bpid);
@@ -1697,30 +1697,6 @@
</description>
</property>
<property>
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
<value>false</value>
<description>
Boolean which enables backend datanode-side support for the experimental DistributedFileSystem#getFileBlockStorageLocations API.
</description>
</property>
<property>
<name>dfs.client.file-block-storage-locations.num-threads</name>
<value>10</value>
<description>
Number of threads used for making parallel RPCs in DistributedFileSystem#getFileBlockStorageLocations().
</description>
</property>
<property>
<name>dfs.client.file-block-storage-locations.timeout.millis</name>
<value>1000</value>
<description>
Timeout (in milliseconds) for the parallel RPCs made in DistributedFileSystem#getFileBlockStorageLocations().
</description>
</property>
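
For context, the three keys deleted above were consumed only by the equally deprecated client-side API removed in this commit. A sketch of how a caller typically wired them before HDFS-8895 follows; it no longer compiles after this change, and the namenode address and file path are placeholders.

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class OldStorageLocationUsage {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Datanode-side switch plus client-side RPC tuning, as documented above.
    conf.setBoolean("dfs.datanode.hdfs-blocks-metadata.enabled", true);
    conf.setInt("dfs.client.file-block-storage-locations.num-threads", 10);
    conf.setInt("dfs.client.file-block-storage-locations.timeout.millis", 1000);

    Path file = new Path("/tmp/data.bin");
    try (DistributedFileSystem fs = (DistributedFileSystem)
        new Path("hdfs://localhost:8020/").getFileSystem(conf)) {
      // Batch up the ordinary block locations, then ask for volume-level detail.
      BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, Long.MAX_VALUE);
      BlockStorageLocation[] locs =
          fs.getFileBlockStorageLocations(Arrays.asList(blocks));
      for (BlockStorageLocation l : locs) {
        System.out.println(Arrays.toString(l.getVolumeIds()));
      }
    }
  }
}
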
<property>
<name>dfs.journalnode.rpc-address</name>
<value>0.0.0.0:8485</value>
@@ -1,146 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestVolumeId {
@Test
public void testEquality() {
final VolumeId id1 = new HdfsVolumeId(new byte[] { (byte)0, (byte)0 });
testEq(true, id1, id1);
final VolumeId id2 = new HdfsVolumeId(new byte[] { (byte)0, (byte)1 });
testEq(true, id2, id2);
testEq(false, id1, id2);
final VolumeId id3 = new HdfsVolumeId(new byte[] { (byte)1, (byte)0 });
testEq(true, id3, id3);
testEq(false, id1, id3);
// same bytes as id2:
final VolumeId id2copy1 = new HdfsVolumeId(new byte[] { (byte)0, (byte)1 });
testEq(true, id2, id2copy1);
// same as 2copy1:
final VolumeId id2copy2 = new HdfsVolumeId(new byte[] { (byte)0, (byte)1 });
testEq(true, id2, id2copy2);
testEqMany(true, new VolumeId[] { id2, id2copy1, id2copy2 });
testEqMany(false, new VolumeId[] { id1, id2, id3 });
}
@SuppressWarnings("unchecked")
private <T> void testEq(final boolean eq, Comparable<T> id1, Comparable<T> id2) {
final int h1 = id1.hashCode();
final int h2 = id2.hashCode();
// eq reflectivity:
assertTrue(id1.equals(id1));
assertTrue(id2.equals(id2));
assertEquals(0, id1.compareTo((T)id1));
assertEquals(0, id2.compareTo((T)id2));
// eq symmetry:
assertEquals(eq, id1.equals(id2));
assertEquals(eq, id2.equals(id1));
// null comparison:
assertFalse(id1.equals(null));
assertFalse(id2.equals(null));
// compareTo:
assertEquals(eq, 0 == id1.compareTo((T)id2));
assertEquals(eq, 0 == id2.compareTo((T)id1));
// compareTo must be antisymmetric:
assertEquals(sign(id1.compareTo((T)id2)), -sign(id2.compareTo((T)id1)));
// compare with null should never return 0 to be consistent with #equals():
assertTrue(id1.compareTo(null) != 0);
assertTrue(id2.compareTo(null) != 0);
// check that hash codes did not change:
assertEquals(h1, id1.hashCode());
assertEquals(h2, id2.hashCode());
if (eq) {
// in this case the hash codes must be the same:
assertEquals(h1, h2);
}
}
private static int sign(int x) {
if (x == 0) {
return 0;
} else if (x > 0) {
return 1;
} else {
return -1;
}
}
@SuppressWarnings("unchecked")
private <T> void testEqMany(final boolean eq, Comparable<T>... volumeIds) {
Comparable<T> vidNext;
int sum = 0;
for (int i=0; i<volumeIds.length; i++) {
if (i == volumeIds.length - 1) {
vidNext = volumeIds[0];
} else {
vidNext = volumeIds[i + 1];
}
testEq(eq, volumeIds[i], vidNext);
sum += sign(volumeIds[i].compareTo((T)vidNext));
}
// the comparison relationship must always be acyclic:
assertTrue(sum < volumeIds.length);
}
/*
* Test HdfsVolumeId(new byte[0]) instances: show that we permit such
* objects, they are still valid, and obey the same equality
* rules other objects do.
*/
@Test
public void testIdEmptyBytes() {
final VolumeId idEmpty1 = new HdfsVolumeId(new byte[0]);
final VolumeId idEmpty2 = new HdfsVolumeId(new byte[0]);
final VolumeId idNotEmpty = new HdfsVolumeId(new byte[] { (byte)1 });
testEq(true, idEmpty1, idEmpty2);
testEq(false, idEmpty1, idNotEmpty);
testEq(false, idEmpty2, idNotEmpty);
}
/*
* test #toString() for typical VolumeId equality classes
*/
@Test
public void testToString() {
String strEmpty = new HdfsVolumeId(new byte[] {}).toString();
assertNotNull(strEmpty);
String strNotEmpty = new HdfsVolumeId(new byte[] { (byte)1 }).toString();
assertNotNull(strNotEmpty);
}
}
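
The deleted test above is effectively a specification of the id contract: equals must be reflexive and symmetric, compareTo antisymmetric and never 0 against null, and hashCode stable and consistent with equals, including for zero-length byte arrays. A minimal sketch of a byte[]-backed id meeting that contract follows; the class name is invented and this is not the removed HdfsVolumeId implementation.

import java.util.Arrays;

final class ByteArrayIdSketch implements Comparable<ByteArrayIdSketch> {
  private final byte[] bytes;

  ByteArrayIdSketch(byte[] bytes) {
    this.bytes = bytes.clone();       // defensive copy; empty arrays are allowed
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ByteArrayIdSketch)) {
      return false;                   // also covers null, so equals(null) is false
    }
    return Arrays.equals(bytes, ((ByteArrayIdSketch) o).bytes);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(bytes);    // equal ids always hash the same
  }

  @Override
  public int compareTo(ByteArrayIdSketch other) {
    if (other == null) {
      return 1;                       // never 0 for null, consistent with equals()
    }
    return Arrays.compare(bytes, other.bytes); // lexicographic, antisymmetric (Java 9+)
  }

  @Override
  public String toString() {
    return Arrays.toString(bytes);    // non-null even for empty ids
  }
}
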
@@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
@@ -38,20 +36,16 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -65,21 +59,16 @@ import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum;
@@ -88,12 +77,6 @@ import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
public class TestDistributedFileSystem {
private static final Random RAN = new Random();
@@ -216,12 +199,6 @@ public class TestDistributedFileSystem {
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("Filesystem closed", ioe);
}
try {
dfsClient.getBlockStorageLocations(new ArrayList<BlockLocation>());
fail("getBlockStorageLocations using a closed filesystem!");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("Filesystem closed", ioe);
}
try {
dfsClient.createSymlink("target", "link", true);
fail("createSymlink using a closed filesystem!");
@@ -886,213 +863,6 @@
}
}
/**
* Tests the normal path of batching up BlockLocation[]s to be passed to a
* single
* {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)}
* call
*/
@Test(timeout=60000)
public void testGetFileBlockStorageLocationsBatching() throws Exception {
final Configuration conf = getTestConfiguration();
((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.TRACE);
((Log4JLogger)BlockStorageLocationUtil.LOG).getLogger().setLevel(Level.TRACE);
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.TRACE);
conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
true);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build();
try {
final DistributedFileSystem fs = cluster.getFileSystem();
// Create two files
final Path tmpFile1 = new Path("/tmpfile1.dat");
final Path tmpFile2 = new Path("/tmpfile2.dat");
DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl);
DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl);
// Make sure files are fully replicated before continuing
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
List<BlockLocation> list = Lists.newArrayList();
list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile1, 0,
1024)));
list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile2, 0,
1024)));
int totalRepl = 0;
for (BlockLocation loc : list) {
totalRepl += loc.getHosts().length;
}
if (totalRepl == 4) {
return true;
}
} catch(IOException e) {
// swallow
}
return false;
}
}, 500, 30000);
// Get locations of blocks of both files and concat together
BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024);
BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024);
BlockLocation[] blockLocs = (BlockLocation[]) ArrayUtils.addAll(blockLocs1,
blockLocs2);
// Fetch VolumeBlockLocations in batch
BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
.asList(blockLocs));
int counter = 0;
// Print out the list of ids received for each block
for (BlockStorageLocation l : locs) {
for (int i = 0; i < l.getVolumeIds().length; i++) {
VolumeId id = l.getVolumeIds()[i];
String name = l.getNames()[i];
if (id != null) {
System.out.println("Datanode " + name + " has block " + counter
+ " on volume id " + id.toString());
}
}
counter++;
}
assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2,
locs.length);
for (BlockStorageLocation l : locs) {
assertEquals("Expected two replicas for each block", 2,
l.getVolumeIds().length);
for (int i = 0; i < l.getVolumeIds().length; i++) {
VolumeId id = l.getVolumeIds()[i];
String name = l.getNames()[i];
assertTrue("Expected block to be valid on datanode " + name,
id != null);
}
}
} finally {
cluster.shutdown();
}
}
/**
* Tests error paths for
* {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)}
*/
@Test(timeout=60000)
public void testGetFileBlockStorageLocationsError() throws Exception {
final Configuration conf = getTestConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
true);
conf.setInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS, 1500);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.getDataNodes();
final DistributedFileSystem fs = cluster.getFileSystem();
// Create a few files and add together their block locations into
// a list.
final Path tmpFile1 = new Path("/errorfile1.dat");
final Path tmpFile2 = new Path("/errorfile2.dat");
DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl);
DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl);
// Make sure files are fully replicated before continuing
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
List<BlockLocation> list = Lists.newArrayList();
list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile1, 0,
1024)));
list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile2, 0,
1024)));
int totalRepl = 0;
for (BlockLocation loc : list) {
totalRepl += loc.getHosts().length;
}
if (totalRepl == 4) {
return true;
}
} catch(IOException e) {
// swallow
}
return false;
}
}, 500, 30000);
BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024);
BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024);
List<BlockLocation> allLocs = Lists.newArrayList();
allLocs.addAll(Arrays.asList(blockLocs1));
allLocs.addAll(Arrays.asList(blockLocs2));
// Stall on the DN to test the timeout
DataNodeFaultInjector injector = Mockito.mock(DataNodeFaultInjector.class);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(3000);
return null;
}
}).when(injector).getHdfsBlocksMetadata();
DataNodeFaultInjector.instance = injector;
BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(allLocs);
for (BlockStorageLocation loc: locs) {
assertEquals(
"Found more than 0 cached hosts although RPCs supposedly timed out",
0, loc.getCachedHosts().length);
}
// Restore a default injector
DataNodeFaultInjector.instance = new DataNodeFaultInjector();
// Stop a datanode to simulate a failure.
DataNodeProperties stoppedNode = cluster.stopDataNode(0);
// Fetch VolumeBlockLocations
locs = fs.getFileBlockStorageLocations(allLocs);
assertEquals("Expected two HdfsBlockLocation for two 1-block files", 2,
locs.length);
for (BlockStorageLocation l : locs) {
assertEquals("Expected two replicas for each block", 2,
l.getHosts().length);
assertEquals("Expected two VolumeIDs for each block", 2,
l.getVolumeIds().length);
assertTrue("Expected one valid and one invalid volume",
(l.getVolumeIds()[0] == null) ^ (l.getVolumeIds()[1] == null));
}
// Start the datanode again, and remove one of the blocks.
// This is a different type of failure where the block itself
// is invalid.
cluster.restartDataNode(stoppedNode, true /*keepPort*/);
cluster.waitActive();
fs.delete(tmpFile2, true);
HATestUtil.waitForNNToIssueDeletions(cluster.getNameNode());
cluster.triggerHeartbeats();
HATestUtil.waitForDNDeletions(cluster);
locs = fs.getFileBlockStorageLocations(allLocs);
assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2,
locs.length);
assertNotNull(locs[0].getVolumeIds()[0]);
assertNotNull(locs[0].getVolumeIds()[1]);
assertNull(locs[1].getVolumeIds()[0]);
assertNull(locs[1].getVolumeIds()[1]);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testCreateWithCustomChecksum() throws Exception {
Configuration conf = getTestConfiguration();
@@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@@ -1244,12 +1243,6 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
throw new UnsupportedOperationException();
}
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds)
throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void enableTrash(String bpid) {
throw new UnsupportedOperationException();
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -295,11 +294,6 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
return new BlockLocalPathInfo(null, "file", "metafile");
}
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds) throws IOException {
return new HdfsBlocksMetadata(null, null, null, null);
}
@Override
public void enableTrash(String bpid) {