From 9fb790dcc362a70ec0896313658bac1fcead2334 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 1 Aug 2018 10:38:04 -0500 Subject: [PATCH] [ML] Fix thread leak when waiting for job flush (#32196) (#32541) --- .../autodetect/AutodetectCommunicator.java | 2 +- .../output/AutoDetectResultProcessor.java | 2 +- .../process/autodetect/output/FlushListener.java | 10 +++------- .../autodetect/AutodetectCommunicatorTests.java | 6 +++--- .../output/AutoDetectResultProcessorTests.java | 2 +- .../autodetect/output/FlushListenerTests.java | 16 ++++++++++++---- 6 files changed, 21 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 0885a8f9d64..0206bd88245 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -270,7 +270,7 @@ public class AutodetectCommunicator implements Closeable { } @Nullable - FlushAcknowledgement waitFlushToCompletion(String flushId) { + FlushAcknowledgement waitFlushToCompletion(String flushId) throws InterruptedException { LOGGER.debug("[{}] waiting for flush", job.getId()); FlushAcknowledgement flushAcknowledgement; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index da5e70112f0..023493c1d41 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -485,7 +485,7 @@ public class AutoDetectResultProcessor { * @return The {@link FlushAcknowledgement} if the flush has completed or the parsing finished; {@code null} if the timeout expired */ @Nullable - public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) { + public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) throws InterruptedException { return failed ? null : flushListener.waitForFlush(flushId, timeout); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java index 66b43b5e36f..1340556fbcd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java @@ -22,18 +22,14 @@ class FlushListener { final AtomicBoolean cleared = new AtomicBoolean(false); @Nullable - FlushAcknowledgement waitForFlush(String flushId, Duration timeout) { + FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws InterruptedException { if (cleared.get()) { return null; } FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId)); - try { - if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { - return holder.flushAcknowledgement; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + return holder.flushAcknowledgement; } return null; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index ab24aadb9dc..fda96ca29a6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -108,7 +108,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { verifyNoMoreInteractions(process); } - public void testFlushJob() throws IOException { + public void testFlushJob() throws IOException, InterruptedException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class); @@ -123,7 +123,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { } } - public void testWaitForFlushReturnsIfParserFails() throws IOException { + public void testWaitForFlushReturnsIfParserFails() throws IOException, InterruptedException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class); @@ -144,7 +144,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", holder[0].getMessage()); } - public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException { + public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException, InterruptedException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 8eb0317ba0d..af05a05cfb7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -514,7 +514,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE)); } - public void testParsingErrorSetsFailed() { + public void testParsingErrorSetsFailed() throws InterruptedException { @SuppressWarnings("unchecked") Iterator iterator = mock(Iterator.class); when(iterator.hasNext()).thenThrow(new ElasticsearchParseException("this test throws")); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java index 70ba757f148..3bcedb52392 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java @@ -22,8 +22,12 @@ public class FlushListenerTests extends ESTestCase { FlushListener listener = new FlushListener(); AtomicReference flushAcknowledgementHolder = new AtomicReference<>(); new Thread(() -> { - FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000)); - flushAcknowledgementHolder.set(flushAcknowledgement); + try { + FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000)); + flushAcknowledgementHolder.set(flushAcknowledgement); + } catch (InterruptedException _ex) { + Thread.currentThread().interrupt(); + } }).start(); assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id"))); assertNull(flushAcknowledgementHolder.get()); @@ -46,8 +50,12 @@ public class FlushListenerTests extends ESTestCase { AtomicReference flushAcknowledgementHolder = new AtomicReference<>(); flushAcknowledgementHolders.add(flushAcknowledgementHolder); new Thread(() -> { - FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000)); - flushAcknowledgementHolder.set(flushAcknowledgement); + try { + FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000)); + flushAcknowledgementHolder.set(flushAcknowledgement); + } catch (InterruptedException _ex) { + Thread.currentThread().interrupt(); + } }).start(); } assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size()));