From 220d0647b827bf64c40d2ab65705ead05de46159 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 28 Nov 2017 09:48:49 +0000 Subject: [PATCH] [ML] Specify ML_ORIGIN when batch scrolling results (elastic/x-pack-elasticsearch#3125) This change applies the same pattern that was applied in elastic/x-pack-elasticsearch#3054 to the ML batched results iterators, which are used to scroll through ML results during some internal ML implementation details, such as renormalization and nightly maintenance. Additionally the thread context is reset before submitting the results processor to a thread pool, to avoid masking the problem in situations where the user opening the job coincidentally had workable permissions. Fixes elastic/machine-learning-cpp#438 Original commit: elastic/x-pack-elasticsearch@bd1e2dc7d4741bc61e2e9aeadad44cf9764f9976 --- .../xpack/ml/job/persistence/JobProvider.java | 7 ++++--- .../job/process/autodetect/AutodetectProcessManager.java | 2 +- .../process/autodetect/AutodetectProcessManagerTests.java | 3 +++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 90ac992eb96..e51b118598d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -95,6 +95,7 @@ import java.util.function.Consumer; import java.util.function.Supplier; import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.ClientHelper.clientWithOrigin; import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.ClientHelper.stashWithOrigin; @@ -515,7 +516,7 @@ public class JobProvider { * @return a bucket {@link BatchedResultsIterator} */ public BatchedResultsIterator newBatchedBucketsIterator(String jobId) { - return new BatchedBucketsIterator(client, jobId); + return new BatchedBucketsIterator(clientWithOrigin(client, ML_ORIGIN), jobId); } /** @@ -527,7 +528,7 @@ public class JobProvider { * @return a record {@link BatchedResultsIterator} */ public BatchedResultsIterator newBatchedRecordsIterator(String jobId) { - return new BatchedRecordsIterator(client, jobId); + return new BatchedRecordsIterator(clientWithOrigin(client, ML_ORIGIN), jobId); } /** @@ -702,7 +703,7 @@ public class JobProvider { * @return an influencer {@link BatchedResultsIterator} */ public BatchedResultsIterator newBatchedInfluencersIterator(String jobId) { - return new BatchedInfluencersIterator(client, jobId); + return new BatchedInfluencersIterator(clientWithOrigin(client, ML_ORIGIN), jobId); } /** diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 8f2ebfe9b24..e4867d80a06 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -383,7 +383,7 @@ public class AutodetectProcessManager extends AbstractComponent { client, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(), autodetectParams.modelSnapshot() != null); ExecutorService autodetectWorkerExecutor; - try { + try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); autoDetectExecutorService.submit(() -> processor.process(process)); } catch (EsRejectedExecutionException e) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index daac4fdc703..0c24be1310b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -537,6 +537,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { public void testCreate_notEnoughThreads() throws IOException { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); ExecutorService executorService = mock(ExecutorService.class); doThrow(new EsRejectedExecutionException("")).when(executorService).submit(any(Runnable.class)); when(threadPool.executor(anyString())).thenReturn(executorService); @@ -610,6 +611,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private AutodetectProcessManager createNonSpyManager(String jobId) { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); ExecutorService executorService = mock(ExecutorService.class); when(threadPool.executor(anyString())).thenReturn(executorService); when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mock(ThreadPool.Cancellable.class)); @@ -639,6 +641,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private AutodetectProcessManager createManager(AutodetectCommunicator communicator, Client client) { ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService()); AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class); AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client,