HDFS-6086. Fix a case where zero-copy or no-checksum reads were not allowed even when the block was cached. (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1576533 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
477ed62b3f
commit
a3616c58dd
|
@ -598,6 +598,9 @@ Release 2.4.0 - UNRELEASED
|
|||
HDFS-6077. Running slive with webhdfs on secure HA cluster fails with unkown
|
||||
host exception. (jing9)
|
||||
|
||||
HDFS-6086. Fix a case where zero-copy or no-checksum reads were not allowed
|
||||
even when the block was cached (cmccabe)
|
||||
|
||||
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
|
||||
|
|
|
@ -191,7 +191,15 @@ public class ShortCircuitReplica {
|
|||
if (slot == null) {
|
||||
return false;
|
||||
}
|
||||
return slot.addAnchor();
|
||||
boolean result = slot.addAnchor();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
if (result) {
|
||||
LOG.trace(this + ": added no-checksum anchor to slot " + slot);
|
||||
} else {
|
||||
LOG.trace(this + ": could not add no-checksum anchor to slot " + slot);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -261,8 +261,10 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
"anything but a UNIX domain socket.");
|
||||
}
|
||||
if (slotId != null) {
|
||||
boolean isCached = datanode.data.
|
||||
isCached(blk.getBlockPoolId(), blk.getBlockId());
|
||||
datanode.shortCircuitRegistry.registerSlot(
|
||||
ExtendedBlockId.fromExtendedBlock(blk), slotId);
|
||||
ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
|
||||
}
|
||||
try {
|
||||
fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
|
||||
|
|
|
@ -287,12 +287,12 @@ public class ShortCircuitRegistry {
|
|||
return info;
|
||||
}
|
||||
|
||||
public synchronized void registerSlot(ExtendedBlockId blockId, SlotId slotId)
|
||||
throws InvalidRequestException {
|
||||
public synchronized void registerSlot(ExtendedBlockId blockId, SlotId slotId,
|
||||
boolean isCached) throws InvalidRequestException {
|
||||
if (!enabled) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("registerSlot: ShortCircuitRegistry is " +
|
||||
"not enabled.");
|
||||
LOG.trace(this + " can't register a slot because the " +
|
||||
"ShortCircuitRegistry is not enabled.");
|
||||
}
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
@ -303,8 +303,17 @@ public class ShortCircuitRegistry {
|
|||
"registered with shmId " + shmId);
|
||||
}
|
||||
Slot slot = shm.registerSlot(slotId.getSlotIdx(), blockId);
|
||||
if (isCached) {
|
||||
slot.makeAnchorable();
|
||||
} else {
|
||||
slot.makeUnanchorable();
|
||||
}
|
||||
boolean added = slots.put(blockId, slot);
|
||||
Preconditions.checkState(added);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(this + ": registered " + blockId + " with slot " +
|
||||
slotId + " (isCached=" + isCached + ")");
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void unregisterSlot(SlotId slotId)
|
||||
|
|
|
@ -320,6 +320,14 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
*/
|
||||
public void uncache(String bpid, long[] blockIds);
|
||||
|
||||
/**
|
||||
* Determine if the specified block is cached.
|
||||
* @param bpid Block pool id
|
||||
* @param blockIds - block id
|
||||
* @returns true if the block is cached
|
||||
*/
|
||||
public boolean isCached(String bpid, long blockId);
|
||||
|
||||
/**
|
||||
* Check if all the data directories are healthy
|
||||
* @throws DiskErrorException
|
||||
|
|
|
@ -473,4 +473,10 @@ public class FsDatasetCache {
|
|||
public long getNumBlocksCached() {
|
||||
return numBlocksCached.get();
|
||||
}
|
||||
|
||||
public synchronized boolean isCached(String bpid, long blockId) {
|
||||
ExtendedBlockId block = new ExtendedBlockId(blockId, bpid);
|
||||
Value val = mappableBlockMap.get(block);
|
||||
return (val != null) && val.state.shouldAdvertise();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1271,6 +1271,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean isCached(String bpid, long blockId) {
|
||||
return cacheManager.isCached(bpid, blockId);
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized boolean contains(final ExtendedBlock block) {
|
||||
final long blockId = block.getLocalBlock().getBlockId();
|
||||
|
|
|
@ -916,9 +916,11 @@ public final class CacheManager {
|
|||
if (metrics != null) {
|
||||
metrics.addCacheBlockReport((int) (endTime - startTime));
|
||||
}
|
||||
LOG.info("Processed cache report from "
|
||||
+ datanodeID + ", blocks: " + blockIds.size()
|
||||
+ ", processing time: " + (endTime - startTime) + " msecs");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Processed cache report from "
|
||||
+ datanodeID + ", blocks: " + blockIds.size()
|
||||
+ ", processing time: " + (endTime - startTime) + " msecs");
|
||||
}
|
||||
}
|
||||
|
||||
private void processCacheReportImpl(final DatanodeDescriptor datanode,
|
||||
|
|
|
@ -607,7 +607,7 @@ public class TestEnhancedByteBufferAccess {
|
|||
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
|
||||
DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH, 4096));
|
||||
MiniDFSCluster cluster = null;
|
||||
ByteBuffer result = null;
|
||||
ByteBuffer result = null, result2 = null;
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
|
||||
|
@ -644,9 +644,22 @@ public class TestEnhancedByteBufferAccess {
|
|||
} catch (UnsupportedOperationException e) {
|
||||
Assert.fail("expected to be able to read cached file via zero-copy");
|
||||
}
|
||||
// Verify result
|
||||
Assert.assertArrayEquals(Arrays.copyOfRange(original, 0,
|
||||
BLOCK_SIZE), byteBufferToArray(result));
|
||||
// Test that files opened after the cache operation has finished
|
||||
// still get the benefits of zero-copy (regression test for HDFS-6086)
|
||||
FSDataInputStream fsIn2 = fs.open(TEST_PATH);
|
||||
try {
|
||||
result2 = fsIn2.read(null, TEST_FILE_LENGTH,
|
||||
EnumSet.noneOf(ReadOption.class));
|
||||
} catch (UnsupportedOperationException e) {
|
||||
Assert.fail("expected to be able to read cached file via zero-copy");
|
||||
}
|
||||
Assert.assertArrayEquals(Arrays.copyOfRange(original, 0,
|
||||
BLOCK_SIZE), byteBufferToArray(result2));
|
||||
fsIn2.releaseBuffer(result2);
|
||||
fsIn2.close();
|
||||
|
||||
// check that the replica is anchored
|
||||
final ExtendedBlock firstBlock =
|
||||
DFSTestUtil.getFirstBlock(fs, TEST_PATH);
|
||||
|
|
|
@ -641,6 +641,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
"SimulatedFSDataset does not support uncache operation!");
|
||||
}
|
||||
|
||||
@Override // FSDatasetSpi
|
||||
public boolean isCached(String bpid, long blockId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
private BInfo getBInfo(final ExtendedBlock b) {
|
||||
final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
|
||||
return map == null? null: map.get(b.getLocalBlock());
|
||||
|
|
Loading…
Reference in New Issue