From f6d8f92d401c0ad0ac81ccd0ecdf165188fff8df Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 30 Mar 2017 14:47:06 +0200 Subject: [PATCH] [ML] Use volatile boolean to stop blackhole process Original commit: elastic/x-pack-elasticsearch@3788c7f0cf19ac11377142b4db76cafc6a00bec3 --- .../BlackHoleAutodetectProcess.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index 5001bba25b5..446f44795f7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -16,8 +16,9 @@ import java.io.IOException; import java.time.ZonedDateTime; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; /** * A placeholder class simulating the actions of the native Autodetect process. @@ -29,11 +30,10 @@ import java.util.concurrent.BlockingQueue; public class BlackHoleAutodetectProcess implements AutodetectProcess { private static final String FLUSH_ID = "flush-1"; - private static final AutodetectResult EMPTY = new AutodetectResult(null, null, null, null, null, null, null, null, null); private final ZonedDateTime startTime; - - private final BlockingQueue results = new ArrayBlockingQueue<>(128); + private final BlockingQueue results = new LinkedBlockingDeque<>(); + private volatile boolean open = true; public BlackHoleAutodetectProcess() { startTime = ZonedDateTime.now(); @@ -74,12 +74,12 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { @Override public void close() throws IOException { - results.add(EMPTY); + open = false; } @Override public Iterator readAutodetectResults() { - // Create a custom iterator here, because ArrayBlockingQueue iterator and stream are not blocking when empty: + // Create a custom iterator here, because LinkedBlockingDeque iterator and stream are not blocking when empty: return new Iterator() { AutodetectResult result; @@ -87,8 +87,13 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { @Override public boolean hasNext() { try { - result = results.take(); - return result != EMPTY; + while (open) { + result = results.poll(100, TimeUnit.MILLISECONDS); + if (result != null) { + break; + } + } + return open; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false;