Revert "Don’t wait on job close (elastic/x-pack-elasticsearch#574)"

This reverts commit elastic/x-pack-elasticsearch@99ed0e0dba and fixes the failing tests

Original commit: elastic/x-pack-elasticsearch@403e38316d
This commit is contained in:
David Kyle 2017-02-17 16:53:58 +00:00
parent 3fd1c90707
commit b9a4a2c621
3 changed files with 44 additions and 22 deletions

View File

@ -35,7 +35,8 @@ import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -51,7 +52,7 @@ public class AutodetectCommunicator implements Closeable {
private final AutoDetectResultProcessor autoDetectResultProcessor;
private final Consumer<Exception> handler;
final AtomicBoolean inUse = new AtomicBoolean(false);
final AtomicReference<CountDownLatch> inUse = new AtomicReference<>();
public AutodetectCommunicator(long taskId, Job job, AutodetectProcess process, DataCountsReporter dataCountsReporter,
AutoDetectResultProcessor autoDetectResultProcessor, Consumer<Exception> handler) {
@ -83,7 +84,7 @@ public class AutodetectCommunicator implements Closeable {
DataCounts results = autoDetectWriter.write(countingStream);
autoDetectWriter.flush();
return results;
});
}, false);
}
@Override
@ -98,21 +99,22 @@ public class AutodetectCommunicator implements Closeable {
autoDetectResultProcessor.awaitCompletion();
handler.accept(errorReason != null ? new ElasticsearchException(errorReason) : null);
return null;
});
}, true);
}
public void writeUpdateModelDebugMessage(ModelDebugConfig config) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> {
autodetectProcess.writeUpdateModelDebugMessage(config);
return null;
});
}, false);
}
public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, job.getId()), () -> {
autodetectProcess.writeUpdateDetectorRulesMessage(detectorIndex, rules);
return null;
});
}, false);
}
public void flushJob(InterimResultsParams params) throws IOException {
@ -120,7 +122,7 @@ public class AutodetectCommunicator implements Closeable {
String flushId = autodetectProcess.flushJob(params);
waitFlushToCompletion(flushId);
return null;
});
}, false);
}
private void waitFlushToCompletion(String flushId) throws IOException {
@ -171,16 +173,32 @@ public class AutodetectCommunicator implements Closeable {
return taskId;
}
private <T> T checkAndRun(Supplier<String> errorMessage, CheckedSupplier<T, IOException> callback) throws IOException {
if (inUse.compareAndSet(false, true)) {
private <T> T checkAndRun(Supplier<String> errorMessage, CheckedSupplier<T, IOException> callback, boolean wait) throws IOException {
CountDownLatch latch = new CountDownLatch(1);
if (inUse.compareAndSet(null, latch)) {
try {
checkProcessIsAlive();
return callback.get();
} finally {
inUse.set(false);
latch.countDown();
inUse.set(null);
}
} else {
throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS);
if (wait) {
latch = inUse.get();
if (latch != null) {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS);
}
}
checkProcessIsAlive();
return callback.get();
} else {
throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS);
}
}
}

View File

@ -28,6 +28,7 @@ import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
@ -155,11 +156,11 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class));
communicator.inUse.set(true);
communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class,
() -> communicator.writeToJob(in, mock(DataLoadParams.class)));
communicator.inUse.set(false);
communicator.inUse.set(null);
communicator.writeToJob(in, new DataLoadParams(TimeRange.builder().build(), Optional.empty()));
}
@ -169,11 +170,11 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
communicator.inUse.set(true);
communicator.inUse.set(new CountDownLatch(1));
InterimResultsParams params = mock(InterimResultsParams.class);
expectThrows(ElasticsearchStatusException.class, () -> communicator.flushJob(params));
communicator.inUse.set(false);
communicator.inUse.set(null);
communicator.flushJob(params);
}
@ -183,10 +184,12 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
communicator.inUse.set(true);
expectThrows(ElasticsearchStatusException.class, () -> communicator.close());
CountDownLatch latch = mock(CountDownLatch.class);
communicator.inUse.set(latch);
communicator.close();
verify(latch, times(1)).await();
communicator.inUse.set(false);
communicator.inUse.set(null);
communicator.close();
}
@ -195,10 +198,10 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
communicator.inUse.set(true);
communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateModelDebugMessage(mock(ModelDebugConfig.class)));
communicator.inUse.set(false);
communicator.inUse.set(null);
communicator.writeUpdateModelDebugMessage(mock(ModelDebugConfig.class));
}
@ -208,10 +211,10 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
List<DetectionRule> rules = Collections.singletonList(mock(DetectionRule.class));
communicator.inUse.set(true);
communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateDetectorRulesMessage(0, rules));
communicator.inUse.set(false);
communicator.inUse.set(null);
communicator.writeUpdateDetectorRulesMessage(0, rules);
}
}

View File

@ -75,6 +75,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
assertEquals(200, response.getStatusLine().getStatusCode());
}
@AwaitsFix(bugUrl = "https://github.com/elastic/x-pack-elasticsearch/issues/592")
public void testMiniFarequoteWithDatafeeder() throws Exception {
String mappings = "{"
+ " \"mappings\": {"