Closing the input stream happens on a separate thread now that the `CacheFile` is implemented in a lock-free fashion. Closes #64215
This commit is contained in:
parent
a2b18e9ab9
commit
fada4a1c78
|
@ -45,6 +45,7 @@ import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
|
|
||||||
import static java.util.Collections.singletonList;
|
import static java.util.Collections.singletonList;
|
||||||
|
@ -152,6 +153,14 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
|
||||||
((CountingBlobContainer) blobContainer).totalBytes.sum(),
|
((CountingBlobContainer) blobContainer).totalBytes.sum(),
|
||||||
equalTo((long) input.length)
|
equalTo((long) input.length)
|
||||||
);
|
);
|
||||||
|
// busy assert that closing of all streams happened because they are closed on background fetcher threads
|
||||||
|
assertBusy(
|
||||||
|
() -> assertEquals(
|
||||||
|
"All open streams should have been closed",
|
||||||
|
0,
|
||||||
|
((CountingBlobContainer) blobContainer).openStreams.get()
|
||||||
|
)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -269,6 +278,8 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
|
||||||
private final LongAdder totalBytes = new LongAdder();
|
private final LongAdder totalBytes = new LongAdder();
|
||||||
private final LongAdder totalOpens = new LongAdder();
|
private final LongAdder totalOpens = new LongAdder();
|
||||||
|
|
||||||
|
private final AtomicInteger openStreams = new AtomicInteger(0);
|
||||||
|
|
||||||
private final int rangeSize;
|
private final int rangeSize;
|
||||||
|
|
||||||
CountingBlobContainer(BlobContainer in, int rangeSize) {
|
CountingBlobContainer(BlobContainer in, int rangeSize) {
|
||||||
|
@ -301,7 +312,6 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
|
||||||
|
|
||||||
private final CountingBlobContainer container;
|
private final CountingBlobContainer container;
|
||||||
|
|
||||||
private long bytesRead = 0L;
|
|
||||||
private long position = 0L;
|
private long position = 0L;
|
||||||
private long start = Long.MAX_VALUE;
|
private long start = Long.MAX_VALUE;
|
||||||
private long end = Long.MIN_VALUE;
|
private long end = Long.MIN_VALUE;
|
||||||
|
@ -310,6 +320,7 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
|
||||||
super(input);
|
super(input);
|
||||||
this.container = Objects.requireNonNull(container);
|
this.container = Objects.requireNonNull(container);
|
||||||
this.container.totalOpens.increment();
|
this.container.totalOpens.increment();
|
||||||
|
this.container.openStreams.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -322,7 +333,7 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
|
||||||
if (result == -1) {
|
if (result == -1) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
bytesRead += 1L;
|
this.container.totalBytes.increment();
|
||||||
position += 1L;
|
position += 1L;
|
||||||
|
|
||||||
if (position > end) {
|
if (position > end) {
|
||||||
|
@ -338,7 +349,7 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
final int result = in.read(b, offset, len);
|
final int result = in.read(b, offset, len);
|
||||||
bytesRead += len;
|
this.container.totalBytes.add(len);
|
||||||
position += len;
|
position += len;
|
||||||
|
|
||||||
if (position > end) {
|
if (position > end) {
|
||||||
|
@ -349,8 +360,8 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
in.close();
|
super.close();
|
||||||
this.container.totalBytes.add(bytesRead);
|
this.container.openStreams.decrementAndGet();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue