HDFS-15202 Boost short circuit cache (rebase PR-1884) (#2016)

Added parameter dfs.client.short.circuit.num improving HDFS-client's massive reading performance by create few instances ShortCircuit caches instead of one. It helps avoid locks and lets CPU do job.
This commit is contained in:
pustota2009 2020-05-18 17:04:04 +03:00 committed by GitHub
parent b65815d691
commit 86e6aa8eec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 214 additions and 75 deletions

View File

@ -77,7 +77,7 @@ public class ClientContext {
/** /**
* Caches short-circuit file descriptors, mmap regions. * Caches short-circuit file descriptors, mmap regions.
*/ */
private final ShortCircuitCache shortCircuitCache; private final ShortCircuitCache[] shortCircuitCache;
/** /**
* Caches TCP and UNIX domain sockets for reuse. * Caches TCP and UNIX domain sockets for reuse.
@ -132,13 +132,23 @@ public class ClientContext {
*/ */
private DeadNodeDetector deadNodeDetector = null; private DeadNodeDetector deadNodeDetector = null;
/**
* ShortCircuitCache array size.
*/
private final int clientShortCircuitNum;
private ClientContext(String name, DfsClientConf conf, private ClientContext(String name, DfsClientConf conf,
Configuration config) { Configuration config) {
final ShortCircuitConf scConf = conf.getShortCircuitConf(); final ShortCircuitConf scConf = conf.getShortCircuitConf();
this.name = name; this.name = name;
this.confString = scConf.confAsString(); this.confString = scConf.confAsString();
this.shortCircuitCache = ShortCircuitCache.fromConf(scConf); this.clientShortCircuitNum = conf.getClientShortCircuitNum();
this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
for (int i = 0; i < this.clientShortCircuitNum; i++) {
this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
}
this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(), this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
scConf.getSocketCacheExpiry()); scConf.getSocketCacheExpiry());
this.keyProviderCache = new KeyProviderCache( this.keyProviderCache = new KeyProviderCache(
@ -228,7 +238,11 @@ public class ClientContext {
} }
public ShortCircuitCache getShortCircuitCache() { public ShortCircuitCache getShortCircuitCache() {
return shortCircuitCache; return shortCircuitCache[0];
}
public ShortCircuitCache getShortCircuitCache(long idx) {
return shortCircuitCache[(int) (idx % clientShortCircuitNum)];
} }
public PeerCache getPeerCache() { public PeerCache getPeerCache() {

View File

@ -144,6 +144,8 @@ public interface HdfsClientConfigKeys {
"dfs.short.circuit.shared.memory.watcher.interrupt.check.ms"; "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
60000; 60000;
String DFS_CLIENT_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num";
int DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT = 1;
String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY = String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
"dfs.client.slow.io.warning.threshold.ms"; "dfs.client.slow.io.warning.threshold.ms";
long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000; long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;

View File

@ -476,7 +476,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
"giving up on BlockReaderLocal.", this, pathInfo); "giving up on BlockReaderLocal.", this, pathInfo);
return null; return null;
} }
ShortCircuitCache cache = clientContext.getShortCircuitCache(); ShortCircuitCache cache =
clientContext.getShortCircuitCache(block.getBlockId());
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
block.getBlockPoolId()); block.getBlockPoolId());
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
@ -527,7 +528,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
if (curPeer.fromCache) remainingCacheTries--; if (curPeer.fromCache) remainingCacheTries--;
DomainPeer peer = (DomainPeer)curPeer.peer; DomainPeer peer = (DomainPeer)curPeer.peer;
Slot slot = null; Slot slot = null;
ShortCircuitCache cache = clientContext.getShortCircuitCache(); ShortCircuitCache cache =
clientContext.getShortCircuitCache(block.getBlockId());
try { try {
MutableBoolean usedPeer = new MutableBoolean(false); MutableBoolean usedPeer = new MutableBoolean(false);
slot = cache.allocShmSlot(datanode, peer, usedPeer, slot = cache.allocShmSlot(datanode, peer, usedPeer,
@ -582,7 +584,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
*/ */
private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
Slot slot) throws IOException { Slot slot) throws IOException {
ShortCircuitCache cache = clientContext.getShortCircuitCache(); ShortCircuitCache cache =
clientContext.getShortCircuitCache(block.getBlockId());
final DataOutputStream out = final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE)); new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE));
SlotId slotId = slot == null ? null : slot.getSlotId(); SlotId slotId = slot == null ? null : slot.getSlotId();

