Draft change

This commit is contained in:
Sneha Vijayarajan 2022-11-14 05:32:19 -08:00
parent a48e8c9beb
commit e46985a188
3 changed files with 26 additions and 3 deletions

View File

@ -94,6 +94,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private long fCursorAfterLastRead = -1; private long fCursorAfterLastRead = -1;
private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
// of valid bytes in buffer) // of valid bytes in buffer)
private boolean closed = false; private boolean closed = false;
private TracingContext tracingContext; private TracingContext tracingContext;
@ -745,6 +746,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
return buffer; return buffer;
} }
public boolean isClosed() { return closed; }
@VisibleForTesting @VisibleForTesting
public boolean isReadAheadEnabled() { public boolean isReadAheadEnabled() {
return readAheadEnabled; return readAheadEnabled;

View File

@ -247,7 +247,7 @@ final class ReadBufferManager {
// first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
for (ReadBuffer buf : completedReadList) { for (ReadBuffer buf : completedReadList) {
if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { if (buf.getStream().isClosed() || (buf.isFirstByteConsumed() && buf.isLastByteConsumed())) {
nodeToEvict = buf; nodeToEvict = buf;
break; break;
} }
@ -544,7 +544,6 @@ final class ReadBufferManager {
LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
purgeList(stream, completedReadList); purgeList(stream, completedReadList);
purgeList(stream, inProgressList);
} }
/** /**

View File

@ -62,6 +62,11 @@ class ReadBufferWorker implements Runnable {
} }
if (buffer != null) { if (buffer != null) {
try { try {
// Stop network call if stream is closed
if (postFailureWhenStreamClosed(bufferManager, buffer)) {
continue;
}
// do the actual read, from the file. // do the actual read, from the file.
int bytesRead = buffer.getStream().readRemote( int bytesRead = buffer.getStream().readRemote(
buffer.getOffset(), buffer.getOffset(),
@ -73,7 +78,11 @@ class ReadBufferWorker implements Runnable {
Math.min(buffer.getRequestedLength(), buffer.getBuffer().length), Math.min(buffer.getRequestedLength(), buffer.getBuffer().length),
buffer.getTracingContext()); buffer.getTracingContext());
bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager // Update failure to completed list if stream is closed
if (!postFailureWhenStreamClosed(bufferManager, buffer)) {
bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE,
bytesRead); // post result back to ReadBufferManager
}
} catch (IOException ex) { } catch (IOException ex) {
buffer.setErrException(ex); buffer.setErrException(ex);
bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
@ -84,4 +93,16 @@ class ReadBufferWorker implements Runnable {
} }
} }
} }
private boolean postFailureWhenStreamClosed(ReadBufferManager bufferManager,
ReadBuffer buffer) {
// When stream is closed report failure to be picked by eviction
if (buffer.getStream().isClosed()) {
// Fail read
bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
return true;
}
return false;
}
} }