Ignore rejected execution exception

This commit is contained in:
Martijn van Groningen 2016-01-07 14:52:15 +01:00
parent ba97b7f7bd
commit fa9aab91eb
1 changed files with 29 additions and 20 deletions

View File

@ -42,6 +42,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.RejectedExecutionException;
/**
* Instantiates and wires all the services that the ingest plugin will be needing.
@ -197,30 +198,38 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
}
void startPipelineStore(MetaData metaData) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
try {
// Before we start the pipeline store we check if the index template exists,
// if it doesn't we add it. If for some reason this fails we will try again later,
// but the pipeline store won't start before that happened
if (isIngestTemplateInstallationRequired(metaData)) {
installIngestIndexTemplate();
try {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
try {
// Before we start the pipeline store we check if the index template exists,
// if it doesn't we add it. If for some reason this fails we will try again later,
// but the pipeline store won't start before that happened
if (isIngestTemplateInstallationRequired(metaData)) {
installIngestIndexTemplate();
}
pipelineStore.start();
} catch (Exception e) {
logger.warn("pipeline store failed to start, retrying...", e);
startPipelineStore(metaData);
}
pipelineStore.start();
} catch (Exception e) {
logger.warn("pipeline store failed to start, retrying...", e);
startPipelineStore(metaData);
}
});
});
} catch (RejectedExecutionException e) {
logger.debug("async pipeline store start failed", e);
}
}
void stopPipelineStore(String reason) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
try {
pipelineStore.stop(reason);
} catch (Exception e) {
logger.error("pipeline store stop failure", e);
}
});
try {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
try {
pipelineStore.stop(reason);
} catch (Exception e) {
logger.error("pipeline store stop failure", e);
}
});
} catch (RejectedExecutionException e) {
logger.debug("async pipeline store stop failed", e);
}
}
}