View File

@ -142,6 +142,7 @@ public class DfsClientConf {
private final long refreshReadBlockLocationsMS; private final long refreshReadBlockLocationsMS;
private final ShortCircuitConf shortCircuitConf; private final ShortCircuitConf shortCircuitConf;
private final int clientShortCircuitNum;
private final long hedgedReadThresholdMillis; private final long hedgedReadThresholdMillis;
private final int hedgedReadThreadpoolSize; private final int hedgedReadThreadpoolSize;
@ -272,8 +273,6 @@ public class DfsClientConf {
HdfsClientConfigKeys. HdfsClientConfigKeys.
DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT); DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);
shortCircuitConf = new ShortCircuitConf(conf);
hedgedReadThresholdMillis = conf.getLong( hedgedReadThresholdMillis = conf.getLong(
HedgedRead.THRESHOLD_MILLIS_KEY, HedgedRead.THRESHOLD_MILLIS_KEY,
HedgedRead.THRESHOLD_MILLIS_DEFAULT); HedgedRead.THRESHOLD_MILLIS_DEFAULT);
@ -296,6 +295,17 @@ public class DfsClientConf {
leaseHardLimitPeriod = leaseHardLimitPeriod =
conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY, conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY,
HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000; HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000;
shortCircuitConf = new ShortCircuitConf(conf);
clientShortCircuitNum = conf.getInt(
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT);
Preconditions.checkArgument(clientShortCircuitNum >= 1,
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
"can't be less then 1.");
Preconditions.checkArgument(clientShortCircuitNum <= 5,
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
"can't be more then 5.");
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -601,6 +611,13 @@ public class DfsClientConf {
return slowIoWarningThresholdMs; return slowIoWarningThresholdMs;
} }
/*
* @return the clientShortCircuitNum
*/
public int getClientShortCircuitNum() {
return clientShortCircuitNum;
}
/** /**
* @return the hedgedReadThresholdMillis * @return the hedgedReadThresholdMillis
*/ */

View File

@ -4178,6 +4178,16 @@
</description> </description>
</property> </property>
<property>
<name>dfs.client.short.circuit.num</name>
<value>1</value>
<description>
Number of short-circuit caches. This setting should
be in the range 1 - 5. Lower values will result in lower CPU consumption; higher
values may speed up massive parallel reading files.
</description>
</property>
<property> <property>
<name>dfs.client.read.striped.threadpool.size</name> <name>dfs.client.read.striped.threadpool.size</name>
<value>18</value> <value>18</value>

View File

@ -358,7 +358,7 @@ public class TestEnhancedByteBufferAccess {
fsIn.close(); fsIn.close();
fsIn = fs.open(TEST_PATH); fsIn = fs.open(TEST_PATH);
final ShortCircuitCache cache = ClientContext.get( final ShortCircuitCache cache = ClientContext.get(
CONTEXT, conf).getShortCircuitCache(); CONTEXT, conf).getShortCircuitCache(0);
cache.accept(new CountingVisitor(0, 5, 5, 0)); cache.accept(new CountingVisitor(0, 5, 5, 0));
results[0] = fsIn.read(null, BLOCK_SIZE, results[0] = fsIn.read(null, BLOCK_SIZE,
EnumSet.of(ReadOption.SKIP_CHECKSUMS)); EnumSet.of(ReadOption.SKIP_CHECKSUMS));
@ -654,12 +654,12 @@ public class TestEnhancedByteBufferAccess {
BLOCK_SIZE), byteBufferToArray(result2)); BLOCK_SIZE), byteBufferToArray(result2));
fsIn2.releaseBuffer(result2); fsIn2.releaseBuffer(result2);
fsIn2.close(); fsIn2.close();
// check that the replica is anchored // check that the replica is anchored
final ExtendedBlock firstBlock = final ExtendedBlock firstBlock =
DFSTestUtil.getFirstBlock(fs, TEST_PATH); DFSTestUtil.getFirstBlock(fs, TEST_PATH);
final ShortCircuitCache cache = ClientContext.get( final ShortCircuitCache cache = ClientContext.get(
CONTEXT, conf).getShortCircuitCache(); CONTEXT, conf).getShortCircuitCache(0);
waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1); waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
// Uncache the replica // Uncache the replica
fs.removeCacheDirective(directiveId); fs.removeCacheDirective(directiveId);

