diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
index 129d2345ba0..aa045dfb80c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
@@ -61,6 +61,7 @@ HDFS-4949 (Unreleased)
     String. (cnauroth)
 
   OPTIMIZATIONS
+    HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
 
   BUG FIXES
     HDFS-5169. hdfs.c: translateZCRException: null pointer deref when
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 2e5beaade24..4f9ce6c79aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsS
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
@@ -119,6 +120,7 @@ import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -696,6 +698,8 @@ public class PBHelper {
       return PBHelper.convert(proto.getKeyUpdateCmd());
     case RegisterCommand:
       return REG_CMD;
+    case BlockIdCommand:
+      return PBHelper.convert(proto.getBlkIdCmd());
     }
     return null;
   }
@@ -738,12 +742,6 @@ public class PBHelper {
     case DatanodeProtocol.DNA_SHUTDOWN:
       builder.setAction(BlockCommandProto.Action.SHUTDOWN);
       break;
-    case DatanodeProtocol.DNA_CACHE:
-      builder.setAction(BlockCommandProto.Action.CACHE);
-      break;
-    case DatanodeProtocol.DNA_UNCACHE:
-      builder.setAction(BlockCommandProto.Action.UNCACHE);
-      break;
     default:
       throw new AssertionError("Invalid action");
     }
@@ -754,6 +752,26 @@
     builder.addAllTargets(PBHelper.convert(cmd.getTargets()));
     return builder.build();
   }
+
+  public static BlockIdCommandProto convert(BlockIdCommand cmd) {
+    BlockIdCommandProto.Builder builder = BlockIdCommandProto.newBuilder()
+        .setBlockPoolId(cmd.getBlockPoolId());
+    switch (cmd.getAction()) {
+    case DatanodeProtocol.DNA_CACHE:
+      builder.setAction(BlockIdCommandProto.Action.CACHE);
+      break;
+    case DatanodeProtocol.DNA_UNCACHE:
+      builder.setAction(BlockIdCommandProto.Action.UNCACHE);
+      break;
+    default:
+      throw new AssertionError("Invalid action");
+    }
+    long[] blockIds = cmd.getBlockIds();
+    for (int i = 0; i < blockIds.length; i++) {
+      builder.addBlockIds(blockIds[i]);
+    }
+    return builder.build();
+  }
 
   private static List<DatanodeInfosProto> convert(DatanodeInfo[][] targets) {
     DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length];
@@ -796,11 +814,14 @@ public class PBHelper {
       break;
     case DatanodeProtocol.DNA_TRANSFER:
     case DatanodeProtocol.DNA_INVALIDATE:
+    case DatanodeProtocol.DNA_SHUTDOWN:
+      builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).
+        setBlkCmd(PBHelper.convert((BlockCommand) datanodeCommand));
+      break;
     case DatanodeProtocol.DNA_CACHE:
     case DatanodeProtocol.DNA_UNCACHE:
-    case DatanodeProtocol.DNA_SHUTDOWN:
-      builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
-        PBHelper.convert((BlockCommand) datanodeCommand));
+      builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand).
+        setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand));
       break;
     case DatanodeProtocol.DNA_UNKNOWN: //Not expected
     default:
@@ -851,6 +872,20 @@ public class PBHelper {
     case SHUTDOWN:
       action = DatanodeProtocol.DNA_SHUTDOWN;
       break;
+    default:
+      throw new AssertionError("Unknown action type: " + blkCmd.getAction());
+    }
+    return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
+  }
+
+  public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
+    int numBlockIds = blkIdCmd.getBlockIdsCount();
+    long blockIds[] = new long[numBlockIds];
+    for (int i = 0; i < numBlockIds; i++) {
+      blockIds[i] = blkIdCmd.getBlockIds(i);
+    }
+    int action = DatanodeProtocol.DNA_UNKNOWN;
+    switch (blkIdCmd.getAction()) {
     case CACHE:
       action = DatanodeProtocol.DNA_CACHE;
       break;
@@ -858,9 +893,9 @@
       action = DatanodeProtocol.DNA_UNCACHE;
       break;
     default:
-      throw new AssertionError("Unknown action type: " + blkCmd.getAction());
+      throw new AssertionError("Unknown action type: " + blkIdCmd.getAction());
     }
