[ML] Specify ML_ORIGIN when batch scrolling results (elastic/x-pack-elasticsearch#3125)
This change applies the same pattern that was applied in elastic/x-pack-elasticsearch#3054 to the ML batched results iterators, which are used to scroll through ML results during some internal ML implementation details, such as renormalization and nightly maintenance. Additionally the thread context is reset before submitting the results processor to a thread pool, to avoid masking the problem in situations where the user opening the job coincidentally had workable permissions. Fixes elastic/machine-learning-cpp#438 Original commit: elastic/x-pack-elasticsearch@bd1e2dc7d4
This commit is contained in:
parent
9f59ef6697
commit
220d0647b8
|
@ -95,6 +95,7 @@ import java.util.function.Consumer;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN;
|
import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN;
|
||||||
|
import static org.elasticsearch.xpack.ClientHelper.clientWithOrigin;
|
||||||
import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin;
|
import static org.elasticsearch.xpack.ClientHelper.executeAsyncWithOrigin;
|
||||||
import static org.elasticsearch.xpack.ClientHelper.stashWithOrigin;
|
import static org.elasticsearch.xpack.ClientHelper.stashWithOrigin;
|
||||||
|
|
||||||
|
@ -515,7 +516,7 @@ public class JobProvider {
|
||||||
* @return a bucket {@link BatchedResultsIterator}
|
* @return a bucket {@link BatchedResultsIterator}
|
||||||
*/
|
*/
|
||||||
public BatchedResultsIterator<Bucket> newBatchedBucketsIterator(String jobId) {
|
public BatchedResultsIterator<Bucket> newBatchedBucketsIterator(String jobId) {
|
||||||
return new BatchedBucketsIterator(client, jobId);
|
return new BatchedBucketsIterator(clientWithOrigin(client, ML_ORIGIN), jobId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -527,7 +528,7 @@ public class JobProvider {
|
||||||
* @return a record {@link BatchedResultsIterator}
|
* @return a record {@link BatchedResultsIterator}
|
||||||
*/
|
*/
|
||||||
public BatchedResultsIterator<AnomalyRecord> newBatchedRecordsIterator(String jobId) {
|
public BatchedResultsIterator<AnomalyRecord> newBatchedRecordsIterator(String jobId) {
|
||||||
return new BatchedRecordsIterator(client, jobId);
|
return new BatchedRecordsIterator(clientWithOrigin(client, ML_ORIGIN), jobId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -702,7 +703,7 @@ public class JobProvider {
|
||||||
* @return an influencer {@link BatchedResultsIterator}
|
* @return an influencer {@link BatchedResultsIterator}
|
||||||
*/
|
*/
|
||||||
public BatchedResultsIterator<Influencer> newBatchedInfluencersIterator(String jobId) {
|
public BatchedResultsIterator<Influencer> newBatchedInfluencersIterator(String jobId) {
|
||||||
return new BatchedInfluencersIterator(client, jobId);
|
return new BatchedInfluencersIterator(clientWithOrigin(client, ML_ORIGIN), jobId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -383,7 +383,7 @@ public class AutodetectProcessManager extends AbstractComponent {
|
||||||
client, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(),
|
client, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(),
|
||||||
autodetectParams.modelSnapshot() != null);
|
autodetectParams.modelSnapshot() != null);
|
||||||
ExecutorService autodetectWorkerExecutor;
|
ExecutorService autodetectWorkerExecutor;
|
||||||
try {
|
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
|
||||||
autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService);
|
autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService);
|
||||||
autoDetectExecutorService.submit(() -> processor.process(process));
|
autoDetectExecutorService.submit(() -> processor.process(process));
|
||||||
} catch (EsRejectedExecutionException e) {
|
} catch (EsRejectedExecutionException e) {
|
||||||
|
|
|
@ -537,6 +537,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
||||||
public void testCreate_notEnoughThreads() throws IOException {
|
public void testCreate_notEnoughThreads() throws IOException {
|
||||||
Client client = mock(Client.class);
|
Client client = mock(Client.class);
|
||||||
ThreadPool threadPool = mock(ThreadPool.class);
|
ThreadPool threadPool = mock(ThreadPool.class);
|
||||||
|
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
|
||||||
ExecutorService executorService = mock(ExecutorService.class);
|
ExecutorService executorService = mock(ExecutorService.class);
|
||||||
doThrow(new EsRejectedExecutionException("")).when(executorService).submit(any(Runnable.class));
|
doThrow(new EsRejectedExecutionException("")).when(executorService).submit(any(Runnable.class));
|
||||||
when(threadPool.executor(anyString())).thenReturn(executorService);
|
when(threadPool.executor(anyString())).thenReturn(executorService);
|
||||||
|
@ -610,6 +611,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
||||||
private AutodetectProcessManager createNonSpyManager(String jobId) {
|
private AutodetectProcessManager createNonSpyManager(String jobId) {
|
||||||
Client client = mock(Client.class);
|
Client client = mock(Client.class);
|
||||||
ThreadPool threadPool = mock(ThreadPool.class);
|
ThreadPool threadPool = mock(ThreadPool.class);
|
||||||
|
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
|
||||||
ExecutorService executorService = mock(ExecutorService.class);
|
ExecutorService executorService = mock(ExecutorService.class);
|
||||||
when(threadPool.executor(anyString())).thenReturn(executorService);
|
when(threadPool.executor(anyString())).thenReturn(executorService);
|
||||||
when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mock(ThreadPool.Cancellable.class));
|
when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mock(ThreadPool.Cancellable.class));
|
||||||
|
@ -639,6 +641,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
||||||
|
|
||||||
private AutodetectProcessManager createManager(AutodetectCommunicator communicator, Client client) {
|
private AutodetectProcessManager createManager(AutodetectCommunicator communicator, Client client) {
|
||||||
ThreadPool threadPool = mock(ThreadPool.class);
|
ThreadPool threadPool = mock(ThreadPool.class);
|
||||||
|
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
|
||||||
when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService());
|
when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService());
|
||||||
AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class);
|
AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class);
|
||||||
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client,
|
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client,
|
||||||
|
|
Loading…
Reference in New Issue