View File

@ -389,7 +389,7 @@ public class TestBlockReaderFactory {
try (FSDataInputStream in = dfs.open(testFile)) { try (FSDataInputStream in = dfs.open(testFile)) {
Assert.assertEquals(0, Assert.assertEquals(0,
dfs.getClient().getClientContext().getShortCircuitCache() dfs.getClient().getClientContext().getShortCircuitCache(0)
.getReplicaInfoMapSize()); .getReplicaInfoMapSize());
final byte[] buf = new byte[testFileLen]; final byte[] buf = new byte[testFileLen];
@ -398,12 +398,12 @@ public class TestBlockReaderFactory {
// Set cache size to 0 so the replica marked evictable by unbuffer // Set cache size to 0 so the replica marked evictable by unbuffer
// will be purged immediately. // will be purged immediately.
dfs.getClient().getClientContext().getShortCircuitCache() dfs.getClient().getClientContext().getShortCircuitCache(0)
.setMaxTotalSize(0); .setMaxTotalSize(0);
LOG.info("Unbuffering"); LOG.info("Unbuffering");
in.unbuffer(); in.unbuffer();
Assert.assertEquals(0, Assert.assertEquals(0,
dfs.getClient().getClientContext().getShortCircuitCache() dfs.getClient().getClientContext().getShortCircuitCache(0)
.getReplicaInfoMapSize()); .getReplicaInfoMapSize());
DFSTestUtil.appendFile(dfs, testFile, "append more data"); DFSTestUtil.appendFile(dfs, testFile, "append more data");
@ -432,7 +432,7 @@ public class TestBlockReaderFactory {
final int expectedScrRepMapSize) { final int expectedScrRepMapSize) {
Assert.assertThat(expected, CoreMatchers.is(actual)); Assert.assertThat(expected, CoreMatchers.is(actual));
Assert.assertEquals(expectedScrRepMapSize, Assert.assertEquals(expectedScrRepMapSize,
dfs.getClient().getClientContext().getShortCircuitCache() dfs.getClient().getClientContext().getShortCircuitCache(0)
.getReplicaInfoMapSize()); .getReplicaInfoMapSize());
} }
@ -467,7 +467,7 @@ public class TestBlockReaderFactory {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected)); Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache = final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache(); fs.getClient().getClientContext().getShortCircuitCache(0);
final DatanodeInfo datanode = new DatanodeInfoBuilder() final DatanodeInfo datanode = new DatanodeInfoBuilder()
.setNodeID(cluster.getDataNodes().get(0).getDatanodeId()) .setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
.build(); .build();
@ -516,7 +516,7 @@ public class TestBlockReaderFactory {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected)); Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache = final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache(); fs.getClient().getClientContext().getShortCircuitCache(0);
Assert.assertEquals(null, cache.getDfsClientShmManager()); Assert.assertEquals(null, cache.getDfsClientShmManager());
cluster.shutdown(); cluster.shutdown();
sockDir.close(); sockDir.close();
@ -548,7 +548,7 @@ public class TestBlockReaderFactory {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected)); Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache = final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache(); fs.getClient().getClientContext().getShortCircuitCache(0);
cache.close(); cache.close();
Assert.assertTrue(cache.getDfsClientShmManager(). Assert.assertTrue(cache.getDfsClientShmManager().
getDomainSocketWatcher().isClosed()); getDomainSocketWatcher().isClosed());

View File