-    return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
+    return new BlockIdCommand(action, blkIdCmd.getBlockPoolId(), blockIds);
   }
 
   public static DatanodeInfo[] convert(DatanodeInfosProto datanodeInfosProto) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 83959319e0f..006184a9ac2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -1308,14 +1309,22 @@ public class DatanodeManager {
         // Check pending caching
         List<Block> pendingCacheList = nodeinfo.getCacheBlocks();
         if (pendingCacheList != null) {
-          cmds.add(new BlockCommand(DatanodeProtocol.DNA_CACHE, blockPoolId,
-              pendingCacheList.toArray(new Block[] {})));
+          long blockIds[] = new long[pendingCacheList.size()];
+          for (int i = 0; i < pendingCacheList.size(); i++) {
+            blockIds[i] = pendingCacheList.get(i).getBlockId();
+          }
+          cmds.add(new BlockIdCommand(DatanodeProtocol.DNA_CACHE, blockPoolId,
+              blockIds));
         }
         // Check cached block invalidation
         blks = nodeinfo.getInvalidateCacheBlocks();
         if (blks != null) {
-          cmds.add(new BlockCommand(DatanodeProtocol.DNA_UNCACHE,
-              blockPoolId, blks));
+          long blockIds[] = new long[blks.length];
+          for (int i = 0; i < blks.length; i++) {
+            blockIds[i] = blks[i].getBlockId();
+          }
+          cmds.add(new BlockIdCommand(DatanodeProtocol.DNA_UNCACHE,
+              blockPoolId, blockIds));
         }
 
         blockManager.addKeyUpdateCommand(cmds, nodeinfo);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index bc78eda828a..ce934764673 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -518,6 +519,8 @@ class BPOfferService {
       return true;
     final BlockCommand bcmd =
       cmd instanceof BlockCommand? (BlockCommand)cmd: null;
+    final BlockIdCommand blockIdCmd =
+      cmd instanceof BlockIdCommand ? (BlockIdCommand)cmd: null;
 
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_TRANSFER:
@@ -545,13 +548,13 @@
       break;
     case DatanodeProtocol.DNA_CACHE:
       LOG.info("DatanodeCommand action: DNA_CACHE");
-      dn.getFSDataset().cache(bcmd.getBlockPoolId(), bcmd.getBlocks());
-      dn.metrics.incrBlocksCached(bcmd.getBlocks().length);
+      dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+      dn.metrics.incrBlocksCached(blockIdCmd.getBlockIds().length);
       break;
     case DatanodeProtocol.DNA_UNCACHE:
       LOG.info("DatanodeCommand action: DNA_UNCACHE");
-      dn.getFSDataset().uncache(bcmd.getBlockPoolId(), bcmd.getBlocks());
-      dn.metrics.incrBlocksUncached(bcmd.getBlocks().length);
+      dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+      dn.metrics.incrBlocksUncached(blockIdCmd.getBlockIds().length);
       break;
     case DatanodeProtocol.DNA_SHUTDOWN:
       // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index bf93f149fdf..07f0e72aada 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -305,16 +305,16 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   /**
    * Caches the specified blocks
    * @param bpid Block pool id
-   * @param cacheBlks - block to cache
+   * @param blockIds - block ids to cache
    */
-  public void cache(String bpid, Block[] cacheBlks);
+  public void cache(String bpid, long[] blockIds);
 
   /**
    * Uncaches the specified blocks
    * @param bpid Block pool id
-   * @param uncacheBlks - blocks to uncache
+   * @param blockIds - block ids to uncache
    */
-  public void uncache(String bpid, Block[] uncacheBlks);
+  public void uncache(String bpid, long[] blockIds);
 
   /**
    * Check if all the data directories are healthy
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index b0a3a8d77fb..4bd1cf5039c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -91,8 +91,8 @@ public class FsDatasetCache {
   /**
    * @return if the block is cached
    */
-  boolean isCached(String bpid, Block block) {
-    MappableBlock mapBlock = cachedBlocks.get(block.getBlockId());
+  boolean isCached(String bpid, long blockId) {
+    MappableBlock mapBlock = cachedBlocks.get(blockId);
     if (mapBlock != null) {
       return mapBlock.getBlockPoolId().equals(bpid);
     }
@@ -127,7 +127,7 @@
    */
   void cacheBlock(String bpid, Block block, FsVolumeImpl volume,
       FileInputStream blockIn, FileInputStream metaIn) {
-    if (isCached(bpid, block)) {
+    if (isCached(bpid, block.getBlockId())) {
       return;
     }
     MappableBlock mapBlock = null;
@@ -166,23 +166,23 @@ public class FsDatasetCache {
 
   /**
    * Uncaches a block if it is cached.
-   * @param block to uncache
+   * @param blockId id to uncache
    */
-  void uncacheBlock(String bpid, Block block) {
-    MappableBlock mapBlock = cachedBlocks.get(block.getBlockId());
+  void uncacheBlock(String bpid, long blockId) {
+    MappableBlock mapBlock = cachedBlocks.get(blockId);
     if (mapBlock != null &&
         mapBlock.getBlockPoolId().equals(bpid) &&
-        mapBlock.getBlock().equals(block)) {
+        mapBlock.getBlock().getBlockId() == blockId) {
       mapBlock.close();
-      cachedBlocks.remove(block.getBlockId());
+      cachedBlocks.remove(blockId);
       long bytes = mapBlock.getNumBytes();
       long used = usedBytes.get();
       while (!usedBytes.compareAndSet(used, used - bytes)) {
         used = usedBytes.get();
       }
-      LOG.info("Successfully uncached block " + block);
+      LOG.info("Successfully uncached block " + blockId);
     } else {
-      LOG.info("Could not uncache block " + block + ": unknown block.");
+      LOG.info("Could not uncache block " + blockId + ": unknown block.");
     }
   }
 
@@ -215,7 +215,8 @@ public class FsDatasetCache {
       // If we failed or the block became uncacheable in the meantime,
       // clean up and return the reserved cache allocation
       if (!success ||
-          !dataset.validToCache(block.getBlockPoolId(), block.getBlock())) {
+          !dataset.validToCache(block.getBlockPoolId(),
+              block.getBlock().getBlockId())) {
         block.close();
         long used = usedBytes.get();
         while (!usedBytes.compareAndSet(used, used-block.getNumBytes())) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index f5e0c371136..be664fd76b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -562,7 +562,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
     // uncache the block
-    cacheManager.uncacheBlock(bpid, replicaInfo);
+    cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
     // unlink the finalized replica
     replicaInfo.unlinkBlock(1);
 
@@ -1178,7 +1178,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
 
       // Uncache the block synchronously
-      cacheManager.uncacheBlock(bpid, invalidBlks[i]);
+      cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
       // Delete the block asynchronously to make sure we can do it fast enough
       asyncDiskService.deleteAsync(v, f,
           FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
@@ -1189,20 +1189,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
-  synchronized boolean validToCache(String bpid, Block blk) {
-    ReplicaInfo info = volumeMap.get(bpid, blk);
+  synchronized boolean validToCache(String bpid, long blockId) {
+    ReplicaInfo info = volumeMap.get(bpid, blockId);
     if (info == null) {
-      LOG.warn("Failed to cache replica " + blk + ": ReplicaInfo not found.");
+      LOG.warn("Failed to cache replica in block pool " + bpid +
+          " with block id " + blockId + ": ReplicaInfo not found.");
       return false;
     }
     FsVolumeImpl volume = (FsVolumeImpl)info.getVolume();
     if (volume == null) {
-      LOG.warn("Failed to cache replica " + blk + ": Volume not found.");
+      LOG.warn("Failed to cache block with id " + blockId +
+          ": Volume not found.");
       return false;
     }
     if (info.getState() != ReplicaState.FINALIZED) {
-      LOG.warn("Failed to cache replica " + blk + ": Replica is not"
finalized."); + LOG.warn("Failed to block with id " + blockId + + ": Replica is not finalized."); return false; } return true; @@ -1211,31 +1213,33 @@ class FsDatasetImpl implements FsDatasetSpi { /** * Asynchronously attempts to cache a single block via {@link FsDatasetCache}. */ - private void cacheBlock(String bpid, Block blk) { + private void cacheBlock(String bpid, long blockId) { ReplicaInfo info; FsVolumeImpl volume; synchronized (this) { - if (!validToCache(bpid, blk)) { + if (!validToCache(bpid, blockId)) { return; } - info = volumeMap.get(bpid, blk); + info = volumeMap.get(bpid, blockId); volume = (FsVolumeImpl)info.getVolume(); } // Try to open block and meta streams FileInputStream blockIn = null; FileInputStream metaIn = null; boolean success = false; + ExtendedBlock extBlk = + new ExtendedBlock(bpid, blockId, + info.getBytesOnDisk(), info.getGenerationStamp()); try { - ExtendedBlock extBlk = new ExtendedBlock(bpid, blk); blockIn = (FileInputStream)getBlockInputStream(extBlk, 0); metaIn = (FileInputStream)getMetaDataInputStream(extBlk) .getWrappedStream(); success = true; } catch (ClassCastException e) { - LOG.warn("Failed to cache replica " + blk + ": Underlying blocks" + LOG.warn("Failed to cache replica " + extBlk + ": Underlying blocks" + " are not backed by files.", e); } catch (IOException e) { - LOG.warn("Failed to cache replica " + blk + ": IOException while" + LOG.warn("Failed to cache replica " + extBlk + ": IOException while" + " trying to open block or meta files.", e); } if (!success) { @@ -1243,21 +1247,21 @@ class FsDatasetImpl implements FsDatasetSpi { IOUtils.closeQuietly(metaIn); return; } - cacheManager.cacheBlock(bpid, blk, volume, blockIn, metaIn); + cacheManager.cacheBlock(bpid, extBlk.getLocalBlock(), + volume, blockIn, metaIn); } @Override // FsDatasetSpi - public void cache(String bpid, Block[] cacheBlks) { - for (int i=0; i { } @Override // FSDatasetSpi - public void cache(String bpid, Block[] cacheBlks) { + public void cache(String bpid, long[] cacheBlks) { throw new UnsupportedOperationException( "SimulatedFSDataset does not support cache operation!"); } @Override // FSDatasetSpi - public void uncache(String bpid, Block[] uncacheBlks) { + public void uncache(String bpid, long[] uncacheBlks) { throw new UnsupportedOperationException( "SimulatedFSDataset does not support uncache operation!"); }