diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java index 9a8f2c08984..49f278dccde 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java @@ -45,6 +45,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.HashSet; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import static java.util.Collections.singletonList; @@ -152,6 +153,14 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase { ((CountingBlobContainer) blobContainer).totalBytes.sum(), equalTo((long) input.length) ); + // busy assert that closing of all streams happened because they are closed on background fetcher threads + assertBusy( + () -> assertEquals( + "All open streams should have been closed", + 0, + ((CountingBlobContainer) blobContainer).openStreams.get() + ) + ); } } } finally { @@ -269,6 +278,8 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase { private final LongAdder totalBytes = new LongAdder(); private final LongAdder totalOpens = new LongAdder(); + private final AtomicInteger openStreams = new AtomicInteger(0); + private final int rangeSize; CountingBlobContainer(BlobContainer in, int rangeSize) { @@ -301,7 +312,6 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase { private final CountingBlobContainer container; - private long bytesRead = 0L; private long position = 0L; private long start = Long.MAX_VALUE; private long end = Long.MIN_VALUE; @@ -310,6 +320,7 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase { super(input); this.container = Objects.requireNonNull(container); this.container.totalOpens.increment(); + this.container.openStreams.incrementAndGet(); } @Override @@ -322,7 +333,7 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase { if (result == -1) { return result; } - bytesRead += 1L; + this.container.totalBytes.increment(); position += 1L; if (position > end) { @@ -338,7 +349,7 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase { } final int result = in.read(b, offset, len); - bytesRead += len; + this.container.totalBytes.add(len); position += len; if (position > end) { @@ -349,8 +360,8 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase { @Override public void close() throws IOException { - in.close(); - this.container.totalBytes.add(bytesRead); + super.close(); + this.container.openStreams.decrementAndGet(); } } }