HDFS-15202 Boost short circuit cache (rebase PR-1884) (#2016)
(cherry picked from commit 2abcf7762a
)
This commit is contained in:
parent
289f8acc64
commit
240cba7e6d
|
@ -77,7 +77,7 @@ public class ClientContext {
|
||||||
/**
|
/**
|
||||||
* Caches short-circuit file descriptors, mmap regions.
|
* Caches short-circuit file descriptors, mmap regions.
|
||||||
*/
|
*/
|
||||||
private final ShortCircuitCache shortCircuitCache;
|
private final ShortCircuitCache[] shortCircuitCache;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Caches TCP and UNIX domain sockets for reuse.
|
* Caches TCP and UNIX domain sockets for reuse.
|
||||||
|
@ -132,13 +132,23 @@ public class ClientContext {
|
||||||
*/
|
*/
|
||||||
private DeadNodeDetector deadNodeDetector = null;
|
private DeadNodeDetector deadNodeDetector = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ShortCircuitCache array size.
|
||||||
|
*/
|
||||||
|
private final int clientShortCircuitNum;
|
||||||
|
|
||||||
private ClientContext(String name, DfsClientConf conf,
|
private ClientContext(String name, DfsClientConf conf,
|
||||||
Configuration config) {
|
Configuration config) {
|
||||||
final ShortCircuitConf scConf = conf.getShortCircuitConf();
|
final ShortCircuitConf scConf = conf.getShortCircuitConf();
|
||||||
|
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.confString = scConf.confAsString();
|
this.confString = scConf.confAsString();
|
||||||
this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
|
this.clientShortCircuitNum = conf.getClientShortCircuitNum();
|
||||||
|
this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
|
||||||
|
for (int i = 0; i < this.clientShortCircuitNum; i++) {
|
||||||
|
this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
|
||||||
|
}
|
||||||
|
|
||||||
this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
|
this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
|
||||||
scConf.getSocketCacheExpiry());
|
scConf.getSocketCacheExpiry());
|
||||||
this.keyProviderCache = new KeyProviderCache(
|
this.keyProviderCache = new KeyProviderCache(
|
||||||
|
@ -228,7 +238,11 @@ public class ClientContext {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ShortCircuitCache getShortCircuitCache() {
|
public ShortCircuitCache getShortCircuitCache() {
|
||||||
return shortCircuitCache;
|
return shortCircuitCache[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
public ShortCircuitCache getShortCircuitCache(long idx) {
|
||||||
|
return shortCircuitCache[(int) (idx % clientShortCircuitNum)];
|
||||||
}
|
}
|
||||||
|
|
||||||
public PeerCache getPeerCache() {
|
public PeerCache getPeerCache() {
|
||||||
|
|
|
@ -144,6 +144,8 @@ public interface HdfsClientConfigKeys {
|
||||||
"dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
|
"dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
|
||||||
int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
|
int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
|
||||||
60000;
|
60000;
|
||||||
|
String DFS_CLIENT_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num";
|
||||||
|
int DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT = 1;
|
||||||
String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
|
String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
|
||||||
"dfs.client.slow.io.warning.threshold.ms";
|
"dfs.client.slow.io.warning.threshold.ms";
|
||||||
long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;
|
long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;
|
||||||
|
|
|
@ -476,7 +476,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
"giving up on BlockReaderLocal.", this, pathInfo);
|
"giving up on BlockReaderLocal.", this, pathInfo);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
ShortCircuitCache cache = clientContext.getShortCircuitCache();
|
ShortCircuitCache cache =
|
||||||
|
clientContext.getShortCircuitCache(block.getBlockId());
|
||||||
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
|
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
|
||||||
block.getBlockPoolId());
|
block.getBlockPoolId());
|
||||||
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
|
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
|
||||||
|
@ -527,7 +528,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
if (curPeer.fromCache) remainingCacheTries--;
|
if (curPeer.fromCache) remainingCacheTries--;
|
||||||
DomainPeer peer = (DomainPeer)curPeer.peer;
|
DomainPeer peer = (DomainPeer)curPeer.peer;
|
||||||
Slot slot = null;
|
Slot slot = null;
|
||||||
ShortCircuitCache cache = clientContext.getShortCircuitCache();
|
ShortCircuitCache cache =
|
||||||
|
clientContext.getShortCircuitCache(block.getBlockId());
|
||||||
try {
|
try {
|
||||||
MutableBoolean usedPeer = new MutableBoolean(false);
|
MutableBoolean usedPeer = new MutableBoolean(false);
|
||||||
slot = cache.allocShmSlot(datanode, peer, usedPeer,
|
slot = cache.allocShmSlot(datanode, peer, usedPeer,
|
||||||
|
@ -582,7 +584,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
*/
|
*/
|
||||||
private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
|
private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
|
||||||
Slot slot) throws IOException {
|
Slot slot) throws IOException {
|
||||||
ShortCircuitCache cache = clientContext.getShortCircuitCache();
|
ShortCircuitCache cache =
|
||||||
|
clientContext.getShortCircuitCache(block.getBlockId());
|
||||||
final DataOutputStream out =
|
final DataOutputStream out =
|
||||||
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE));
|
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE));
|
||||||
SlotId slotId = slot == null ? null : slot.getSlotId();
|
SlotId slotId = slot == null ? null : slot.getSlotId();
|
||||||
|
|
|
@ -142,6 +142,7 @@ public class DfsClientConf {
|
||||||
private final long refreshReadBlockLocationsMS;
|
private final long refreshReadBlockLocationsMS;
|
||||||
|
|
||||||
private final ShortCircuitConf shortCircuitConf;
|
private final ShortCircuitConf shortCircuitConf;
|
||||||
|
private final int clientShortCircuitNum;
|
||||||
|
|
||||||
private final long hedgedReadThresholdMillis;
|
private final long hedgedReadThresholdMillis;
|
||||||
private final int hedgedReadThreadpoolSize;
|
private final int hedgedReadThreadpoolSize;
|
||||||
|
@ -272,8 +273,6 @@ public class DfsClientConf {
|
||||||
HdfsClientConfigKeys.
|
HdfsClientConfigKeys.
|
||||||
DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);
|
DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);
|
||||||
|
|
||||||
shortCircuitConf = new ShortCircuitConf(conf);
|
|
||||||
|
|
||||||
hedgedReadThresholdMillis = conf.getLong(
|
hedgedReadThresholdMillis = conf.getLong(
|
||||||
HedgedRead.THRESHOLD_MILLIS_KEY,
|
HedgedRead.THRESHOLD_MILLIS_KEY,
|
||||||
HedgedRead.THRESHOLD_MILLIS_DEFAULT);
|
HedgedRead.THRESHOLD_MILLIS_DEFAULT);
|
||||||
|
@ -296,6 +295,17 @@ public class DfsClientConf {
|
||||||
leaseHardLimitPeriod =
|
leaseHardLimitPeriod =
|
||||||
conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY,
|
conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY,
|
||||||
HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000;
|
HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000;
|
||||||
|
|
||||||
|
shortCircuitConf = new ShortCircuitConf(conf);
|
||||||
|
clientShortCircuitNum = conf.getInt(
|
||||||
|
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
|
||||||
|
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT);
|
||||||
|
Preconditions.checkArgument(clientShortCircuitNum >= 1,
|
||||||
|
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
|
||||||
|
"can't be less then 1.");
|
||||||
|
Preconditions.checkArgument(clientShortCircuitNum <= 5,
|
||||||
|
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
|
||||||
|
"can't be more then 5.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@ -601,6 +611,13 @@ public class DfsClientConf {
|
||||||
return slowIoWarningThresholdMs;
|
return slowIoWarningThresholdMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @return the clientShortCircuitNum
|
||||||
|
*/
|
||||||
|
public int getClientShortCircuitNum() {
|
||||||
|
return clientShortCircuitNum;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the hedgedReadThresholdMillis
|
* @return the hedgedReadThresholdMillis
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -4188,6 +4188,16 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.client.short.circuit.num</name>
|
||||||
|
<value>1</value>
|
||||||
|
<description>
|
||||||
|
Number of short-circuit caches. This setting should
|
||||||
|
be in the range 1 - 5. Lower values will result in lower CPU consumption; higher
|
||||||
|
values may speed up massive parallel reading files.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.client.read.striped.threadpool.size</name>
|
<name>dfs.client.read.striped.threadpool.size</name>
|
||||||
<value>18</value>
|
<value>18</value>
|
||||||
|
|
|
@ -358,7 +358,7 @@ public class TestEnhancedByteBufferAccess {
|
||||||
fsIn.close();
|
fsIn.close();
|
||||||
fsIn = fs.open(TEST_PATH);
|
fsIn = fs.open(TEST_PATH);
|
||||||
final ShortCircuitCache cache = ClientContext.get(
|
final ShortCircuitCache cache = ClientContext.get(
|
||||||
CONTEXT, conf).getShortCircuitCache();
|
CONTEXT, conf).getShortCircuitCache(0);
|
||||||
cache.accept(new CountingVisitor(0, 5, 5, 0));
|
cache.accept(new CountingVisitor(0, 5, 5, 0));
|
||||||
results[0] = fsIn.read(null, BLOCK_SIZE,
|
results[0] = fsIn.read(null, BLOCK_SIZE,
|
||||||
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
|
||||||
|
@ -659,7 +659,7 @@ public class TestEnhancedByteBufferAccess {
|
||||||
final ExtendedBlock firstBlock =
|
final ExtendedBlock firstBlock =
|
||||||
DFSTestUtil.getFirstBlock(fs, TEST_PATH);
|
DFSTestUtil.getFirstBlock(fs, TEST_PATH);
|
||||||
final ShortCircuitCache cache = ClientContext.get(
|
final ShortCircuitCache cache = ClientContext.get(
|
||||||
CONTEXT, conf).getShortCircuitCache();
|
CONTEXT, conf).getShortCircuitCache(0);
|
||||||
waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
|
waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
|
||||||
// Uncache the replica
|
// Uncache the replica
|
||||||
fs.removeCacheDirective(directiveId);
|
fs.removeCacheDirective(directiveId);
|
||||||
|
|
|
@ -389,7 +389,7 @@ public class TestBlockReaderFactory {
|
||||||
|
|
||||||
try (FSDataInputStream in = dfs.open(testFile)) {
|
try (FSDataInputStream in = dfs.open(testFile)) {
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
dfs.getClient().getClientContext().getShortCircuitCache()
|
dfs.getClient().getClientContext().getShortCircuitCache(0)
|
||||||
.getReplicaInfoMapSize());
|
.getReplicaInfoMapSize());
|
||||||
|
|
||||||
final byte[] buf = new byte[testFileLen];
|
final byte[] buf = new byte[testFileLen];
|
||||||
|
@ -398,12 +398,12 @@ public class TestBlockReaderFactory {
|
||||||
|
|
||||||
// Set cache size to 0 so the replica marked evictable by unbuffer
|
// Set cache size to 0 so the replica marked evictable by unbuffer
|
||||||
// will be purged immediately.
|
// will be purged immediately.
|
||||||
dfs.getClient().getClientContext().getShortCircuitCache()
|
dfs.getClient().getClientContext().getShortCircuitCache(0)
|
||||||
.setMaxTotalSize(0);
|
.setMaxTotalSize(0);
|
||||||
LOG.info("Unbuffering");
|
LOG.info("Unbuffering");
|
||||||
in.unbuffer();
|
in.unbuffer();
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
dfs.getClient().getClientContext().getShortCircuitCache()
|
dfs.getClient().getClientContext().getShortCircuitCache(0)
|
||||||
.getReplicaInfoMapSize());
|
.getReplicaInfoMapSize());
|
||||||
|
|
||||||
DFSTestUtil.appendFile(dfs, testFile, "append more data");
|
DFSTestUtil.appendFile(dfs, testFile, "append more data");
|
||||||
|
@ -432,7 +432,7 @@ public class TestBlockReaderFactory {
|
||||||
final int expectedScrRepMapSize) {
|
final int expectedScrRepMapSize) {
|
||||||
Assert.assertThat(expected, CoreMatchers.is(actual));
|
Assert.assertThat(expected, CoreMatchers.is(actual));
|
||||||
Assert.assertEquals(expectedScrRepMapSize,
|
Assert.assertEquals(expectedScrRepMapSize,
|
||||||
dfs.getClient().getClientContext().getShortCircuitCache()
|
dfs.getClient().getClientContext().getShortCircuitCache(0)
|
||||||
.getReplicaInfoMapSize());
|
.getReplicaInfoMapSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -467,7 +467,7 @@ public class TestBlockReaderFactory {
|
||||||
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
|
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
|
||||||
Assert.assertTrue(Arrays.equals(contents, expected));
|
Assert.assertTrue(Arrays.equals(contents, expected));
|
||||||
final ShortCircuitCache cache =
|
final ShortCircuitCache cache =
|
||||||
fs.getClient().getClientContext().getShortCircuitCache();
|
fs.getClient().getClientContext().getShortCircuitCache(0);
|
||||||
final DatanodeInfo datanode = new DatanodeInfoBuilder()
|
final DatanodeInfo datanode = new DatanodeInfoBuilder()
|
||||||
.setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
|
.setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
|
||||||
.build();
|
.build();
|
||||||
|
@ -516,7 +516,7 @@ public class TestBlockReaderFactory {
|
||||||
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
|
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
|
||||||
Assert.assertTrue(Arrays.equals(contents, expected));
|
Assert.assertTrue(Arrays.equals(contents, expected));
|
||||||
final ShortCircuitCache cache =
|
final ShortCircuitCache cache =
|
||||||
fs.getClient().getClientContext().getShortCircuitCache();
|
fs.getClient().getClientContext().getShortCircuitCache(0);
|
||||||
Assert.assertEquals(null, cache.getDfsClientShmManager());
|
Assert.assertEquals(null, cache.getDfsClientShmManager());
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
sockDir.close();
|
sockDir.close();
|
||||||
|
@ -548,7 +548,7 @@ public class TestBlockReaderFactory {
|
||||||
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
|
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
|
||||||
Assert.assertTrue(Arrays.equals(contents, expected));
|
Assert.assertTrue(Arrays.equals(contents, expected));
|
||||||
final ShortCircuitCache cache =
|
final ShortCircuitCache cache =
|
||||||
fs.getClient().getClientContext().getShortCircuitCache();
|
fs.getClient().getClientContext().getShortCircuitCache(0);
|
||||||
cache.close();
|
cache.close();
|
||||||
Assert.assertTrue(cache.getDfsClientShmManager().
|
Assert.assertTrue(cache.getDfsClientShmManager().
|
||||||
getDomainSocketWatcher().isClosed());
|
getDomainSocketWatcher().isClosed());
|
||||||
|
|
|
@ -116,7 +116,7 @@ public class TestBlockReaderLocal {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class BlockReaderLocalTest {
|
private static class BlockReaderLocalTest {
|
||||||
final static int TEST_LENGTH = 12345;
|
final static int TEST_LENGTH = 1234567;
|
||||||
final static int BYTES_PER_CHECKSUM = 512;
|
final static int BYTES_PER_CHECKSUM = 512;
|
||||||
|
|
||||||
public void setConfiguration(HdfsConfiguration conf) {
|
public void setConfiguration(HdfsConfiguration conf) {
|
||||||
|
@ -130,10 +130,14 @@ public class TestBlockReaderLocal {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// default: no-op
|
// default: no-op
|
||||||
}
|
}
|
||||||
}
|
public void doTest(BlockReaderLocal reader, byte[] original, int shift)
|
||||||
|
throws IOException {
|
||||||
|
// default: no-op
|
||||||
|
} }
|
||||||
|
|
||||||
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
|
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
|
||||||
boolean checksum, long readahead) throws IOException {
|
boolean checksum, long readahead, int shortCircuitCachesNum)
|
||||||
|
throws IOException {
|
||||||
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
|
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
@ -143,10 +147,13 @@ public class TestBlockReaderLocal {
|
||||||
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
|
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
|
||||||
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
|
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
|
||||||
conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
|
conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
|
||||||
|
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
|
||||||
|
shortCircuitCachesNum);
|
||||||
test.setConfiguration(conf);
|
test.setConfiguration(conf);
|
||||||
FileInputStream dataIn = null, metaIn = null;
|
FileInputStream dataIn = null, metaIn = null;
|
||||||
final Path TEST_PATH = new Path("/a");
|
final Path TEST_PATH = new Path("/a");
|
||||||
final long RANDOM_SEED = 4567L;
|
final long RANDOM_SEED = 4567L;
|
||||||
|
final int blockSize = 10 * 1024;
|
||||||
BlockReaderLocal blockReaderLocal = null;
|
BlockReaderLocal blockReaderLocal = null;
|
||||||
FSDataInputStream fsIn = null;
|
FSDataInputStream fsIn = null;
|
||||||
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
|
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
|
||||||
|
@ -158,8 +165,8 @@ public class TestBlockReaderLocal {
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
fs = cluster.getFileSystem();
|
fs = cluster.getFileSystem();
|
||||||
DFSTestUtil.createFile(fs, TEST_PATH,
|
DFSTestUtil.createFile(fs, TEST_PATH, 1024,
|
||||||
BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
|
BlockReaderLocalTest.TEST_LENGTH, blockSize, (short)1, RANDOM_SEED);
|
||||||
try {
|
try {
|
||||||
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -174,16 +181,17 @@ public class TestBlockReaderLocal {
|
||||||
BlockReaderLocalTest.TEST_LENGTH);
|
BlockReaderLocalTest.TEST_LENGTH);
|
||||||
fsIn.close();
|
fsIn.close();
|
||||||
fsIn = null;
|
fsIn = null;
|
||||||
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
|
for (int i = 0; i < shortCircuitCachesNum; i++) {
|
||||||
|
ExtendedBlock block = DFSTestUtil.getAllBlocks(
|
||||||
|
fs, TEST_PATH).get(i).getBlock();
|
||||||
File dataFile = cluster.getBlockFile(0, block);
|
File dataFile = cluster.getBlockFile(0, block);
|
||||||
File metaFile = cluster.getBlockMetadataFile(0, block);
|
File metaFile = cluster.getBlockMetadataFile(0, block);
|
||||||
|
|
||||||
ShortCircuitCache shortCircuitCache =
|
ShortCircuitCache shortCircuitCache =
|
||||||
ClientContext.getFromConf(conf).getShortCircuitCache();
|
ClientContext.getFromConf(conf).getShortCircuitCache(
|
||||||
cluster.shutdown();
|
block.getBlockId());
|
||||||
cluster = null;
|
|
||||||
test.setup(dataFile, checksum);
|
test.setup(dataFile, checksum);
|
||||||
FileInputStream streams[] = {
|
FileInputStream[] streams = {
|
||||||
new FileInputStream(dataFile),
|
new FileInputStream(dataFile),
|
||||||
new FileInputStream(metaFile)
|
new FileInputStream(metaFile)
|
||||||
};
|
};
|
||||||
|
@ -211,10 +219,14 @@ public class TestBlockReaderLocal {
|
||||||
build();
|
build();
|
||||||
dataIn = null;
|
dataIn = null;
|
||||||
metaIn = null;
|
metaIn = null;
|
||||||
test.doTest(blockReaderLocal, original);
|
test.doTest(blockReaderLocal, original, i * blockSize);
|
||||||
// BlockReaderLocal should not alter the file position.
|
// BlockReaderLocal should not alter the file position.
|
||||||
Assert.assertEquals(0, streams[0].getChannel().position());
|
Assert.assertEquals(0, streams[0].getChannel().position());
|
||||||
Assert.assertEquals(0, streams[1].getChannel().position());
|
Assert.assertEquals(0, streams[1].getChannel().position());
|
||||||
|
}
|
||||||
|
cluster.shutdown();
|
||||||
|
cluster = null;
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
if (fsIn != null) fsIn.close();
|
if (fsIn != null) fsIn.close();
|
||||||
if (fs != null) fs.close();
|
if (fs != null) fs.close();
|
||||||
|
@ -227,6 +239,11 @@ public class TestBlockReaderLocal {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
|
||||||
|
boolean checksum, long readahead) throws IOException {
|
||||||
|
runBlockReaderLocalTest(test, checksum, readahead, 1);
|
||||||
|
}
|
||||||
|
|
||||||
private static class TestBlockReaderLocalImmediateClose
|
private static class TestBlockReaderLocalImmediateClose
|
||||||
extends BlockReaderLocalTest {
|
extends BlockReaderLocalTest {
|
||||||
}
|
}
|
||||||
|
@ -242,7 +259,7 @@ public class TestBlockReaderLocal {
|
||||||
@Override
|
@Override
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException {
|
throws IOException {
|
||||||
byte buf[] = new byte[TEST_LENGTH];
|
byte[] buf = new byte[TEST_LENGTH];
|
||||||
reader.readFully(buf, 0, 512);
|
reader.readFully(buf, 0, 512);
|
||||||
assertArrayRegionsEqual(original, 0, buf, 0, 512);
|
assertArrayRegionsEqual(original, 0, buf, 0, 512);
|
||||||
reader.readFully(buf, 512, 512);
|
reader.readFully(buf, 512, 512);
|
||||||
|
@ -291,7 +308,7 @@ public class TestBlockReaderLocal {
|
||||||
@Override
|
@Override
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException {
|
throws IOException {
|
||||||
byte buf[] = new byte[TEST_LENGTH];
|
byte[] buf = new byte[TEST_LENGTH];
|
||||||
reader.readFully(buf, 0, 10);
|
reader.readFully(buf, 0, 10);
|
||||||
assertArrayRegionsEqual(original, 0, buf, 0, 10);
|
assertArrayRegionsEqual(original, 0, buf, 0, 10);
|
||||||
reader.readFully(buf, 10, 100);
|
reader.readFully(buf, 10, 100);
|
||||||
|
@ -468,7 +485,7 @@ public class TestBlockReaderLocal {
|
||||||
|
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException {
|
throws IOException {
|
||||||
byte buf[] = new byte[TEST_LENGTH];
|
byte[] buf = new byte[TEST_LENGTH];
|
||||||
if (usingChecksums) {
|
if (usingChecksums) {
|
||||||
try {
|
try {
|
||||||
reader.readFully(buf, 0, 10);
|
reader.readFully(buf, 0, 10);
|
||||||
|
@ -508,7 +525,7 @@ public class TestBlockReaderLocal {
|
||||||
|
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException {
|
throws IOException {
|
||||||
byte buf[] = new byte[TEST_LENGTH];
|
byte[] buf = new byte[TEST_LENGTH];
|
||||||
try {
|
try {
|
||||||
reader.readFully(buf, 0, 10);
|
reader.readFully(buf, 0, 10);
|
||||||
assertArrayRegionsEqual(original, 0, buf, 0, 10);
|
assertArrayRegionsEqual(original, 0, buf, 0, 10);
|
||||||
|
@ -845,4 +862,78 @@ public class TestBlockReaderLocal {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class TestBlockReaderFiveShortCircutCachesReads
|
||||||
|
extends BlockReaderLocalTest {
|
||||||
|
@Override
|
||||||
|
public void doTest(BlockReaderLocal reader, byte[] original, int shift)
|
||||||
|
throws IOException {
|
||||||
|
byte[] buf = new byte[TEST_LENGTH];
|
||||||
|
reader.readFully(buf, 0, 512);
|
||||||
|
assertArrayRegionsEqual(original, shift, buf, 0, 512);
|
||||||
|
reader.readFully(buf, 512, 512);
|
||||||
|
assertArrayRegionsEqual(original, 512 + shift, buf, 512, 512);
|
||||||
|
reader.readFully(buf, 1024, 513);
|
||||||
|
assertArrayRegionsEqual(original, 1024 + shift, buf, 1024, 513);
|
||||||
|
reader.readFully(buf, 1537, 514);
|
||||||
|
assertArrayRegionsEqual(original, 1537 + shift, buf, 1537, 514);
|
||||||
|
// Readahead is always at least the size of one chunk in this test.
|
||||||
|
Assert.assertTrue(reader.getMaxReadaheadLength() >=
|
||||||
|
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderFiveShortCircutCachesReads() throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
|
||||||
|
true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
|
||||||
|
5);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderFiveShortCircutCachesReadsShortReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
|
||||||
|
true, BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1,
|
||||||
|
5);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderFiveShortCircutCachesReadsNoChecksum()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
|
||||||
|
false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
|
||||||
|
5);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderFiveShortCircutCachesReadsNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
|
||||||
|
true, 0, 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderFiveShortCircutCachesReadsNoChecksumNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
|
||||||
|
false, 0, 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
public void testBlockReaderShortCircutCachesOutOfRangeBelow()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
|
||||||
|
true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
|
||||||
|
0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
public void testBlockReaderShortCircutCachesOutOfRangeAbove()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
|
||||||
|
true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
|
||||||
|
555);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -431,7 +431,7 @@ public class TestShortCircuitCache {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
DistributedFileSystem fs = cluster.getFileSystem();
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
final ShortCircuitCache cache =
|
final ShortCircuitCache cache =
|
||||||
fs.getClient().getClientContext().getShortCircuitCache();
|
fs.getClient().getClientContext().getShortCircuitCache(0);
|
||||||
cache.getDfsClientShmManager().visit(new Visitor() {
|
cache.getDfsClientShmManager().visit(new Visitor() {
|
||||||
@Override
|
@Override
|
||||||
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
|
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
|
||||||
|
@ -501,7 +501,7 @@ public class TestShortCircuitCache {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
DistributedFileSystem fs = cluster.getFileSystem();
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
final ShortCircuitCache cache =
|
final ShortCircuitCache cache =
|
||||||
fs.getClient().getClientContext().getShortCircuitCache();
|
fs.getClient().getClientContext().getShortCircuitCache(0);
|
||||||
String TEST_FILE = "/test_file";
|
String TEST_FILE = "/test_file";
|
||||||
final int TEST_FILE_LEN = 8193;
|
final int TEST_FILE_LEN = 8193;
|
||||||
final int SEED = 0xFADED;
|
final int SEED = 0xFADED;
|
||||||
|
@ -565,7 +565,7 @@ public class TestShortCircuitCache {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
DistributedFileSystem fs = cluster.getFileSystem();
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
final ShortCircuitCache cache =
|
final ShortCircuitCache cache =
|
||||||
fs.getClient().getClientContext().getShortCircuitCache();
|
fs.getClient().getClientContext().getShortCircuitCache(0);
|
||||||
cache.getDfsClientShmManager().visit(new Visitor() {
|
cache.getDfsClientShmManager().visit(new Visitor() {
|
||||||
@Override
|
@Override
|
||||||
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
|
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
|
||||||
|
@ -877,7 +877,8 @@ public class TestShortCircuitCache {
|
||||||
return peerCache;
|
return peerCache;
|
||||||
});
|
});
|
||||||
|
|
||||||
Mockito.when(clientContext.getShortCircuitCache()).thenAnswer(
|
Mockito.when(clientContext.getShortCircuitCache(
|
||||||
|
blk.getBlock().getBlockId())).thenAnswer(
|
||||||
(Answer<ShortCircuitCache>) shortCircuitCacheCall -> {
|
(Answer<ShortCircuitCache>) shortCircuitCacheCall -> {
|
||||||
ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
|
ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
|
||||||
Mockito.when(cache.allocShmSlot(
|
Mockito.when(cache.allocShmSlot(
|
||||||
|
|
Loading…
Reference in New Issue