@ -116,7 +116,7 @@ public class TestBlockReaderLocal {
} }
private static class BlockReaderLocalTest { private static class BlockReaderLocalTest {
final static int TEST_LENGTH = 12345; final static int TEST_LENGTH = 1234567;
final static int BYTES_PER_CHECKSUM = 512; final static int BYTES_PER_CHECKSUM = 512;
public void setConfiguration(HdfsConfiguration conf) { public void setConfiguration(HdfsConfiguration conf) {
@ -130,10 +130,14 @@ public class TestBlockReaderLocal {
throws IOException { throws IOException {
// default: no-op // default: no-op
} }
} public void doTest(BlockReaderLocal reader, byte[] original, int shift)
throws IOException {
// default: no-op
} }
public void runBlockReaderLocalTest(BlockReaderLocalTest test, public void runBlockReaderLocalTest(BlockReaderLocalTest test,
boolean checksum, long readahead) throws IOException { boolean checksum, long readahead, int shortCircuitCachesNum)
throws IOException {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
@ -143,10 +147,13 @@ public class TestBlockReaderLocal {
BlockReaderLocalTest.BYTES_PER_CHECKSUM); BlockReaderLocalTest.BYTES_PER_CHECKSUM);
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C"); conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead); conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
shortCircuitCachesNum);
test.setConfiguration(conf); test.setConfiguration(conf);
FileInputStream dataIn = null, metaIn = null; FileInputStream dataIn = null, metaIn = null;
final Path TEST_PATH = new Path("/a"); final Path TEST_PATH = new Path("/a");
final long RANDOM_SEED = 4567L; final long RANDOM_SEED = 4567L;
final int blockSize = 10 * 1024;
BlockReaderLocal blockReaderLocal = null; BlockReaderLocal blockReaderLocal = null;
FSDataInputStream fsIn = null; FSDataInputStream fsIn = null;
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
@ -158,8 +165,8 @@ public class TestBlockReaderLocal {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive(); cluster.waitActive();
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH, DFSTestUtil.createFile(fs, TEST_PATH, 1024,
BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); BlockReaderLocalTest.TEST_LENGTH, blockSize, (short)1, RANDOM_SEED);
try { try {
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -174,47 +181,52 @@ public class TestBlockReaderLocal {
BlockReaderLocalTest.TEST_LENGTH); BlockReaderLocalTest.TEST_LENGTH);
fsIn.close(); fsIn.close();
fsIn = null; fsIn = null;
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH); for (int i = 0; i < shortCircuitCachesNum; i++) {
File dataFile = cluster.getBlockFile(0, block); ExtendedBlock block = DFSTestUtil.getAllBlocks(
File metaFile = cluster.getBlockMetadataFile(0, block); fs, TEST_PATH).get(i).getBlock();
File dataFile = cluster.getBlockFile(0, block);
File metaFile = cluster.getBlockMetadataFile(0, block);
ShortCircuitCache shortCircuitCache = ShortCircuitCache shortCircuitCache =
ClientContext.getFromConf(conf).getShortCircuitCache(); ClientContext.getFromConf(conf).getShortCircuitCache(
block.getBlockId());
test.setup(dataFile, checksum);
FileInputStream[] streams = {
new FileInputStream(dataFile),
new FileInputStream(metaFile)
};
dataIn = streams[0];
metaIn = streams[1];
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
block.getBlockPoolId());
raf = new RandomAccessFile(
new File(sockDir.getDir().getAbsolutePath(),
UUID.randomUUID().toString()), "rw");
raf.setLength(8192);
FileInputStream shmStream = new FileInputStream(raf.getFD());
shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
ShortCircuitReplica replica =
new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
Time.now(), shm.allocAndRegisterSlot(
ExtendedBlockId.fromExtendedBlock(block)));
blockReaderLocal = new BlockReaderLocal.Builder(
new DfsClientConf.ShortCircuitConf(conf)).
setFilename(TEST_PATH.getName()).
setBlock(block).
setShortCircuitReplica(replica).
setCachingStrategy(new CachingStrategy(false, readahead)).
setVerifyChecksum(checksum).
build();
dataIn = null;
metaIn = null;
test.doTest(blockReaderLocal, original, i * blockSize);
// BlockReaderLocal should not alter the file position.
Assert.assertEquals(0, streams[0].getChannel().position());
Assert.assertEquals(0, streams[1].getChannel().position());
}
cluster.shutdown(); cluster.shutdown();
cluster = null; cluster = null;
test.setup(dataFile, checksum);
FileInputStream streams[] = {
new FileInputStream(dataFile),
new FileInputStream(metaFile)
};
dataIn = streams[0];
metaIn = streams[1];
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
block.getBlockPoolId());
raf = new RandomAccessFile(
new File(sockDir.getDir().getAbsolutePath(),
UUID.randomUUID().toString()), "rw");
raf.setLength(8192);
FileInputStream shmStream = new FileInputStream(raf.getFD());
shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
ShortCircuitReplica replica =
new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
Time.now(), shm.allocAndRegisterSlot(
ExtendedBlockId.fromExtendedBlock(block)));
blockReaderLocal = new BlockReaderLocal.Builder(
new DfsClientConf.ShortCircuitConf(conf)).
setFilename(TEST_PATH.getName()).
setBlock(block).
setShortCircuitReplica(replica).
setCachingStrategy(new CachingStrategy(false, readahead)).
setVerifyChecksum(checksum).
build();
dataIn = null;
metaIn = null;
test.doTest(blockReaderLocal, original);
// BlockReaderLocal should not alter the file position.
Assert.assertEquals(0, streams[0].getChannel().position());
Assert.assertEquals(0, streams[1].getChannel().position());
} finally { } finally {
if (fsIn != null) fsIn.close(); if (fsIn != null) fsIn.close();
if (fs != null) fs.close(); if (fs != null) fs.close();
@ -227,6 +239,12 @@ public class TestBlockReaderLocal {
} }
} }
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
boolean checksum, long readahead) throws IOException {
runBlockReaderLocalTest(BlockReaderLocalTest test,
boolean checksum, long readahead, 1);
}
private static class TestBlockReaderLocalImmediateClose private static class TestBlockReaderLocalImmediateClose
extends BlockReaderLocalTest { extends BlockReaderLocalTest {
} }
@ -242,7 +260,7 @@ public class TestBlockReaderLocal {
@Override @Override
public void doTest(BlockReaderLocal reader, byte original[]) public void doTest(BlockReaderLocal reader, byte original[])
throws IOException { throws IOException {
byte buf[] = new byte[TEST_LENGTH]; byte[] buf = new byte[TEST_LENGTH];
reader.readFully(buf, 0, 512); reader.readFully(buf, 0, 512);
assertArrayRegionsEqual(original, 0, buf, 0, 512); assertArrayRegionsEqual(original, 0, buf, 0, 512);
reader.readFully(buf, 512, 512); reader.readFully(buf, 512, 512);
@ -291,7 +309,7 @@ public class TestBlockReaderLocal {
@Override @Override
public void doTest(BlockReaderLocal reader, byte original[]) public void doTest(BlockReaderLocal reader, byte original[])
throws IOException { throws IOException {
byte buf[] = new byte[TEST_LENGTH]; byte[] buf = new byte[TEST_LENGTH];
reader.readFully(buf, 0, 10); reader.readFully(buf, 0, 10);
assertArrayRegionsEqual(original, 0, buf, 0, 10); assertArrayRegionsEqual(original, 0, buf, 0, 10);
reader.readFully(buf, 10, 100); reader.readFully(buf, 10, 100);
@ -369,7 +387,7 @@ public class TestBlockReaderLocal {
public void testBlockReaderLocalByteBufferReadsNoReadahead() public void testBlockReaderLocalByteBufferReadsNoReadahead()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(), runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
true, 0); true, 0);
} }
@Test @Test
@ -468,7 +486,7 @@ public class TestBlockReaderLocal {
public void doTest(BlockReaderLocal reader, byte original[]) public void doTest(BlockReaderLocal reader, byte original[])
throws IOException { throws IOException {
byte buf[] = new byte[TEST_LENGTH]; byte[] buf = new byte[TEST_LENGTH];
if (usingChecksums) { if (usingChecksums) {
try { try {
reader.readFully(buf, 0, 10); reader.readFully(buf, 0, 10);
@ -508,7 +526,7 @@ public class TestBlockReaderLocal {
public void doTest(BlockReaderLocal reader, byte original[]) public void doTest(BlockReaderLocal reader, byte original[])
throws IOException { throws IOException {
byte buf[] = new byte[TEST_LENGTH]; byte[] buf = new byte[TEST_LENGTH];
try { try {
reader.readFully(buf, 0, 10); reader.readFully(buf, 0, 10);
assertArrayRegionsEqual(original, 0, buf, 0, 10); assertArrayRegionsEqual(original, 0, buf, 0, 10);
@ -845,4 +863,78 @@ public class TestBlockReaderLocal {
} }
} }
} }
private static class TestBlockReaderFiveShortCircutCachesReads
extends BlockReaderLocalTest {
@Override
public void doTest(BlockReaderLocal reader, byte[] original, int shift)
throws IOException {
byte[] buf = new byte[TEST_LENGTH];
reader.readFully(buf, 0, 512);
assertArrayRegionsEqual(original, shift, buf, 0, 512);
reader.readFully(buf, 512, 512);
assertArrayRegionsEqual(original, 512 + shift, buf, 512, 512);
reader.readFully(buf, 1024, 513);
assertArrayRegionsEqual(original, 1024 + shift, buf, 1024, 513);
reader.readFully(buf, 1537, 514);
assertArrayRegionsEqual(original, 1537 + shift, buf, 1537, 514);
// Readahead is always at least the size of one chunk in this test.
Assert.assertTrue(reader.getMaxReadaheadLength() >=
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
}
}
@Test
public void testBlockReaderFiveShortCircutCachesReads() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
5);
}
@Test
public void testBlockReaderFiveShortCircutCachesReadsShortReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
true, BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1,
5);
}
@Test
public void testBlockReaderFiveShortCircutCachesReadsNoChecksum()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
5);
}
@Test
public void testBlockReaderFiveShortCircutCachesReadsNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
true, 0, 5);
}
@Test
public void testBlockReaderFiveShortCircutCachesReadsNoChecksumNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
false, 0, 5);
}
@Test(expected = IllegalArgumentException.class)
public void testBlockReaderShortCircutCachesOutOfRangeBelow()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
0);
}
@Test(expected = IllegalArgumentException.class)
public void testBlockReaderShortCircutCachesOutOfRangeAbove()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
555);
}
} }

