HDFS-6735. A minor optimization to avoid pread() be blocked by read() inside the same DFSInputStream (Lars Hofhansl via stack)

(cherry picked from commit 7caa3bc98e)

commit fe6ee8a3f1
parent c962eef533
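
The diff below splits DFSInputStream's single monitor lock in two: the stream's own lock (this) keeps guarding the stateful-read position, while a new infoLock object guards the block-location and caching-strategy metadata that positional reads (pread) also need. A pread therefore no longer queues up behind a long-running stateful read() holding the stream lock. A minimal sketch of the pattern, with hypothetical names (not the actual HDFS code):

// Sketch of the split-lock pattern this commit applies; class and field
// names are hypothetical, only the idea matches the patch.
class SplitLockStream {
  private long pos = 0;                  // stateful-read cursor, guarded by "this"
  private final Object infoLock = new Object();
  private long knownLength = 0;          // shared metadata, guarded by infoLock

  // Stateful read: may hold the stream lock for a long time while doing I/O.
  synchronized int read(byte[] buf) {
    pos += buf.length;                   // advances the shared cursor
    return buf.length;
  }

  // Positional read: only needs the metadata, so it takes infoLock and
  // never blocks behind an in-flight read().
  int pread(long position, byte[] buf) {
    final long length;
    synchronized (infoLock) {
      length = knownLength;              // short critical section
    }
    return position < length ? buf.length : -1;
  }
}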
CHANGES.txt:

@@ -162,6 +162,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7446. HDFS inotify should have the ability to determine what txid it
     has read up to (cmccabe)
 
+    HDFS-6735. A minor optimization to avoid pread() be blocked by read()
+    inside the same DFSInputStream (Lars Hofhansl via stack)
+
   OPTIMIZATIONS
 
   BUG FIXES
findbugsExcludeFile.xml:

@@ -207,4 +207,13 @@
     <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
   </Match>
 
+  <!--
+    We use a separate lock to guard cachingStrategy in order to separate
+    locks for p-reads from seek + read invocations.
+  -->
+  <Match>
+    <Class name="org.apache.hadoop.hdfs.DFSInputStream" />
+    <Field name="cachingStrategy" />
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
 </FindBugsFilter>
DFSInputStream.java:

@@ -92,17 +92,32 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   private final DFSClient dfsClient;
   private boolean closed = false;
   private final String src;
-  private BlockReader blockReader = null;
   private final boolean verifyChecksum;
-  private LocatedBlocks locatedBlocks = null;
-  private long lastBlockBeingWrittenLength = 0;
-  private FileEncryptionInfo fileEncryptionInfo = null;
+
+  // state by stateful read only:
+  // (protected by lock on this)
+  /////
   private DatanodeInfo currentNode = null;
   private LocatedBlock currentLocatedBlock = null;
   private long pos = 0;
   private long blockEnd = -1;
+  private BlockReader blockReader = null;
+  ////
+
+  // state shared by stateful and positional read:
+  // (protected by lock on infoLock)
+  ////
+  private LocatedBlocks locatedBlocks = null;
+  private long lastBlockBeingWrittenLength = 0;
+  private FileEncryptionInfo fileEncryptionInfo = null;
   private CachingStrategy cachingStrategy;
+  ////
+
   private final ReadStatistics readStatistics = new ReadStatistics();
+  // lock for state shared between read and pread
+  // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
+  //       (it's OK to acquire this lock when the lock on <this> is held)
+  private final Object infoLock = new Object();
 
   /**
    * Track the ByteBuffers that we have handed out to readers.
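
The ordering rule in the new header comment is what keeps the two locks deadlock-free: this may be acquired first with infoLock nested inside it, but code holding only infoLock must never call into a synchronized method of the stream. A hedged illustration with hypothetical methods, not code from the patch:

// Lock ordering per the comment above: this -> infoLock is allowed,
// infoLock -> this is not (except where "this" is already held by the
// caller, as the patch notes in getBlockAt). Hypothetical example:
class LockOrdering {
  private final Object infoLock = new Object();

  synchronized void statefulRead() {   // acquires "this" first
    synchronized (infoLock) {          // then infoLock: consistent order
      // touch shared block-location state
    }
  }

  void positionalRead() {
    synchronized (infoLock) {
      // Calling statefulRead() here would acquire infoLock -> this,
      // the reverse order, and could deadlock against statefulRead().
    }
  }
}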
@@ -226,35 +241,38 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     this.dfsClient = dfsClient;
     this.verifyChecksum = verifyChecksum;
     this.src = src;
-    this.cachingStrategy =
-        dfsClient.getDefaultReadCachingStrategy();
+    synchronized (infoLock) {
+      this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
+    }
     openInfo();
   }
 
   /**
    * Grab the open-file info from namenode
    */
-  synchronized void openInfo() throws IOException, UnresolvedLinkException {
+  void openInfo() throws IOException, UnresolvedLinkException {
+    synchronized(infoLock) {
       lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
       int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
       while (retriesForLastBlockLength > 0) {
         // Getting last block length as -1 is a special case. When cluster
         // restarts, DNs may not report immediately. At this time partial block
         // locations will not be available with NN for getting the length. Lets
         // retry for 3 times to get the length.
         if (lastBlockBeingWrittenLength == -1) {
           DFSClient.LOG.warn("Last block locations not available. "
               + "Datanodes might not have reported blocks completely."
               + " Will retry for " + retriesForLastBlockLength + " times");
           waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
           lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
         } else {
           break;
         }
         retriesForLastBlockLength--;
       }
       if (retriesForLastBlockLength == 0) {
         throw new IOException("Could not obtain the last block locations.");
       }
+    }
   }
 
@@ -306,7 +324,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
 
     fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
 
-    currentNode = null;
     return lastBlockBeingWrittenLength;
   }
 
@@ -359,21 +376,25 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       throw new IOException("Cannot obtain block length for " + locatedblock);
   }
 
-  public synchronized long getFileLength() {
+  public long getFileLength() {
+    synchronized(infoLock) {
       return locatedBlocks == null? 0:
           locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
+    }
   }
 
   // Short circuit local reads are forbidden for files that are
   // under construction. See HDFS-2757.
-  synchronized boolean shortCircuitForbidden() {
+  boolean shortCircuitForbidden() {
+    synchronized(infoLock) {
       return locatedBlocks.isUnderConstruction();
+    }
   }
 
   /**
    * Returns the datanode from which the stream is currently reading.
    */
-  public DatanodeInfo getCurrentDatanode() {
+  public synchronized DatanodeInfo getCurrentDatanode() {
     return currentNode;
   }
 
@@ -403,59 +424,67 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return located block
    * @throws IOException
    */
-  private synchronized LocatedBlock getBlockAt(long offset,
+  private LocatedBlock getBlockAt(long offset,
       boolean updatePosition) throws IOException {
+    synchronized(infoLock) {
       assert (locatedBlocks != null) : "locatedBlocks is null";
 
       final LocatedBlock blk;
 
       //check offset
       if (offset < 0 || offset >= getFileLength()) {
         throw new IOException("offset < 0 || offset >= getFileLength(), offset="
             + offset
             + ", updatePosition=" + updatePosition
             + ", locatedBlocks=" + locatedBlocks);
       }
       else if (offset >= locatedBlocks.getFileLength()) {
         // offset to the portion of the last block,
         // which is not known to the name-node yet;
         // getting the last block
         blk = locatedBlocks.getLastLocatedBlock();
       }
       else {
         // search cached blocks first
         int targetBlockIdx = locatedBlocks.findBlock(offset);
         if (targetBlockIdx < 0) { // block is not cached
           targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
           // fetch more blocks
           final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
           assert (newBlocks != null) : "Could not find target position " + offset;
           locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
         }
         blk = locatedBlocks.get(targetBlockIdx);
       }
 
       // update current position
       if (updatePosition) {
+        // synchronized not strictly needed, since we only get here
+        // from synchronized caller methods
+        synchronized(this) {
           pos = offset;
           blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
           currentLocatedBlock = blk;
+        }
       }
       return blk;
+    }
   }
 
   /** Fetch a block from namenode and cache it */
-  private synchronized void fetchBlockAt(long offset) throws IOException {
+  private void fetchBlockAt(long offset) throws IOException {
+    synchronized(infoLock) {
       int targetBlockIdx = locatedBlocks.findBlock(offset);
       if (targetBlockIdx < 0) { // block is not cached
         targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
       }
       // fetch blocks
       final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
       if (newBlocks == null) {
         throw new IOException("Could not find target position " + offset);
       }
       locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+    }
   }
 
   /**
@@ -467,7 +496,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return consequent segment of located blocks
    * @throws IOException
    */
-  private synchronized List<LocatedBlock> getBlockRange(long offset,
+  private List<LocatedBlock> getBlockRange(long offset,
       long length) throws IOException {
     // getFileLength(): returns total file length
     // locatedBlocks.getFileLength(): returns length of completed blocks
@@ -475,26 +504,27 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       throw new IOException("Offset: " + offset +
           " exceeds file length: " + getFileLength());
     }
+    synchronized(infoLock) {
       final List<LocatedBlock> blocks;
       final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
       final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
       final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;
 
       if (readOffsetWithinCompleteBlk) {
         //get the blocks of finalized (completed) block range
         blocks = getFinalizedBlockRange(offset,
             Math.min(length, lengthOfCompleteBlk - offset));
       } else {
         blocks = new ArrayList<LocatedBlock>(1);
       }
 
       // get the blocks from incomplete block range
       if (readLengthPastCompleteBlk) {
         blocks.add(locatedBlocks.getLastLocatedBlock());
       }
 
       return blocks;
+    }
   }
 
   /**
@@ -502,35 +532,37 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * Includes only the complete blocks.
    * Fetch them from the namenode if not cached.
    */
-  private synchronized List<LocatedBlock> getFinalizedBlockRange(
+  private List<LocatedBlock> getFinalizedBlockRange(
       long offset, long length) throws IOException {
+    synchronized(infoLock) {
       assert (locatedBlocks != null) : "locatedBlocks is null";
       List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
       // search cached blocks first
       int blockIdx = locatedBlocks.findBlock(offset);
       if (blockIdx < 0) { // block is not cached
         blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
       }
       long remaining = length;
       long curOff = offset;
       while(remaining > 0) {
         LocatedBlock blk = null;
         if(blockIdx < locatedBlocks.locatedBlockCount())
           blk = locatedBlocks.get(blockIdx);
         if (blk == null || curOff < blk.getStartOffset()) {
           LocatedBlocks newBlocks;
           newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
           locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
           continue;
         }
         assert curOff >= blk.getStartOffset() : "Block not found";
         blockRange.add(blk);
         long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
         remaining -= bytesRead;
         curOff += bytesRead;
         blockIdx++;
       }
       return blockRange;
+    }
   }
 
   /**
@@ -573,6 +605,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     try {
       ExtendedBlock blk = targetBlock.getBlock();
       Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+      CachingStrategy curCachingStrategy;
+      boolean shortCircuitForbidden;
+      synchronized(infoLock) {
+        curCachingStrategy = cachingStrategy;
+        shortCircuitForbidden = shortCircuitForbidden();
+      }
       blockReader = new BlockReaderFactory(dfsClient.getConf()).
           setInetSocketAddress(targetAddr).
           setRemotePeerFactory(dfsClient).
 
@@ -585,8 +623,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           setVerifyChecksum(verifyChecksum).
           setClientName(dfsClient.clientName).
           setLength(blk.getNumBytes() - offsetIntoBlock).
-          setCachingStrategy(cachingStrategy).
-          setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
+          setCachingStrategy(curCachingStrategy).
+          setAllowShortCircuitLocalReads(!shortCircuitForbidden).
           setClientCacheContext(dfsClient.getClientContext()).
           setUserGroupInformation(dfsClient.ugi).
           setConfiguration(dfsClient.getConfiguration()).
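
Both hunks above implement one pattern: the shared fields are copied into locals inside a short synchronized(infoLock) section, and the expensive BlockReaderFactory call then runs on that snapshot without holding any lock. A reduced, hedged sketch of the idea (hypothetical names, not the HDFS code):

// Snapshot-under-lock: copy shared state into locals in a short critical
// section, then do the slow work lock-free on the snapshot. Hypothetical:
class SnapshotExample {
  private final Object infoLock = new Object();
  private String strategy = "readahead";   // shared, guarded by infoLock

  String buildReader() {
    final String localStrategy;
    synchronized (infoLock) {
      localStrategy = strategy;            // snapshot while locked
    }
    // slow construction happens outside the lock, using the snapshot
    return "reader[" + localStrategy + "]";
  }
}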
@@ -782,7 +820,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       }
     }
 
-  private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+  private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
     dfsClient.checkOpen();
     if (closed) {
       throw new IOException("Stream closed");
 
@@ -800,9 +838,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           currentNode = blockSeekTo(pos);
         }
         int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
+        synchronized(infoLock) {
           if (locatedBlocks.isLastBlockComplete()) {
             realLen = (int) Math.min(realLen,
                 locatedBlocks.getFileLength() - pos);
           }
+        }
         int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
@@ -1055,8 +1095,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       // start of the loop.
       CachingStrategy curCachingStrategy;
       boolean allowShortCircuitLocalReads;
-      synchronized (this) {
-        block = getBlockAt(block.getStartOffset(), false);
+      block = getBlockAt(block.getStartOffset(), false);
+      synchronized(infoLock) {
         curCachingStrategy = cachingStrategy;
         allowShortCircuitLocalReads = !shortCircuitForbidden();
       }
@@ -1488,7 +1528,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * Same as {@link #seekToNewSource(long)} except that it does not exclude
    * the current datanode and might connect to the same node.
    */
-  private synchronized boolean seekToBlockSource(long targetPos)
+  private boolean seekToBlockSource(long targetPos)
       throws IOException {
     currentNode = blockSeekTo(targetPos);
     return true;
@@ -1575,11 +1615,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     return new ReadStatistics(readStatistics);
   }
 
-  public synchronized FileEncryptionInfo getFileEncryptionInfo() {
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    synchronized(infoLock) {
       return fileEncryptionInfo;
+    }
   }
 
-  private synchronized void closeCurrentBlockReader() {
+  private void closeCurrentBlockReader() {
     if (blockReader == null) return;
     // Close the current block reader so that the new caching settings can
     // take effect immediately.
 
@@ -1594,18 +1636,20 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   @Override
   public synchronized void setReadahead(Long readahead)
       throws IOException {
-    this.cachingStrategy =
-        new CachingStrategy.Builder(this.cachingStrategy).
-            setReadahead(readahead).build();
+    synchronized (infoLock) {
+      this.cachingStrategy =
+          new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
+    }
     closeCurrentBlockReader();
   }
 
   @Override
   public synchronized void setDropBehind(Boolean dropBehind)
       throws IOException {
-    this.cachingStrategy =
-        new CachingStrategy.Builder(this.cachingStrategy).
-            setDropBehind(dropBehind).build();
+    synchronized (infoLock) {
+      this.cachingStrategy =
+          new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
+    }
     closeCurrentBlockReader();
   }
LocatedBlocks.java:

@@ -34,14 +34,17 @@ public class LocatedBlocks {
   private final long fileLength;
   private final List<LocatedBlock> blocks; // array of blocks with prioritized locations
   private final boolean underConstruction;
-  private LocatedBlock lastLocatedBlock = null;
-  private boolean isLastBlockComplete = false;
-  private FileEncryptionInfo fileEncryptionInfo = null;
+  private final LocatedBlock lastLocatedBlock;
+  private final boolean isLastBlockComplete;
+  private final FileEncryptionInfo fileEncryptionInfo;
 
   public LocatedBlocks() {
     fileLength = 0;
     blocks = null;
     underConstruction = false;
+    lastLocatedBlock = null;
+    isLastBlockComplete = false;
+    fileEncryptionInfo = null;
   }
 
   public LocatedBlocks(long flength, boolean isUnderConstuction,
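
Making these LocatedBlocks members final (with every constructor assigning them) is more than style: under the Java memory model, final fields are frozen when construction finishes, so a LocatedBlocks reference handed between the stream lock and infoLock is always seen fully initialized. A reduced sketch of the idiom (hypothetical class, not the patch):

// Final fields get safe publication: another thread that obtains the
// reference sees them initialized without extra synchronization.
class BlockSummary {
  private final long fileLength;
  private final boolean lastBlockComplete;

  BlockSummary() {                      // every constructor must set all finals
    this(0, false);
  }

  BlockSummary(long fileLength, boolean lastBlockComplete) {
    this.fileLength = fileLength;
    this.lastBlockComplete = lastBlockComplete;
  }
}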
ShortCircuitCache.java:

@@ -400,7 +400,7 @@ public class ShortCircuitCache implements Closeable {
     lock.lock();
     try {
       Preconditions.checkArgument(replica.refCount > 0,
-          "can't ref " + replica + " because its refCount reached " +
+          "can't ref %s because its refCount reached %d", replica,
           replica.refCount);
       Long evictableTimeNs = replica.getEvictableTimeNs();
       replica.refCount++;
 
@@ -456,14 +456,13 @@ public class ShortCircuitCache implements Closeable {
     if (newRefCount == 0) {
       // Close replica, since there are no remaining references to it.
       Preconditions.checkArgument(replica.purged,
-          "Replica " + replica + " reached a refCount of 0 without " +
-          "being purged");
+          "Replica %s reached a refCount of 0 without being purged", replica);
       replica.close();
     } else if (newRefCount == 1) {
       Preconditions.checkState(null == replica.getEvictableTimeNs(),
-          "Replica " + replica + " had a refCount higher than 1, " +
-          "but was still evictable (evictableTimeNs = " +
-          replica.getEvictableTimeNs() + ")");
+          "Replica %s had a refCount higher than 1, " +
+          "but was still evictable (evictableTimeNs = %d)",
+          replica, replica.getEvictableTimeNs());
       if (!replica.purged) {
         // Add the replica to the end of an eviction list.
         // Eviction lists are sorted by time.
 
@@ -478,8 +477,8 @@ public class ShortCircuitCache implements Closeable {
       }
     } else {
       Preconditions.checkArgument(replica.refCount >= 0,
-          "replica's refCount went negative (refCount = " +
-          replica.refCount + " for " + replica + ")");
+          "replica's refCount went negative (refCount = %d" +
+          " for %s)", replica.refCount, replica);
     }
     if (LOG.isTraceEnabled()) {
       LOG.trace(this + ": unref replica " + replica +
 
@@ -602,7 +601,7 @@ public class ShortCircuitCache implements Closeable {
     Preconditions.checkNotNull(evictableTimeNs);
     ShortCircuitReplica removed = map.remove(evictableTimeNs);
     Preconditions.checkState(removed == replica,
-        "failed to make " + replica + " unevictable");
+        "failed to make %s unevictable", replica);
     replica.setEvictableTimeNs(null);
   }
 
@@ -859,7 +858,7 @@ public class ShortCircuitCache implements Closeable {
         Condition cond = (Condition)replica.mmapData;
         cond.awaitUninterruptibly();
       } else {
-        Preconditions.checkState(false, "invalid mmapData type " +
+        Preconditions.checkState(false, "invalid mmapData type %s",
            replica.mmapData.getClass().getName());
       }
     }
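
The Preconditions hunks above all make the same change: instead of concatenating the failure message eagerly on every call, they pass a template plus arguments, so the string is only built if a check actually fails. One caveat worth hedging: Guava's Preconditions formatter substitutes only %s placeholders, so the sketch below sticks to %s rather than %d:

import com.google.common.base.Preconditions;

class RefCounted {
  private int refCount = 1;

  void ref() {
    // Eager (old style): builds the message string on every call,
    // even when the check passes:
    //   Preconditions.checkArgument(refCount > 0,
    //       "can't ref " + this + " because its refCount reached " + refCount);

    // Lazy (new style): the template is formatted only on failure.
    Preconditions.checkArgument(refCount > 0,
        "can't ref %s because its refCount reached %s", this, refCount);
    refCount++;
  }
}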
ShortCircuitReplica.java:

@@ -243,18 +243,22 @@ public class ShortCircuitReplica {
     String suffix = "";
 
     Preconditions.checkState(refCount == 0,
-        "tried to close replica with refCount " + refCount + ": " + this);
+        "tried to close replica with refCount %d: %s", refCount, this);
     refCount = -1;
     Preconditions.checkState(purged,
-        "tried to close unpurged replica " + this);
+        "tried to close unpurged replica %s", this);
     if (hasMmap()) {
       munmap();
-      suffix += " munmapped.";
+      if (LOG.isTraceEnabled()) {
+        suffix += " munmapped.";
+      }
     }
     IOUtils.cleanup(LOG, dataStream, metaStream);
     if (slot != null) {
       cache.scheduleSlotReleaser(slot);
-      suffix += " scheduling " + slot + " for later release.";
+      if (LOG.isTraceEnabled()) {
+        suffix += " scheduling " + slot + " for later release.";
+      }
     }
     if (LOG.isTraceEnabled()) {
       LOG.trace("closed " + this + suffix);