diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 90ac992eb96..e51b118598d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -95,6 +95,7 @@ import java.util.function.Consumer; import java.util.function.Supplier; import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.ClientHelper.clientWithOrigin; import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.ClientHelper.stashWithOrigin; @@ -515,7 +516,7 @@ public class JobProvider { * @return a bucket {@link BatchedResultsIterator} */ public BatchedResultsIterator newBatchedBucketsIterator(String jobId) { - return new BatchedBucketsIterator(client, jobId); + return new BatchedBucketsIterator(clientWithOrigin(client, ML_ORIGIN), jobId); } /** @@ -527,7 +528,7 @@ public class JobProvider { * @return a record {@link BatchedResultsIterator} */ public BatchedResultsIterator newBatchedRecordsIterator(String jobId) { - return new BatchedRecordsIterator(client, jobId); + return new BatchedRecordsIterator(clientWithOrigin(client, ML_ORIGIN), jobId); } /** @@ -702,7 +703,7 @@ public class JobProvider { * @return an influencer {@link BatchedResultsIterator} */ public BatchedResultsIterator newBatchedInfluencersIterator(String jobId) { - return new BatchedInfluencersIterator(client, jobId); + return new BatchedInfluencersIterator(clientWithOrigin(client, ML_ORIGIN), jobId); } /** diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 8f2ebfe9b24..e4867d80a06 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -383,7 +383,7 @@ public class AutodetectProcessManager extends AbstractComponent { client, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(), autodetectParams.modelSnapshot() != null); ExecutorService autodetectWorkerExecutor; - try { + try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); autoDetectExecutorService.submit(() -> processor.process(process)); } catch (EsRejectedExecutionException e) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index daac4fdc703..0c24be1310b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -537,6 +537,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { public void testCreate_notEnoughThreads() throws IOException { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); ExecutorService executorService = mock(ExecutorService.class); doThrow(new EsRejectedExecutionException("")).when(executorService).submit(any(Runnable.class)); when(threadPool.executor(anyString())).thenReturn(executorService); @@ -610,6 +611,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private AutodetectProcessManager createNonSpyManager(String jobId) { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); ExecutorService executorService = mock(ExecutorService.class); when(threadPool.executor(anyString())).thenReturn(executorService); when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mock(ThreadPool.Cancellable.class)); @@ -639,6 +641,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private AutodetectProcessManager createManager(AutodetectCommunicator communicator, Client client) { ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService()); AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class); AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client,