Let close autodetect wait for other operations to complete.

Original commit: elastic/x-pack-elasticsearch@de517f4fba
This commit is contained in:
Martijn van Groningen 2016-11-29 15:03:02 +01:00
parent f88216eaa5
commit fb2bd73bc1
3 changed files with 47 additions and 33 deletions

View File

@ -32,8 +32,9 @@ import java.io.InputStream;
import java.time.Duration; import java.time.Duration;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier; import java.util.function.Supplier;
public class AutodetectCommunicator implements Closeable { public class AutodetectCommunicator implements Closeable {
@ -47,7 +48,7 @@ public class AutodetectCommunicator implements Closeable {
private final DataToProcessWriter autoDetectWriter; private final DataToProcessWriter autoDetectWriter;
private final AutoDetectResultProcessor autoDetectResultProcessor; private final AutoDetectResultProcessor autoDetectResultProcessor;
final AtomicBoolean inUse = new AtomicBoolean(false); final AtomicReference<CountDownLatch> inUse = new AtomicReference<>();
public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process, StatusReporter statusReporter, public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process, StatusReporter statusReporter,
AutoDetectResultProcessor autoDetectResultProcessor, StateProcessor stateProcessor) { AutoDetectResultProcessor autoDetectResultProcessor, StateProcessor stateProcessor) {
@ -81,7 +82,7 @@ public class AutodetectCommunicator implements Closeable {
DataCounts results = autoDetectWriter.write(countingStream); DataCounts results = autoDetectWriter.write(countingStream);
autoDetectWriter.flush(); autoDetectWriter.flush();
return results; return results;
}); }, false);
} }
@Override @Override
@ -91,14 +92,14 @@ public class AutodetectCommunicator implements Closeable {
autodetectProcess.close(); autodetectProcess.close();
autoDetectResultProcessor.awaitCompletion(); autoDetectResultProcessor.awaitCompletion();
return null; return null;
}); }, true);
} }
public void writeUpdateConfigMessage(String config) throws IOException { public void writeUpdateConfigMessage(String config) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, jobId), () -> { checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, jobId), () -> {
autodetectProcess.writeUpdateConfigMessage(config); autodetectProcess.writeUpdateConfigMessage(config);
return null; return null;
}); }, false);
} }
public void flushJob(InterimResultsParams params) throws IOException { public void flushJob(InterimResultsParams params) throws IOException {
@ -110,6 +111,7 @@ public class AutodetectCommunicator implements Closeable {
String flushId = autodetectProcess.flushJob(params); String flushId = autodetectProcess.flushJob(params);
Duration timeout = Duration.ofSeconds(tryTimeoutSecs); Duration timeout = Duration.ofSeconds(tryTimeoutSecs);
LOGGER.info("[{}] waiting for flush", jobId);
boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, timeout); boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, timeout);
LOGGER.info("[{}] isFlushComplete={}", jobId, isFlushComplete); LOGGER.info("[{}] isFlushComplete={}", jobId, isFlushComplete);
if (!isFlushComplete) { if (!isFlushComplete) {
@ -122,7 +124,7 @@ public class AutodetectCommunicator implements Closeable {
// clients from querying results in the middle of normalisation. // clients from querying results in the middle of normalisation.
autoDetectResultProcessor.waitUntilRenormaliserIsIdle(); autoDetectResultProcessor.waitUntilRenormaliserIsIdle();
return null; return null;
}); }, false);
} }
/** /**
@ -149,18 +151,34 @@ public class AutodetectCommunicator implements Closeable {
return Optional.ofNullable(statusReporter.runningTotalStats()); return Optional.ofNullable(statusReporter.runningTotalStats());
} }
private <T> T checkAndRun(Supplier<String> errorMessage, Callback<T> callback) throws IOException { private <T> T checkAndRun(Supplier<String> errorMessage, Callback<T> callback, boolean wait) throws IOException {
if (inUse.compareAndSet(false, true)) { CountDownLatch latch = new CountDownLatch(1);
if (inUse.compareAndSet(null, latch)) {
try { try {
checkProcessIsAlive(); checkProcessIsAlive();
return callback.run(); return callback.run();
} finally { } finally {
inUse.set(false); latch.countDown();
inUse.set(null);
} }
} else { } else {
if (wait) {
latch = inUse.get();
if (latch != null) {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS); throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS);
} }
} }
checkProcessIsAlive();
return callback.run();
} else {
throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS);
}
}
}
private interface Callback<T> { private interface Callback<T> {

View File

@ -72,7 +72,6 @@ public class ScheduledJobIT extends ESRestTestCase {
waitForSchedulerStoppedState(client(), jobId); waitForSchedulerStoppedState(client(), jobId);
} }
@AwaitsFix(bugUrl = "mvg fix this")
public void testStartJobScheduler_GivenRealtime() throws Exception { public void testStartJobScheduler_GivenRealtime() throws Exception {
String jobId = "_id3"; String jobId = "_id3";
createAirlineDataIndex(); createAirlineDataIndex();
@ -86,8 +85,10 @@ public class ScheduledJobIT extends ESRestTestCase {
assertBusy(() -> { assertBusy(() -> {
try { try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId, Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId,
Collections.singletonMap("metric", "status")); Collections.singletonMap("metric", "status,data_counts"));
assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"RUNNING\"")); String responseAsString = responseEntityToString(getJobResponse);
assertThat(responseAsString, containsString("\"status\":\"RUNNING\""));
assertThat(responseAsString, containsString("\"input_record_count\":2"));
} catch (Exception e1) { } catch (Exception e1) {
throw new RuntimeException(e1); throw new RuntimeException(e1);
} }
@ -105,16 +106,6 @@ public class ScheduledJobIT extends ESRestTestCase {
waitForSchedulerStoppedState(client(), jobId); waitForSchedulerStoppedState(client(), jobId);
client().performRequest("POST", "/_xpack/prelert/data/" + jobId + "/_close"); client().performRequest("POST", "/_xpack/prelert/data/" + jobId + "/_close");
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId,
Collections.singletonMap("metric", "status"));
assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"CLOSED\""));
} catch (Exception e1) {
throw new RuntimeException(e1);
}
});
response = client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/" + jobId); response = client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/" + jobId);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}")); assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));

View File

@ -24,6 +24,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.doAnswer;
@ -31,6 +32,8 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class AutodetectCommunicatorTests extends ESTestCase { public class AutodetectCommunicatorTests extends ESTestCase {
@ -139,10 +142,10 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class)); AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class));
communicator.inUse.set(true); communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class, () -> communicator.writeToJob(in, mock(DataLoadParams.class))); expectThrows(ElasticsearchStatusException.class, () -> communicator.writeToJob(in, mock(DataLoadParams.class)));
communicator.inUse.set(false); communicator.inUse.set(null);
communicator.writeToJob(in, mock(DataLoadParams.class)); communicator.writeToJob(in, mock(DataLoadParams.class));
} }
@ -152,24 +155,26 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true); when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
communicator.inUse.set(true); communicator.inUse.set(new CountDownLatch(1));
InterimResultsParams params = mock(InterimResultsParams.class); InterimResultsParams params = mock(InterimResultsParams.class);
expectThrows(ElasticsearchStatusException.class, () -> communicator.flushJob(params)); expectThrows(ElasticsearchStatusException.class, () -> communicator.flushJob(params));
communicator.inUse.set(false); communicator.inUse.set(null);
communicator.flushJob(params); communicator.flushJob(params);
} }
public void testCloseInUse() throws IOException { public void testCloseInUse() throws Exception {
AutodetectProcess process = mockAutodetectProcessWithOutputStream(); AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class); AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class);
when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true); when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
communicator.inUse.set(true); CountDownLatch latch = mock(CountDownLatch.class);
expectThrows(ElasticsearchStatusException.class, communicator::close); communicator.inUse.set(latch);
communicator.close();
verify(latch, times(1)).await();
communicator.inUse.set(false); communicator.inUse.set(null);
communicator.close(); communicator.close();
} }
@ -179,10 +184,10 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true); when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor); AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
communicator.inUse.set(true); communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateConfigMessage("")); expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateConfigMessage(""));
communicator.inUse.set(false); communicator.inUse.set(null);
communicator.writeUpdateConfigMessage(""); communicator.writeUpdateConfigMessage("");
} }