HDFS-5304. Expose if a block replica is cached in getFileBlockLocations. (Contributed by Andrew Wang)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1530802 13f79535-47bb-0310-9956-ffa450edef68
commit 3fc8792b5c (parent 54801ba4f9)
@@ -54,6 +54,9 @@ HDFS-4949 (Unreleased)
     HDFS-5190. Move cache pool related CLI commands to CacheAdmin.
     (Contributed by Andrew Wang)
 
+    HDFS-5304. Expose if a block replica is cached in getFileBlockLocations.
+    (Contributed by Andrew Wang)
+
   OPTIMIZATIONS
 
   BUG FIXES

LocatedBlock.java:
@@ -17,15 +17,21 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 /**
  * Associates a block with the Datanodes that contain its replicas
  * and other block metadata (E.g. the file offset associated with this
- * block, whether it is corrupt, security token, etc).
+ * block, whether it is corrupt, a location is cached in memory,
+ * security token, etc).
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -39,9 +45,16 @@ public class LocatedBlock {
   // their locations are not part of this object
   private boolean corrupt;
   private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
+  /**
+   * List of cached datanode locations
+   */
+  private DatanodeInfo[] cachedLocs;
+
+  // Used when there are no locations
+  private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0];
 
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
-    this(b, locs, -1, false); // startOffset is unknown
+    this(b, locs, -1); // startOffset is unknown
   }
 
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset) {
@@ -50,14 +63,26 @@ public class LocatedBlock {
 
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset,
                       boolean corrupt) {
+    this(b, locs, startOffset, corrupt, EMPTY_LOCS);
+  }
+
+  public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset,
+                      boolean corrupt, DatanodeInfo[] cachedLocs) {
     this.b = b;
     this.offset = startOffset;
     this.corrupt = corrupt;
     if (locs==null) {
-      this.locs = new DatanodeInfo[0];
+      this.locs = EMPTY_LOCS;
     } else {
       this.locs = locs;
     }
+    Preconditions.checkArgument(cachedLocs != null,
+        "cachedLocs should not be null, use a different constructor");
+    if (cachedLocs.length == 0) {
+      this.cachedLocs = EMPTY_LOCS;
+    } else {
+      this.cachedLocs = cachedLocs;
+    }
   }
 
   public Token<BlockTokenIdentifier> getBlockToken() {
@@ -96,6 +121,36 @@ public class LocatedBlock {
     return this.corrupt;
   }
 
+  /**
+   * Add the location of a cached replica of the block.
+   *
+   * @param loc of datanode with the cached replica
+   */
+  public void addCachedLoc(DatanodeInfo loc) {
+    List<DatanodeInfo> cachedList = Lists.newArrayList(cachedLocs);
+    if (cachedList.contains(loc)) {
+      return;
+    }
+    // Try to re-use a DatanodeInfo already in loc
+    for (int i=0; i<locs.length; i++) {
+      if (locs[i].equals(loc)) {
+        cachedList.add(locs[i]);
+        cachedLocs = cachedList.toArray(cachedLocs);
+        return;
+      }
+    }
+    // Not present in loc, add it and go
+    cachedList.add(loc);
+    cachedLocs = cachedList.toArray(cachedLocs);
+  }
+
+  /**
+   * @return Datanodes with a cached block replica
+   */
+  public DatanodeInfo[] getCachedLocations() {
+    return cachedLocs;
+  }
+
   @Override
   public String toString() {
     return getClass().getSimpleName() + "{" + b

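Aside: the cached-location bookkeeping in LocatedBlock is easy to see in isolation. Below is a minimal, self-contained sketch of the same semantics (duplicates are ignored; when the cached node is also a storage location, the existing reference from locs is re-used), using plain strings in place of DatanodeInfo. The class and every name in it are illustrative, not part of this patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of LocatedBlock's cachedLocs handling (illustrative only).
public class CachedLocsModel {
  private final String[] locs;            // stands in for DatanodeInfo[] locs
  private String[] cachedLocs = new String[0];

  public CachedLocsModel(String[] locs) {
    this.locs = locs;
  }

  // Mirrors addCachedLoc: skip duplicates, prefer the reference already in locs.
  public void addCachedLoc(String loc) {
    List<String> cachedList = new ArrayList<>(Arrays.asList(cachedLocs));
    if (cachedList.contains(loc)) {
      return; // already recorded as cached
    }
    for (String storageLoc : locs) {
      if (storageLoc.equals(loc)) {
        loc = storageLoc; // re-use the existing reference
        break;
      }
    }
    cachedList.add(loc);
    cachedLocs = cachedList.toArray(new String[0]);
  }

  public String[] getCachedLocations() {
    return cachedLocs;
  }

  public static void main(String[] args) {
    CachedLocsModel block = new CachedLocsModel(new String[] {"dn1", "dn2", "dn3"});
    block.addCachedLoc("dn2");
    block.addCachedLoc("dn2"); // duplicate, ignored
    System.out.println(Arrays.toString(block.getCachedLocations())); // [dn2]
  }
}
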
PBHelper.java:
@@ -150,6 +150,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedInputStream;
@@ -566,9 +567,21 @@ public class PBHelper {
     if (b == null) return null;
     Builder builder = LocatedBlockProto.newBuilder();
     DatanodeInfo[] locs = b.getLocations();
+    List<DatanodeInfo> cachedLocs =
+        Lists.newLinkedList(Arrays.asList(b.getCachedLocations()));
     for (int i = 0; i < locs.length; i++) {
-      builder.addLocs(i, PBHelper.convert(locs[i]));
+      DatanodeInfo loc = locs[i];
+      builder.addLocs(i, PBHelper.convert(loc));
+      boolean locIsCached = cachedLocs.contains(loc);
+      builder.addIsCached(locIsCached);
+      if (locIsCached) {
+        cachedLocs.remove(loc);
+      }
     }
+    Preconditions.checkArgument(cachedLocs.size() == 0,
+        "Found additional cached replica locations that are not in the set of"
+        + " storage-backed locations!");
+
     return builder.setB(PBHelper.convert(b.getBlock()))
         .setBlockToken(PBHelper.convert(b.getBlockToken()))
         .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
@@ -581,9 +594,20 @@ public class PBHelper {
     for (int i = 0; i < locs.size(); i++) {
       targets[i] = PBHelper.convert(locs.get(i));
     }
+    // Set values from the isCached list, re-using references from loc
+    List<DatanodeInfo> cachedLocs = new ArrayList<DatanodeInfo>(locs.size());
+    List<Boolean> isCachedList = proto.getIsCachedList();
+    for (int i=0; i<isCachedList.size(); i++) {
+      if (isCachedList.get(i)) {
+        cachedLocs.add(targets[i]);
+      }
+    }
+
     LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets,
-        proto.getOffset(), proto.getCorrupt());
+        proto.getOffset(), proto.getCorrupt(),
+        cachedLocs.toArray(new DatanodeInfo[0]));
     lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
+
     return lb;
   }
 
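Aside: the wire format used by these two converters is a pair of parallel lists, where isCached.get(i) says whether the replica at locs.get(i) also holds an in-memory cached copy. Here is a standalone model of that encode/decode mapping, with strings instead of DatanodeInfo and no protobuf involved; every name in it is illustrative, not part of this patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

// Standalone model of the locs[i] <-> isCached[i] mapping (illustrative only).
public class IsCachedCodec {

  // Encode: one boolean per location, true if that location is cached.
  static List<Boolean> encode(List<String> locs, List<String> cachedLocs) {
    List<String> remaining = new LinkedList<>(cachedLocs);
    List<Boolean> isCached = new ArrayList<>(locs.size());
    for (String loc : locs) {
      boolean cached = remaining.contains(loc);
      isCached.add(cached);
      if (cached) {
        remaining.remove(loc); // each cached entry must match some location
      }
    }
    if (!remaining.isEmpty()) {
      throw new IllegalArgumentException(
          "cached locations not in the set of storage-backed locations");
    }
    return isCached;
  }

  // Decode: rebuild cachedLocs by re-using references from locs.
  static List<String> decode(List<String> locs, List<Boolean> isCached) {
    List<String> cachedLocs = new ArrayList<>(locs.size());
    for (int i = 0; i < isCached.size(); i++) {
      if (isCached.get(i)) {
        cachedLocs.add(locs.get(i));
      }
    }
    return cachedLocs;
  }

  public static void main(String[] args) {
    List<String> locs = Arrays.asList("dn1", "dn2", "dn3");
    List<Boolean> bits = encode(locs, Arrays.asList("dn2"));
    System.out.println(bits);               // [false, true, false]
    System.out.println(decode(locs, bits)); // [dn2]
  }
}
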
CacheReplicationManager.java:
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
@@ -188,6 +189,14 @@ public class CacheReplicationManager extends ReportProcessor {
     return bc == null ? 0 : bc.getCacheReplication();
   }
 
+  public void setCachedLocations(LocatedBlock block) {
+    BlockInfo blockInfo = cachedBlocksMap.getStoredBlock(
+        block.getBlock().getLocalBlock());
+    for (int i=0; i<blockInfo.numNodes(); i++) {
+      block.addCachedLoc(blockInfo.getDatanode(i));
+    }
+  }
+
   /**
    * Returns the number of cached replicas of a block
    */

FSNamesystem.java:
@@ -1442,6 +1442,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         blockManager.getDatanodeManager().sortLocatedBlocks(
             clientMachine, lastBlockList);
       }
+      // Set caching information for the block list
+      for (LocatedBlock lb: blocks.getLocatedBlocks()) {
+        cacheReplicationManager.setCachedLocations(lb);
+      }
     }
     return blocks;
   }

hdfs.proto:
@@ -128,6 +128,7 @@ message LocatedBlockProto {
   // their locations are not part of this object
 
   required hadoop.common.TokenProto blockToken = 5;
+  repeated bool isCached = 6 [packed=true]; // if a location in locs is cached
 }
 
 message DataEncryptionKeyProto {

TestCacheReplicationManager.java:
@@ -32,6 +32,7 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
 import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -160,6 +162,18 @@ public class TestCacheReplicationManager {
           descriptor.getPool());
       expected += numBlocksPerFile;
       waitForExpectedNumCachedBlocks(expected);
+      HdfsBlockLocation[] locations =
+          (HdfsBlockLocation[]) dfs.getFileBlockLocations(
+              new Path(paths.get(i)), 0, numBlocksPerFile * BLOCK_SIZE);
+      assertEquals("Unexpected number of locations", numBlocksPerFile,
+          locations.length);
+      for (HdfsBlockLocation loc: locations) {
+        assertEquals("Block should be present on all datanodes",
+            3, loc.getHosts().length);
+        DatanodeInfo[] cachedLocs = loc.getLocatedBlock().getCachedLocations();
+        assertEquals("Block should be cached on all datanodes",
+            loc.getHosts().length, cachedLocs.length);
+      }
     }
     // Uncache and check each path in sequence
     RemoteIterator<PathBasedCacheDescriptor> entries =
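Aside: from a client's point of view, the feature surfaces through getFileBlockLocations, following the same pattern as the test above. A hedged usage sketch; the path and the assumption that fs.defaultFS points at an HDFS cluster are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class ShowCachedLocations {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    Path path = new Path("/cached/file"); // illustrative path
    // Assumes fs.defaultFS points at an HDFS cluster, so the cast is safe.
    DistributedFileSystem dfs = (DistributedFileSystem) path.getFileSystem(conf);
    long len = dfs.getFileStatus(path).getLen();
    // On HDFS the returned BlockLocations are HdfsBlockLocations, which
    // expose the underlying LocatedBlock and thus the cached locations.
    HdfsBlockLocation[] locations =
        (HdfsBlockLocation[]) dfs.getFileBlockLocations(path, 0, len);
    for (HdfsBlockLocation loc : locations) {
      DatanodeInfo[] cachedLocs = loc.getLocatedBlock().getCachedLocations();
      System.out.println("offset " + loc.getOffset() + ": cached on "
          + cachedLocs.length + " of " + loc.getHosts().length + " datanodes");
    }
    dfs.close();
  }
}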