Wait upto 30 seconds for flush to complete Instead of waiting 5 times 6 seconds.

Original commit: elastic/x-pack-elasticsearch@4766e1e903
This commit is contained in:
Martijn van Groningen 2016-11-25 07:24:59 +01:00
parent 2ef240c20b
commit 8b1b035965
2 changed files with 10 additions and 32 deletions

View File

@ -39,9 +39,7 @@ import java.util.function.Supplier;
public class AutodetectCommunicator implements Closeable {
private static final Logger LOGGER = Loggers.getLogger(AutodetectCommunicator.class);
private static final int DEFAULT_TRY_COUNT = 5;
private static final int DEFAULT_TRY_TIMEOUT_SECS = 6;
private static final int DEFAULT_TRY_TIMEOUT_SECS = 30;
private final String jobId;
private final StatusReporter statusReporter;
@ -103,30 +101,16 @@ public class AutodetectCommunicator implements Closeable {
}
public void flushJob(InterimResultsParams params) throws IOException {
flushJob(params, DEFAULT_TRY_COUNT, DEFAULT_TRY_TIMEOUT_SECS);
flushJob(params, DEFAULT_TRY_TIMEOUT_SECS);
}
void flushJob(InterimResultsParams params, int tryCount, int tryTimeoutSecs) throws IOException {
checkAndRun(false, () -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, jobId), () -> {
int tryCountCounter = tryCount;
void flushJob(InterimResultsParams params, int tryTimeoutSecs) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, jobId), () -> {
String flushId = autodetectProcess.flushJob(params);
// TODO: norelease: I think waiting once 30 seconds will have the same effect as 5 * 6 seconds.
// So we may want to remove this retry logic here
Duration intermittentTimeout = Duration.ofSeconds(tryTimeoutSecs);
boolean isFlushComplete = false;
while (isFlushComplete == false && --tryCountCounter >= 0) {
// Check there wasn't an error in the flush
if (!autodetectProcess.isProcessAlive()) {
String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_UNEXPTECTED_DEATH, jobId) +
" " + autodetectProcess.readError();
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg);
}
isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, intermittentTimeout);
LOGGER.info("[{}] isFlushComplete={}", jobId, isFlushComplete);
}
Duration timeout = Duration.ofSeconds(tryTimeoutSecs);
boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, timeout);
LOGGER.info("[{}] isFlushComplete={}", jobId, isFlushComplete);
if (!isFlushComplete) {
String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_TIMEOUT, jobId) + " " + autodetectProcess.readError();
LOGGER.error(msg);
@ -165,15 +149,9 @@ public class AutodetectCommunicator implements Closeable {
}
private <T> T checkAndRun(Supplier<String> errorMessage, Callback<T> callback) throws IOException {
return checkAndRun(true, errorMessage, callback);
}
private <T> T checkAndRun(boolean checkIsAlive, Supplier<String> errorMessage, Callback<T> callback) throws IOException {
if (inUse.compareAndSet(false, true)) {
try {
if (checkIsAlive) {
checkProcessIsAlive();
}
checkProcessIsAlive();
return callback.run();
} finally {
inUse.set(false);

View File

@ -72,7 +72,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class));
InterimResultsParams params = InterimResultsParams.builder().build();
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> communicator.flushJob(params));
assertEquals("[foo] Flush failed: Unexpected death of the Autodetect process flushing job. Mock process is dead", e.getMessage());
assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", e.getMessage());
}
public void testFlushJob_throwsOnTimeout() throws IOException {
@ -83,7 +83,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(autoDetectResultProcessor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(false);
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
InterimResultsParams params = InterimResultsParams.builder().build();
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> communicator.flushJob(params, 1, 1));
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> communicator.flushJob(params, 1));
assertEquals("[foo] Timed out flushing job. Mock process has stalled", e.getMessage());
}
}