diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 8f12484a55c..254df42584c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -94,6 +94,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private long fCursorAfterLastRead = -1; private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 + // of valid bytes in buffer) private boolean closed = false; private TracingContext tracingContext; @@ -745,6 +746,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, return buffer; } + public boolean isClosed() { return closed; } + @VisibleForTesting public boolean isReadAheadEnabled() { return readAheadEnabled; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 317aaf545a1..970dbb180ec 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -247,7 +247,7 @@ final class ReadBufferManager { // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) for (ReadBuffer buf : completedReadList) { - if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { + if (buf.getStream().isClosed() || (buf.isFirstByteConsumed() && buf.isLastByteConsumed())) { nodeToEvict = buf; break; } @@ -544,7 +544,6 @@ final class ReadBufferManager { LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); purgeList(stream, completedReadList); - purgeList(stream, inProgressList); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java index a30f06261ef..96f084531bc 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java @@ -62,6 +62,11 @@ class ReadBufferWorker implements Runnable { } if (buffer != null) { try { + // Stop network call if stream is closed + if (postFailureWhenStreamClosed(bufferManager, buffer)) { + continue; + } + // do the actual read, from the file. int bytesRead = buffer.getStream().readRemote( buffer.getOffset(), @@ -73,7 +78,11 @@ class ReadBufferWorker implements Runnable { Math.min(buffer.getRequestedLength(), buffer.getBuffer().length), buffer.getTracingContext()); - bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager + // Update failure to completed list if stream is closed + if (!postFailureWhenStreamClosed(bufferManager, buffer)) { + bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, + bytesRead); // post result back to ReadBufferManager + } } catch (IOException ex) { buffer.setErrException(ex); bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); @@ -84,4 +93,16 @@ class ReadBufferWorker implements Runnable { } } } + + private boolean postFailureWhenStreamClosed(ReadBufferManager bufferManager, + ReadBuffer buffer) { + + // When stream is closed report failure to be picked by eviction + if (buffer.getStream().isClosed()) { + // Fail read + bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); + return true; + } + return false; + } }