HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1532116 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2013-10-14 22:19:10 +00:00
parent 09e9e57a0b
commit 15d08c4778
10 changed files with 177 additions and 60 deletions

View File

@ -61,6 +61,7 @@ HDFS-4949 (Unreleased)
String. (cnauroth)
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
BUG FIXES
HDFS-5169. hdfs.c: translateZCRException: null pointer deref when

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
@ -119,6 +120,7 @@
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@ -696,6 +698,8 @@ public static DatanodeCommand convert(DatanodeCommandProto proto) {
return PBHelper.convert(proto.getKeyUpdateCmd());
case RegisterCommand:
return REG_CMD;
case BlockIdCommand:
return PBHelper.convert(proto.getBlkIdCmd());
}
return null;
}
@ -738,12 +742,6 @@ public static BlockCommandProto convert(BlockCommand cmd) {
case DatanodeProtocol.DNA_SHUTDOWN:
builder.setAction(BlockCommandProto.Action.SHUTDOWN);
break;
case DatanodeProtocol.DNA_CACHE:
builder.setAction(BlockCommandProto.Action.CACHE);
break;
case DatanodeProtocol.DNA_UNCACHE:
builder.setAction(BlockCommandProto.Action.UNCACHE);
break;
default:
throw new AssertionError("Invalid action");
}
@ -754,6 +752,26 @@ public static BlockCommandProto convert(BlockCommand cmd) {
builder.addAllTargets(PBHelper.convert(cmd.getTargets()));
return builder.build();
}
public static BlockIdCommandProto convert(BlockIdCommand cmd) {
BlockIdCommandProto.Builder builder = BlockIdCommandProto.newBuilder()
.setBlockPoolId(cmd.getBlockPoolId());
switch (cmd.getAction()) {
case DatanodeProtocol.DNA_CACHE:
builder.setAction(BlockIdCommandProto.Action.CACHE);
break;
case DatanodeProtocol.DNA_UNCACHE:
builder.setAction(BlockIdCommandProto.Action.UNCACHE);
break;
default:
throw new AssertionError("Invalid action");
}
long[] blockIds = cmd.getBlockIds();
for (int i = 0; i < blockIds.length; i++) {
builder.addBlockIds(blockIds[i]);
}
return builder.build();
}
private static List<DatanodeInfosProto> convert(DatanodeInfo[][] targets) {
DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length];
@ -796,11 +814,14 @@ public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
break;
case DatanodeProtocol.DNA_TRANSFER:
case DatanodeProtocol.DNA_INVALIDATE:
case DatanodeProtocol.DNA_SHUTDOWN:
builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).
setBlkCmd(PBHelper.convert((BlockCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_CACHE:
case DatanodeProtocol.DNA_UNCACHE:
case DatanodeProtocol.DNA_SHUTDOWN:
builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
PBHelper.convert((BlockCommand) datanodeCommand));
builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand).
setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_UNKNOWN: //Not expected
default:
@ -851,6 +872,20 @@ public static BlockCommand convert(BlockCommandProto blkCmd) {
case SHUTDOWN:
action = DatanodeProtocol.DNA_SHUTDOWN;
break;
default:
throw new AssertionError("Unknown action type: " + blkCmd.getAction());
}
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
}
public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
int numBlockIds = blkIdCmd.getBlockIdsCount();
long blockIds[] = new long[numBlockIds];
for (int i = 0; i < numBlockIds; i++) {
blockIds[i] = blkIdCmd.getBlockIds(i);
}
int action = DatanodeProtocol.DNA_UNKNOWN;
switch (blkIdCmd.getAction()) {
case CACHE:
action = DatanodeProtocol.DNA_CACHE;
break;
@ -858,9 +893,9 @@ public static BlockCommand convert(BlockCommandProto blkCmd) {
action = DatanodeProtocol.DNA_UNCACHE;
break;
default:
throw new AssertionError("Unknown action type: " + blkCmd.getAction());
throw new AssertionError("Unknown action type: " + blkIdCmd.getAction());
}
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
return new BlockIdCommand(action, blkIdCmd.getBlockPoolId(), blockIds);
}
public static DatanodeInfo[] convert(DatanodeInfosProto datanodeInfosProto) {

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@ -1308,14 +1309,22 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
// Check pending caching
List<Block> pendingCacheList = nodeinfo.getCacheBlocks();
if (pendingCacheList != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_CACHE, blockPoolId,
pendingCacheList.toArray(new Block[] {})));
long blockIds[] = new long[pendingCacheList.size()];
for (int i = 0; i < pendingCacheList.size(); i++) {
blockIds[i] = pendingCacheList.get(i).getBlockId();
}
cmds.add(new BlockIdCommand(DatanodeProtocol.DNA_CACHE, blockPoolId,
blockIds));
}
// Check cached block invalidation
blks = nodeinfo.getInvalidateCacheBlocks();
if (blks != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_UNCACHE,
blockPoolId, blks));
long blockIds[] = new long[blks.length];
for (int i = 0; i < blks.length; i++) {
blockIds[i] = blks[i].getBlockId();
}
cmds.add(new BlockIdCommand(DatanodeProtocol.DNA_UNCACHE,
blockPoolId, blockIds));
}
blockManager.addKeyUpdateCommand(cmds, nodeinfo);

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -518,6 +519,8 @@ private boolean processCommandFromActive(DatanodeCommand cmd,
return true;
final BlockCommand bcmd =
cmd instanceof BlockCommand? (BlockCommand)cmd: null;
final BlockIdCommand blockIdCmd =
cmd instanceof BlockIdCommand ? (BlockIdCommand)cmd: null;
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER:
@ -545,13 +548,13 @@ private boolean processCommandFromActive(DatanodeCommand cmd,
break;
case DatanodeProtocol.DNA_CACHE:
LOG.info("DatanodeCommand action: DNA_CACHE");
dn.getFSDataset().cache(bcmd.getBlockPoolId(), bcmd.getBlocks());
dn.metrics.incrBlocksCached(bcmd.getBlocks().length);
dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
dn.metrics.incrBlocksCached(blockIdCmd.getBlockIds().length);
break;
case DatanodeProtocol.DNA_UNCACHE:
LOG.info("DatanodeCommand action: DNA_UNCACHE");
dn.getFSDataset().uncache(bcmd.getBlockPoolId(), bcmd.getBlocks());
dn.metrics.incrBlocksUncached(bcmd.getBlocks().length);
dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
dn.metrics.incrBlocksUncached(blockIdCmd.getBlockIds().length);
break;
case DatanodeProtocol.DNA_SHUTDOWN:
// TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command

View File

@ -305,16 +305,16 @@ public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
/**
* Caches the specified blocks
* @param bpid Block pool id
* @param cacheBlks - block to cache
* @param blockIds - block ids to cache
*/
public void cache(String bpid, Block[] cacheBlks);
public void cache(String bpid, long[] blockIds);
/**
* Uncaches the specified blocks
* @param bpid Block pool id
* @param uncacheBlks - blocks to uncache
* @param blockIds - blocks ids to uncache
*/
public void uncache(String bpid, Block[] uncacheBlks);
public void uncache(String bpid, long[] blockIds);
/**
* Check if all the data directories are healthy

View File

@ -91,8 +91,8 @@ public FsDatasetCache(FsDatasetImpl dataset) {
/**
* @return if the block is cached
*/
boolean isCached(String bpid, Block block) {
MappableBlock mapBlock = cachedBlocks.get(block.getBlockId());
boolean isCached(String bpid, long blockId) {
MappableBlock mapBlock = cachedBlocks.get(blockId);
if (mapBlock != null) {
return mapBlock.getBlockPoolId().equals(bpid);
}
@ -127,7 +127,7 @@ List<Block> getCachedBlocks(String bpid) {
*/
void cacheBlock(String bpid, Block block, FsVolumeImpl volume,
FileInputStream blockIn, FileInputStream metaIn) {
if (isCached(bpid, block)) {
if (isCached(bpid, block.getBlockId())) {
return;
}
MappableBlock mapBlock = null;
@ -166,23 +166,23 @@ void cacheBlock(String bpid, Block block, FsVolumeImpl volume,
/**
* Uncaches a block if it is cached.
* @param block to uncache
* @param blockId id to uncache
*/
void uncacheBlock(String bpid, Block block) {
MappableBlock mapBlock = cachedBlocks.get(block.getBlockId());
void uncacheBlock(String bpid, long blockId) {
MappableBlock mapBlock = cachedBlocks.get(blockId);
if (mapBlock != null &&
mapBlock.getBlockPoolId().equals(bpid) &&
mapBlock.getBlock().equals(block)) {
mapBlock.getBlock().getBlockId() == blockId) {
mapBlock.close();
cachedBlocks.remove(block.getBlockId());
cachedBlocks.remove(blockId);
long bytes = mapBlock.getNumBytes();
long used = usedBytes.get();
while (!usedBytes.compareAndSet(used, used - bytes)) {
used = usedBytes.get();
}
LOG.info("Successfully uncached block " + block);
LOG.info("Successfully uncached block " + blockId);
} else {
LOG.info("Could not uncache block " + block + ": unknown block.");
LOG.info("Could not uncache block " + blockId + ": unknown block.");
}
}
@ -215,7 +215,8 @@ public void run() {
// If we failed or the block became uncacheable in the meantime,
// clean up and return the reserved cache allocation
if (!success ||
!dataset.validToCache(block.getBlockPoolId(), block.getBlock())) {
!dataset.validToCache(block.getBlockPoolId(),
block.getBlock().getBlockId())) {
block.close();
long used = usedBytes.get();
while (!usedBytes.compareAndSet(used, used-block.getNumBytes())) {

View File

@ -562,7 +562,7 @@ private synchronized ReplicaBeingWritten append(String bpid,
FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
throws IOException {
// uncache the block
cacheManager.uncacheBlock(bpid, replicaInfo);
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
// unlink the finalized replica
replicaInfo.unlinkBlock(1);
@ -1178,7 +1178,7 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
}
// Uncache the block synchronously
cacheManager.uncacheBlock(bpid, invalidBlks[i]);
cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
// Delete the block asynchronously to make sure we can do it fast enough
asyncDiskService.deleteAsync(v, f,
FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
@ -1189,20 +1189,22 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
}
}
synchronized boolean validToCache(String bpid, Block blk) {
ReplicaInfo info = volumeMap.get(bpid, blk);
synchronized boolean validToCache(String bpid, long blockId) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
if (info == null) {
LOG.warn("Failed to cache replica " + blk + ": ReplicaInfo not found.");
LOG.warn("Failed to cache replica in block pool " + bpid +
" with block id " + blockId + ": ReplicaInfo not found.");
return false;
}
FsVolumeImpl volume = (FsVolumeImpl)info.getVolume();
if (volume == null) {
LOG.warn("Failed to cache replica " + blk + ": Volume not found.");
LOG.warn("Failed to cache block with id " + blockId +
": Volume not found.");
return false;
}
if (info.getState() != ReplicaState.FINALIZED) {
LOG.warn("Failed to cache replica " + blk + ": Replica is not"
+ " finalized.");
LOG.warn("Failed to block with id " + blockId +
": Replica is not finalized.");
return false;
}
return true;
@ -1211,31 +1213,33 @@ synchronized boolean validToCache(String bpid, Block blk) {
/**
* Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
*/
private void cacheBlock(String bpid, Block blk) {
private void cacheBlock(String bpid, long blockId) {
ReplicaInfo info;
FsVolumeImpl volume;
synchronized (this) {
if (!validToCache(bpid, blk)) {
if (!validToCache(bpid, blockId)) {
return;
}
info = volumeMap.get(bpid, blk);
info = volumeMap.get(bpid, blockId);
volume = (FsVolumeImpl)info.getVolume();
}
// Try to open block and meta streams
FileInputStream blockIn = null;
FileInputStream metaIn = null;
boolean success = false;
ExtendedBlock extBlk =
new ExtendedBlock(bpid, blockId,
info.getBytesOnDisk(), info.getGenerationStamp());
try {
ExtendedBlock extBlk = new ExtendedBlock(bpid, blk);
blockIn = (FileInputStream)getBlockInputStream(extBlk, 0);
metaIn = (FileInputStream)getMetaDataInputStream(extBlk)
.getWrappedStream();
success = true;
} catch (ClassCastException e) {
LOG.warn("Failed to cache replica " + blk + ": Underlying blocks"
LOG.warn("Failed to cache replica " + extBlk + ": Underlying blocks"
+ " are not backed by files.", e);
} catch (IOException e) {
LOG.warn("Failed to cache replica " + blk + ": IOException while"
LOG.warn("Failed to cache replica " + extBlk + ": IOException while"
+ " trying to open block or meta files.", e);
}
if (!success) {
@ -1243,21 +1247,21 @@ private void cacheBlock(String bpid, Block blk) {
IOUtils.closeQuietly(metaIn);
return;
}
cacheManager.cacheBlock(bpid, blk, volume, blockIn, metaIn);
cacheManager.cacheBlock(bpid, extBlk.getLocalBlock(),
volume, blockIn, metaIn);
}
@Override // FsDatasetSpi
public void cache(String bpid, Block[] cacheBlks) {
for (int i=0; i<cacheBlks.length; i++) {
cacheBlock(bpid, cacheBlks[i]);
public void cache(String bpid, long[] blockIds) {
for (int i=0; i < blockIds.length; i++) {
cacheBlock(bpid, blockIds[i]);
}
}
@Override // FsDatasetSpi
public void uncache(String bpid, Block[] uncacheBlks) {
for (int i=0; i<uncacheBlks.length; i++) {
Block blk = uncacheBlks[i];
cacheManager.uncacheBlock(bpid, blk);
public void uncache(String bpid, long[] blockIds) {
for (int i=0; i < blockIds.length; i++) {
cacheManager.uncacheBlock(bpid, blockIds[i]);
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/****************************************************
* A BlockIdCommand is an instruction to a datanode
* regarding some blocks under its control.
****************************************************/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockIdCommand extends DatanodeCommand {
final String poolId;
final long blockIds[];
/**
* Create BlockCommand for the given action
* @param blocks blocks related to the action
*/
public BlockIdCommand(int action, String poolId, long[] blockIds) {
super(action);
this.poolId = poolId;
this.blockIds= blockIds;
}
public String getBlockPoolId() {
return poolId;
}
public long[] getBlockIds() {
return blockIds;
}
}

View File

@ -70,6 +70,7 @@ message DatanodeCommandProto {
RegisterCommand = 5;
UnusedUpgradeCommand = 6;
NullDatanodeCommand = 7;
BlockIdCommand = 8;
}
required Type cmdType = 1; // Type of the command
@ -82,6 +83,7 @@ message DatanodeCommandProto {
optional FinalizeCommandProto finalizeCmd = 5;
optional KeyUpdateCommandProto keyUpdateCmd = 6;
optional RegisterCommandProto registerCmd = 7;
optional BlockIdCommandProto blkIdCmd = 8;
}
/**
@ -103,8 +105,6 @@ message BlockCommandProto {
TRANSFER = 1; // Transfer blocks to another datanode
INVALIDATE = 2; // Invalidate blocks
SHUTDOWN = 3; // Shutdown the datanode
CACHE = 4; // Cache blocks on the datanode
UNCACHE = 5; // Uncache blocks on the datanode
}
required Action action = 1;
required string blockPoolId = 2;
@ -112,6 +112,20 @@ message BlockCommandProto {
repeated DatanodeInfosProto targets = 4;
}
/**
* Command to instruct datanodes to perform certain action
* on the given set of block IDs.
*/
message BlockIdCommandProto {
enum Action {
CACHE = 1;
UNCACHE = 2;
}
required Action action = 1;
required string blockPoolId = 2;
repeated uint64 blockIds = 3 [packed=true];
}
/**
* List of blocks to be recovered by the datanode
*/

View File

@ -580,13 +580,13 @@ public synchronized void invalidate(String bpid, Block[] invalidBlks)
}
@Override // FSDatasetSpi
public void cache(String bpid, Block[] cacheBlks) {
public void cache(String bpid, long[] cacheBlks) {
throw new UnsupportedOperationException(
"SimulatedFSDataset does not support cache operation!");
}
@Override // FSDatasetSpi
public void uncache(String bpid, Block[] uncacheBlks) {
public void uncache(String bpid, long[] uncacheBlks) {
throw new UnsupportedOperationException(
"SimulatedFSDataset does not support uncache operation!");
}