View File

@ -431,7 +431,7 @@ public class TestShortCircuitCache {
cluster.waitActive(); cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem(); DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache = final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache(); fs.getClient().getClientContext().getShortCircuitCache(0);
cache.getDfsClientShmManager().visit(new Visitor() { cache.getDfsClientShmManager().visit(new Visitor() {
@Override @Override
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
@ -501,7 +501,7 @@ public class TestShortCircuitCache {
cluster.waitActive(); cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem(); DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache = final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache(); fs.getClient().getClientContext().getShortCircuitCache(0);
String TEST_FILE = "/test_file"; String TEST_FILE = "/test_file";
final int TEST_FILE_LEN = 8193; final int TEST_FILE_LEN = 8193;
final int SEED = 0xFADED; final int SEED = 0xFADED;
@ -565,7 +565,7 @@ public class TestShortCircuitCache {
cluster.waitActive(); cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem(); DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache = final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache(); fs.getClient().getClientContext().getShortCircuitCache(0);
cache.getDfsClientShmManager().visit(new Visitor() { cache.getDfsClientShmManager().visit(new Visitor() {
@Override @Override
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
@ -877,19 +877,20 @@ public class TestShortCircuitCache {
return peerCache; return peerCache;
}); });
Mockito.when(clientContext.getShortCircuitCache()).thenAnswer( Mockito.when(clientContext.getShortCircuitCache(
blk.getBlock().getBlockId())).thenAnswer(
(Answer<ShortCircuitCache>) shortCircuitCacheCall -> { (Answer<ShortCircuitCache>) shortCircuitCacheCall -> {
ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class); ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
Mockito.when(cache.allocShmSlot( Mockito.when(cache.allocShmSlot(
Mockito.any(DatanodeInfo.class), Mockito.any(DatanodeInfo.class),
Mockito.any(DomainPeer.class), Mockito.any(DomainPeer.class),
Mockito.any(MutableBoolean.class), Mockito.any(MutableBoolean.class),
Mockito.any(ExtendedBlockId.class), Mockito.any(ExtendedBlockId.class),
Mockito.anyString())) Mockito.anyString()))
.thenAnswer((Answer<Slot>) call -> null); .thenAnswer((Answer<Slot>) call -> null);
return cache; return cache;
} }
); );
DatanodeInfo[] nodes = blk.getLocations(); DatanodeInfo[] nodes = blk.getLocations();