[ML] Use volatile boolean to stop blackhole process

Original commit: elastic/x-pack-elasticsearch@3788c7f0cf
This commit is contained in:
Martijn van Groningen 2017-03-30 14:47:06 +02:00
parent 5b2351fad0
commit f6d8f92d40
1 changed files with 13 additions and 8 deletions

View File

@ -16,8 +16,9 @@ import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/**
* A placeholder class simulating the actions of the native Autodetect process.
@ -29,11 +30,10 @@ import java.util.concurrent.BlockingQueue;
public class BlackHoleAutodetectProcess implements AutodetectProcess {
private static final String FLUSH_ID = "flush-1";
private static final AutodetectResult EMPTY = new AutodetectResult(null, null, null, null, null, null, null, null, null);
private final ZonedDateTime startTime;
private final BlockingQueue<AutodetectResult> results = new ArrayBlockingQueue<>(128);
private final BlockingQueue<AutodetectResult> results = new LinkedBlockingDeque<>();
private volatile boolean open = true;
public BlackHoleAutodetectProcess() {
startTime = ZonedDateTime.now();
@ -74,12 +74,12 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
@Override
public void close() throws IOException {
results.add(EMPTY);
open = false;
}
@Override
public Iterator<AutodetectResult> readAutodetectResults() {
// Create a custom iterator here, because ArrayBlockingQueue iterator and stream are not blocking when empty:
// Create a custom iterator here, because LinkedBlockingDeque iterator and stream are not blocking when empty:
return new Iterator<AutodetectResult>() {
AutodetectResult result;
@ -87,8 +87,13 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
@Override
public boolean hasNext() {
try {
result = results.take();
return result != EMPTY;
while (open) {
result = results.poll(100, TimeUnit.MILLISECONDS);
if (result != null) {
break;
}
}
return open;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;