diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java index dbd0ead64cb..e58615f9f30 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java @@ -8,39 +8,77 @@ package org.elasticsearch.xpack.core.common.notifications; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias; +import org.elasticsearch.xpack.core.template.IndexTemplateConfig; import java.io.IOException; import java.util.Date; import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; public abstract class AbstractAuditor { private static final Logger logger = LogManager.getLogger(AbstractAuditor.class); - private final Client client; + static final int MAX_BUFFER_SIZE = 1000; + + private final OriginSettingClient client; private final String nodeName; private final String auditIndex; - private final String executionOrigin; + private final String templateName; + private final Supplier templateSupplier; private final AbstractAuditMessageFactory messageFactory; + private final AtomicBoolean hasLatestTemplate; - protected AbstractAuditor(Client client, - String nodeName, + private Queue backlog; + private final ClusterService clusterService; + private final AtomicBoolean putTemplateInProgress; + + + protected AbstractAuditor(OriginSettingClient client, String auditIndex, - String executionOrigin, - AbstractAuditMessageFactory messageFactory) { + IndexTemplateConfig templateConfig, + String nodeName, + AbstractAuditMessageFactory messageFactory, + ClusterService clusterService) { + + this(client, auditIndex, templateConfig.getTemplateName(), + () -> new PutIndexTemplateRequest(templateConfig.getTemplateName()).source(templateConfig.loadBytes(), XContentType.JSON), + nodeName, messageFactory, clusterService); + } + + + protected AbstractAuditor(OriginSettingClient client, + String auditIndex, + String templateName, + Supplier templateSupplier, + String nodeName, + AbstractAuditMessageFactory messageFactory, + ClusterService clusterService) { this.client = Objects.requireNonNull(client); - this.nodeName = Objects.requireNonNull(nodeName); - this.auditIndex = auditIndex; - this.executionOrigin = executionOrigin; + this.auditIndex = Objects.requireNonNull(auditIndex); + this.templateName = Objects.requireNonNull(templateName); + this.templateSupplier = Objects.requireNonNull(templateSupplier); this.messageFactory = Objects.requireNonNull(messageFactory); + this.clusterService = Objects.requireNonNull(clusterService); + this.nodeName = Objects.requireNonNull(nodeName); + this.backlog = new ConcurrentLinkedQueue<>(); + this.hasLatestTemplate = new AtomicBoolean(); + this.putTemplateInProgress = new AtomicBoolean(); } public void info(String resourceId, String message) { @@ -64,16 +102,74 @@ public abstract class AbstractAuditor { } private void indexDoc(ToXContent toXContent) { + if (hasLatestTemplate.get()) { + writeDoc(toXContent); + return; + } + + if (MlIndexAndAlias.hasIndexTemplate(clusterService.state(), templateName)) { + synchronized (this) { + // synchronized so nothing can be added to backlog while this value changes + hasLatestTemplate.set(true); + } + writeDoc(toXContent); + return; + } + + ActionListener putTemplateListener = ActionListener.wrap( + r -> { + synchronized (this) { + // synchronized so nothing can be added to backlog while this value changes + hasLatestTemplate.set(true); + } + logger.info("Auditor template [{}] successfully installed", templateName); + writeBacklog(); + putTemplateInProgress.set(false); + }, + e -> { + logger.warn("Error putting latest template [{}]", templateName); + putTemplateInProgress.set(false); + } + ); + + synchronized (this) { + if (hasLatestTemplate.get() == false) { + // synchronized so that hasLatestTemplate does not change value + // between the read and adding to the backlog + assert backlog != null; + if (backlog != null) { + if (backlog.size() >= MAX_BUFFER_SIZE) { + backlog.remove(); + } + backlog.add(toXContent); + } else { + logger.error("Latest audit template missing but the back log has been written"); + } + + // stop multiple invocations + if (putTemplateInProgress.compareAndSet(false, true)) { + MlIndexAndAlias.installIndexTemplateIfRequired(clusterService.state(), client, templateSupplier.get(), + putTemplateListener); + } + return; + } + } + + indexDoc(toXContent); + } + + private void writeDoc(ToXContent toXContent) { + client.index(indexRequest(toXContent), ActionListener.wrap( + this::onIndexResponse, + this::onIndexFailure + )); + } + + private IndexRequest indexRequest(ToXContent toXContent) { IndexRequest indexRequest = new IndexRequest(auditIndex); indexRequest.source(toXContentBuilder(toXContent)); indexRequest.timeout(TimeValue.timeValueSeconds(5)); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), - executionOrigin, - indexRequest, - ActionListener.wrap( - this::onIndexResponse, - this::onIndexFailure - ), client::index); + return indexRequest; } private XContentBuilder toXContentBuilder(ToXContent toXContent) { @@ -83,4 +179,36 @@ public abstract class AbstractAuditor { throw new RuntimeException(e); } } + + private void writeBacklog() { + assert backlog != null; + if (backlog == null) { + logger.error("Message back log has already been written"); + return; + } + + BulkRequest bulkRequest = new BulkRequest(); + ToXContent doc = backlog.poll(); + while (doc != null) { + bulkRequest.add(indexRequest(doc)); + doc = backlog.poll(); + } + + client.bulk(bulkRequest, ActionListener.wrap( + bulkItemResponses -> { + if (bulkItemResponses.hasFailures()) { + logger.warn("Failures bulk indexing the message back log: {}", bulkItemResponses.buildFailureMessage()); + } else { + logger.trace("Successfully wrote audit message backlog after upgrading template"); + } + backlog = null; + }, + this::onIndexFailure + )); + } + + // for testing + int backLogSize() { + return backlog.size(); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index 00f68246e4e..f0ef26ac606 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -275,7 +275,35 @@ public final class MlIndexAndAlias { PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName) .source(templateConfig.loadBytes(), XContentType.JSON); - request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); + + installIndexTemplateIfRequired(clusterState, client, request, listener); + } + + /** + * See {@link #installIndexTemplateIfRequired(ClusterState, Client, IndexTemplateConfig, ActionListener)}. + * + * Overload takes a {@code PutIndexTemplateRequest} instead of {@code IndexTemplateConfig} + * + * @param clusterState The cluster state + * @param client For putting the template + * @param templateRequest The Put template request + * @param listener Async listener + */ + public static void installIndexTemplateIfRequired( + ClusterState clusterState, + Client client, + PutIndexTemplateRequest templateRequest, + ActionListener listener + ) { + String templateName = templateRequest.name(); + + // The check for existence of the template is against the cluster state, so very cheap + if (hasIndexTemplate(clusterState, templateRequest.name())) { + listener.onResponse(true); + return; + } + + templateRequest.masterNodeTimeout(TimeValue.timeValueMinutes(1)); ActionListener innerListener = ActionListener.wrap( response -> { @@ -286,11 +314,11 @@ public final class MlIndexAndAlias { }, listener::onFailure); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request, innerListener, + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, templateRequest, innerListener, client.admin().indices()::putTemplate); } - private static boolean hasIndexTemplate(ClusterState state, String templateName) { + public static boolean hasIndexTemplate(ClusterState state, String templateName) { return state.getMetadata().getTemplates().containsKey(templateName); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java index e81e2cbfc1d..95af6287d68 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java @@ -5,23 +5,45 @@ */ package org.elasticsearch.xpack.core.common.notifications; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.template.IndexTemplateConfig; +import org.junit.After; import org.junit.Before; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.arrayContaining; @@ -29,7 +51,11 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -43,23 +69,33 @@ public class AbstractAuditorTests extends ESTestCase { private ArgumentCaptor indexRequestCaptor; private long startMillis; + private ThreadPool threadPool; + @Before public void setUpMocks() { client = mock(Client.class); - ThreadPool threadPool = mock(ThreadPool.class); - when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + ThreadPool mockPool = mock(ThreadPool.class); + when(client.settings()).thenReturn(Settings.EMPTY); + when(client.threadPool()).thenReturn(mockPool); + when(mockPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class); - startMillis = System.currentTimeMillis(); + + threadPool = new TestThreadPool(getClass().getName()); + } + + @After + public void shutdownThreadPool() { + threadPool.generic().shutdownNow(); + terminate(threadPool); } public void testInfo() throws IOException { - AbstractAuditor auditor = new TestAuditor(client); + AbstractAuditor auditor = createTestAuditorWithTemplateInstalled(client); auditor.info("foo", "Here is my info"); - verify(client).index(indexRequestCaptor.capture(), any()); + verify(client).execute(eq(IndexAction.INSTANCE), indexRequestCaptor.capture(), any()); IndexRequest indexRequest = indexRequestCaptor.getValue(); assertThat(indexRequest.indices(), arrayContaining(TEST_INDEX)); assertThat(indexRequest.timeout(), equalTo(TimeValue.timeValueSeconds(5))); @@ -73,10 +109,10 @@ public class AbstractAuditorTests extends ESTestCase { } public void testWarning() throws IOException { - AbstractAuditor auditor = new TestAuditor(client); + AbstractAuditor auditor = createTestAuditorWithTemplateInstalled(client); auditor.warning("bar", "Here is my warning"); - verify(client).index(indexRequestCaptor.capture(), any()); + verify(client).execute(eq(IndexAction.INSTANCE), indexRequestCaptor.capture(), any()); IndexRequest indexRequest = indexRequestCaptor.getValue(); assertThat(indexRequest.indices(), arrayContaining(TEST_INDEX)); assertThat(indexRequest.timeout(), equalTo(TimeValue.timeValueSeconds(5))); @@ -90,10 +126,10 @@ public class AbstractAuditorTests extends ESTestCase { } public void testError() throws IOException { - AbstractAuditor auditor = new TestAuditor(client); + AbstractAuditor auditor = createTestAuditorWithTemplateInstalled(client); auditor.error("foobar", "Here is my error"); - verify(client).index(indexRequestCaptor.capture(), any()); + verify(client).execute(eq(IndexAction.INSTANCE), indexRequestCaptor.capture(), any()); IndexRequest indexRequest = indexRequestCaptor.getValue(); assertThat(indexRequest.indices(), arrayContaining(TEST_INDEX)); assertThat(indexRequest.timeout(), equalTo(TimeValue.timeValueSeconds(5))); @@ -106,16 +142,122 @@ public class AbstractAuditorTests extends ESTestCase { assertThat(auditMessage.getNodeName(), equalTo(TEST_NODE_NAME)); } + public void testAuditingBeforeTemplateInstalled() throws Exception { + CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1); + AbstractAuditor auditor = + createTestAuditorWithoutTemplate(client, writeSomeDocsBeforeTemplateLatch); + + auditor.error("foobar", "Here is my error to queue"); + auditor.warning("foobar", "Here is my warning to queue"); + auditor.info("foobar", "Here is my info to queue"); + + verify(client, never()).execute(eq(IndexAction.INSTANCE), any(), any()); + // fire the put template response + writeSomeDocsBeforeTemplateLatch.countDown(); + + // the back log will be written some point later + ArgumentCaptor bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class); + assertBusy(() -> + verify(client, times(1)).execute(eq(BulkAction.INSTANCE), bulkCaptor.capture(), any()) + ); + + BulkRequest bulkRequest = bulkCaptor.getValue(); + assertThat(bulkRequest.numberOfActions(), equalTo(3)); + + auditor.info("foobar", "Here is another message"); + verify(client, times(1)).execute(eq(IndexAction.INSTANCE), any(), any()); + } + + public void testMaxBufferSize() throws Exception { + CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1); + AbstractAuditor auditor = + createTestAuditorWithoutTemplate(client, writeSomeDocsBeforeTemplateLatch); + + int numThreads = 2; + int numMessagesToWrite = (AbstractAuditor.MAX_BUFFER_SIZE / numThreads) + 10; + Runnable messageWrites = () -> { + for (int i=0; i future1 = threadPool.generic().submit(messageWrites); + Future future2 = threadPool.generic().submit(messageWrites); + future1.get(); + future2.get(); + + assertThat(auditor.backLogSize(), equalTo(AbstractAuditor.MAX_BUFFER_SIZE)); + } + private static AbstractAuditMessageTests.TestAuditMessage parseAuditMessage(BytesReference msg) throws IOException { - XContentParser parser = XContentFactory.xContent(XContentHelper.xContentType(msg)) + XContentParser parser = XContentFactory.xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, msg.streamInput()); return AbstractAuditMessageTests.TestAuditMessage.PARSER.apply(parser, null); } - private static class TestAuditor extends AbstractAuditor { + private TestAuditor createTestAuditorWithTemplateInstalled(Client client) { + ImmutableOpenMap.Builder templates = ImmutableOpenMap.builder(1); + templates.put(TEST_INDEX, mock(IndexTemplateMetadata.class)); + Metadata metadata = mock(Metadata.class); + when(metadata.getTemplates()).thenReturn(templates.build()); + ClusterState state = mock(ClusterState.class); + when(state.getMetadata()).thenReturn(metadata); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(state); - TestAuditor(Client client) { - super(client, TEST_NODE_NAME, TEST_INDEX, TEST_ORIGIN, AbstractAuditMessageTests.TestAuditMessage::new); + return new TestAuditor(client, TEST_NODE_NAME, clusterService); + } + + @SuppressWarnings("unchecked") + private TestAuditor createTestAuditorWithoutTemplate(Client client, CountDownLatch latch) { + if (Mockito.mockingDetails(client).isMock() == false) { + throw new AssertionError("client should be a mock"); + } + + doAnswer(invocationOnMock -> { + ActionListener listener = + (ActionListener)invocationOnMock.getArguments()[2]; + + Runnable onPutTemplate = () -> { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + listener.onFailure(e); + return; + } + listener.onResponse(new AcknowledgedResponse(true)); + }; + + threadPool.generic().submit(onPutTemplate); + + return null; + }).when(client).execute(eq(PutIndexTemplateAction.INSTANCE), any(), any()); + + IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + AdminClient adminClient = mock(AdminClient.class); + when(adminClient.indices()).thenReturn(indicesAdminClient); + when(client.admin()).thenReturn(adminClient); + + ImmutableOpenMap.Builder templates = ImmutableOpenMap.builder(0); + Metadata metadata = mock(Metadata.class); + when(metadata.getTemplates()).thenReturn(templates.build()); + ClusterState state = mock(ClusterState.class); + when(state.getMetadata()).thenReturn(metadata); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(state); + + return new TestAuditor(client, TEST_NODE_NAME, clusterService); + } + + public static class TestAuditor extends AbstractAuditor { + + TestAuditor(Client client, String nodeName, ClusterService clusterService) { + super(new OriginSettingClient(client, TEST_ORIGIN), TEST_INDEX, + new IndexTemplateConfig(TEST_INDEX, + "/org/elasticsearch/xpack/core/ml/notifications_index_template.json", Version.CURRENT.id, "xpack.ml.version", + Collections.singletonMap("xpack.ml.version.id", String.valueOf(Version.CURRENT.id))), + nodeName, AbstractAuditMessageTests.TestAuditMessage::new, clusterService); } } } diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java index 317369be9da..265e5e6bdb9 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.integration; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.core.XPackSettings; @@ -46,8 +47,7 @@ public class AnnotationIndexIT extends MlSingleNodeTestCase { } public void testCreatedWhenAfterOtherMlIndex() throws Exception { - - AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client(), "node_1"); + AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client(), getInstanceFromNode(ClusterService.class)); auditor.info("whatever", "blah"); // Creating a document in the .ml-notifications-000001 index should cause .ml-annotations diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index ab2bef15b2c..3716dc5cb56 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -135,7 +135,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { public void createComponents() throws Exception { Settings.Builder builder = Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1)); - AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client(), "test_node"); + AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client(), getInstanceFromNode(ClusterService.class)); jobResultsProvider = new JobResultsProvider(client(), builder.build(), new IndexNameExpressionResolver()); renormalizer = mock(Renormalizer.class); process = mock(AutodetectProcess.class); @@ -159,7 +159,8 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { auditor, JOB_ID, renormalizer, - new JobResultsPersister(originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node")), + new JobResultsPersister(originSettingClient, resultsPersisterService, + new AnomalyDetectionAuditor(client(), getInstanceFromNode(ClusterService.class))), new AnnotationPersister(resultsPersisterService, auditor), process, new ModelSizeStats.Builder(JOB_ID).build(), diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedModelPersisterIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedModelPersisterIT.java index 34912b981b0..9945a416b87 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedModelPersisterIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedModelPersisterIT.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.license.License; @@ -79,7 +80,7 @@ public class ChunkedTrainedModelPersisterIT extends MlSingleNodeTestCase { ChunkedTrainedModelPersister persister = new ChunkedTrainedModelPersister(trainedModelProvider, analyticsConfig, - new DataFrameAnalyticsAuditor(client(), "test-node"), + new DataFrameAnalyticsAuditor(client(), getInstanceFromNode(ClusterService.class)), (ex) -> { throw new ElasticsearchException(ex); }, new ExtractedFields(extractedFieldList, Collections.emptyList(), Collections.emptyMap()) ); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsConfigProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsConfigProviderIT.java index dc0d390760b..8c6eee19051 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsConfigProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsConfigProviderIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -54,7 +55,7 @@ public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase { @Before public void createComponents() throws Exception { configProvider = new DataFrameAnalyticsConfigProvider(client(), xContentRegistry(), - new DataFrameAnalyticsAuditor(client(), node().getNodeEnvironment().nodeId())); + new DataFrameAnalyticsAuditor(client(), getInstanceFromNode(ClusterService.class))); waitForMlTemplates(); } diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java index 6807439464e..d4b674d5b50 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java @@ -63,7 +63,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings); jobResultsProvider = new JobResultsProvider(client(), settings, new IndexNameExpressionResolver()); jobResultsPersister = new JobResultsPersister( - originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node")); + originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), clusterService)); } public void testEstablishedMem_givenNoResults() throws Exception { diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index 4c6ad04426e..2a9b493afa0 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -127,7 +127,7 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase { OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN); resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, builder.build()); - auditor = new AnomalyDetectionAuditor(client(), "test_node"); + auditor = new AnomalyDetectionAuditor(client(), clusterService); waitForMlTemplates(); } diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java index 9d209902ab3..3054e60d2f4 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java @@ -78,7 +78,7 @@ public class JobStorageDeletionTaskIT extends BaseMlIntegTestCase { ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings); jobResultsProvider = new JobResultsProvider(client(), settings, new IndexNameExpressionResolver()); jobResultsPersister = new JobResultsPersister( - originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node")); + originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), clusterService)); } public void testUnrelatedIndexNotTouched() throws Exception { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 3b7b0198de4..8a5c771c4f1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -573,9 +573,9 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, new MlIndexTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry); - AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); - DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName()); - InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService.getNodeName()); + AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService); + DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService); + InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService); this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor); OriginSettingClient originSettingClient = new OriginSettingClient(client, ClientHelper.ML_ORIGIN); ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java index 357db4e13f3..5a2ede6c5fc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java @@ -46,7 +46,7 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry { ROOT_RESOURCE_PATH + "meta_index_template.json", Version.CURRENT.id, VERSION_PATTERN, Collections.singletonMap(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id))); - private static final IndexTemplateConfig NOTIFICATIONS_TEMPLATE = new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX, + public static final IndexTemplateConfig NOTIFICATIONS_TEMPLATE = new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX, ROOT_RESOURCE_PATH + "notifications_index_template.json", Version.CURRENT.id, VERSION_PATTERN, Collections.singletonMap(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id))); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index 7f04f31810d..1c67b08c387 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -95,7 +95,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime); - AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); + AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService); if (Strings.isNullOrEmpty(request.getJobId()) || Strings.isAllOrWildcard(new String[]{request.getJobId()})) { List dataRemovers = createDataRemovers(client, taskId, auditor); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessor.java index bceb30462f3..09c13c01425 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessor.java @@ -185,7 +185,7 @@ public class InferenceProcessor extends AbstractProcessor { public Factory(Client client, ClusterService clusterService, Settings settings) { this.client = client; this.maxIngestProcessors = MAX_INFERENCE_PROCESSORS.get(settings); - this.auditor = new InferenceAuditor(client, clusterService.getNodeName()); + this.auditor = new InferenceAuditor(client, clusterService); clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_INFERENCE_PROCESSORS, this::setMaxIngestProcessors); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AnomalyDetectionAuditor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AnomalyDetectionAuditor.java index 6e3af2b8928..cfdb8b929eb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AnomalyDetectionAuditor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AnomalyDetectionAuditor.java @@ -6,15 +6,21 @@ package org.elasticsearch.xpack.ml.notifications; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; import org.elasticsearch.xpack.core.ml.notifications.AnomalyDetectionAuditMessage; +import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; public class AnomalyDetectionAuditor extends AbstractAuditor { - public AnomalyDetectionAuditor(Client client, String nodeName) { - super(client, nodeName, NotificationsIndex.NOTIFICATIONS_INDEX, ML_ORIGIN, AnomalyDetectionAuditMessage::new); + public AnomalyDetectionAuditor(Client client, ClusterService clusterService) { + super(new OriginSettingClient(client, ML_ORIGIN), NotificationsIndex.NOTIFICATIONS_INDEX, + MlIndexTemplateRegistry.NOTIFICATIONS_TEMPLATE, + clusterService.getNodeName(), + AnomalyDetectionAuditMessage::new, clusterService); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/DataFrameAnalyticsAuditor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/DataFrameAnalyticsAuditor.java index 1acccaafcda..8231effc2cb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/DataFrameAnalyticsAuditor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/DataFrameAnalyticsAuditor.java @@ -6,15 +6,21 @@ package org.elasticsearch.xpack.ml.notifications; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; import org.elasticsearch.xpack.core.ml.notifications.DataFrameAnalyticsAuditMessage; +import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; public class DataFrameAnalyticsAuditor extends AbstractAuditor { - public DataFrameAnalyticsAuditor(Client client, String nodeName) { - super(client, nodeName, NotificationsIndex.NOTIFICATIONS_INDEX, ML_ORIGIN, DataFrameAnalyticsAuditMessage::new); + public DataFrameAnalyticsAuditor(Client client, ClusterService clusterService) { + super(new OriginSettingClient(client, ML_ORIGIN), NotificationsIndex.NOTIFICATIONS_INDEX, + MlIndexTemplateRegistry.NOTIFICATIONS_TEMPLATE, + clusterService.getNodeName(), + DataFrameAnalyticsAuditMessage::new, clusterService); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/InferenceAuditor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/InferenceAuditor.java index 2be3e76e85b..f76103631a5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/InferenceAuditor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/InferenceAuditor.java @@ -6,15 +6,19 @@ package org.elasticsearch.xpack.ml.notifications; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; import org.elasticsearch.xpack.core.ml.notifications.InferenceAuditMessage; +import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; public class InferenceAuditor extends AbstractAuditor { - public InferenceAuditor(Client client, String nodeName) { - super(client, nodeName, NotificationsIndex.NOTIFICATIONS_INDEX, ML_ORIGIN, InferenceAuditMessage::new); + public InferenceAuditor(Client client, ClusterService clusterService) { + super(new OriginSettingClient(client, ML_ORIGIN), NotificationsIndex.NOTIFICATIONS_INDEX, + MlIndexTemplateRegistry.NOTIFICATIONS_TEMPLATE, clusterService.getNodeName(), InferenceAuditMessage::new, clusterService); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportForecastJobActionRequestTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportForecastJobActionRequestTests.java index f53134d32bb..ae7fc1f2d68 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportForecastJobActionRequestTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportForecastJobActionRequestTests.java @@ -7,19 +7,17 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; -import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; import org.elasticsearch.xpack.core.ml.action.ForecastJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.notifications.AnomalyDetectionAuditMessage; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import java.util.Collections; import java.util.Date; @@ -66,7 +64,7 @@ public class TransportForecastJobActionRequestTests extends ESTestCase { public void testAdjustLimit() { Job.Builder jobBuilder = createTestJob("forecast-adjust-limit"); - NullAuditor auditor = new NullAuditor(); + AnomalyDetectionAuditor auditor = mock(AnomalyDetectionAuditor.class); { assertThat(TransportForecastJobAction.getAdjustedMemoryLimit(jobBuilder.build(), null, auditor), is(nullValue())); assertThat(TransportForecastJobAction.getAdjustedMemoryLimit( @@ -109,8 +107,6 @@ public class TransportForecastJobActionRequestTests extends ESTestCase { auditor), equalTo(new ByteSizeValue(80, ByteSizeUnit.MB).getBytes() - 1L)); } - - } private Job.Builder createTestJob(String jobId) { @@ -126,23 +122,4 @@ public class TransportForecastJobActionRequestTests extends ESTestCase { jobBuilder.setDataDescription(dataDescription); return jobBuilder; } - - static class NullAuditor extends AbstractAuditor { - - protected NullAuditor() { - super(mock(Client.class), "test", "null", "foo", AnomalyDetectionAuditMessage::new); - } - - @Override - public void info(String resourceId, String message) { - } - - @Override - public void warning(String resourceId, String message) { - } - - @Override - public void error(String resourceId, String message) { - } - } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java index faf07b1ccec..2128cd1907b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java @@ -117,8 +117,8 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase { ExecutorService executorService = EsExecutors.newDirectExecutorService(); when(tp.generic()).thenReturn(executorService); client = mock(Client.class); - clusterService = mock(ClusterService.class); Settings settings = Settings.builder().put("node.name", "InferenceProcessorFactoryTests_node").build(); + when(client.settings()).thenReturn(Settings.EMPTY); ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS, MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java index 33e25f3e2e7..f0ad5d93519 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java @@ -61,6 +61,7 @@ public class InferenceProcessorFactoryTests extends ESTestCase { when(tp.generic()).thenReturn(executorService); client = mock(Client.class); Settings settings = Settings.builder().put("node.name", "InferenceProcessorFactoryTests_node").build(); + when(client.settings()).thenReturn(settings); ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS, MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 820a6ef33d4..ad312134cf1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -296,7 +296,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa } TransformConfigManager configManager = new IndexBasedTransformConfigManager(client, xContentRegistry); - TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName()); + TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName(), clusterService); TransformCheckpointService checkpointService = new TransformCheckpointService(settings, clusterService, configManager, auditor); SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java index 162616feffe..970bda3098f 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java @@ -5,10 +5,21 @@ */ package org.elasticsearch.xpack.transform.notifications; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; import org.elasticsearch.xpack.core.transform.notifications.TransformAuditMessage; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; +import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex; + +import java.io.IOException; import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN; @@ -17,7 +28,40 @@ import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN; */ public class TransformAuditor extends AbstractAuditor { - public TransformAuditor(Client client, String nodeName) { - super(client, nodeName, TransformInternalIndexConstants.AUDIT_INDEX, TRANSFORM_ORIGIN, TransformAuditMessage::new); + public TransformAuditor(Client client, String nodeName, ClusterService clusterService) { + super(new OriginSettingClient(client, TRANSFORM_ORIGIN), TransformInternalIndexConstants.AUDIT_INDEX, + TransformInternalIndexConstants.AUDIT_INDEX, + () -> { + try { + IndexTemplateMetadata templateMeta = TransformInternalIndex.getAuditIndexTemplateMetadata( + clusterService.state().nodes().getMinNodeVersion()); + + PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateMeta.name()) + .patterns(templateMeta.patterns()) + .version(templateMeta.version()) + .settings(templateMeta.settings()) + .mapping(templateMeta.mappings().iterator().next().key, + templateMeta.mappings().iterator().next().value.uncompressed(), XContentType.JSON); + + for (ObjectObjectCursor cursor : templateMeta.getAliases()) { + AliasMetadata meta = cursor.value; + Alias alias = new Alias(meta.alias()) + .indexRouting(meta.indexRouting()) + .searchRouting(meta.searchRouting()) + .isHidden(meta.isHidden()) + .writeIndex(meta.writeIndex()); + if (meta.filter() != null) { + alias.filter(meta.getFilter().string()); + } + + request.alias(alias); + } + + return request; + } catch (IOException e) { + return null; + } + }, + nodeName, TransformAuditMessage::new, clusterService); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java index b7eca071085..abde618b67c 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java @@ -38,10 +38,10 @@ public class DefaultCheckpointProviderTests extends ESTestCase { private Logger checkpointProviderlogger = LogManager.getLogger(DefaultCheckpointProvider.class); @Before - public void setUpMocks() throws IllegalAccessException { + public void setUpMocks() { client = mock(Client.class); transformConfigManager = mock(IndexBasedTransformConfigManager.class); - transformAuditor = new MockTransformAuditor(); + transformAuditor = MockTransformAuditor.createMockAuditor(); } public void testReportSourceIndexChangesRunsEmpty() throws Exception { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java index 7e6a7ba12e7..b72c5d51894 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java @@ -7,8 +7,15 @@ package org.elasticsearch.xpack.transform.notifications; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.core.common.notifications.Level; +import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -16,6 +23,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /* * Test mock auditor to verify audit expectations. @@ -26,10 +34,28 @@ import static org.mockito.Mockito.mock; */ public class MockTransformAuditor extends TransformAuditor { - private List expectations; + private static final String MOCK_NODE_NAME = "mock_node_name"; - public MockTransformAuditor() { - super(mock(Client.class), "mock_node_name"); + @SuppressWarnings("unchecked") + public static MockTransformAuditor createMockAuditor() { + ImmutableOpenMap.Builder templates = ImmutableOpenMap.builder(1); + templates.put(TransformInternalIndexConstants.AUDIT_INDEX, mock(IndexTemplateMetadata.class)); + Metadata metadata = mock(Metadata.class); + when(metadata.getTemplates()).thenReturn(templates.build()); + ClusterState state = mock(ClusterState.class); + when(state.getMetadata()).thenReturn(metadata); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(state); + + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + return new MockTransformAuditor(client, clusterService); + } + + private final List expectations; + + private MockTransformAuditor(Client client, ClusterService clusterService) { + super(client, MOCK_NODE_NAME, clusterService); expectations = new CopyOnWriteArrayList<>(); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 82ac2c9541e..5b978ba1485 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -231,7 +231,7 @@ public class TransformIndexerStateTests extends ESTestCase { @Before public void setUpMocks() { - auditor = new MockTransformAuditor(); + auditor = MockTransformAuditor.createMockAuditor(); transformConfigManager = new InMemoryTransformConfigManager(); client = new NoOpClient(getTestName()); threadPool = new TestThreadPool(ThreadPool.Names.GENERIC); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index c70f73f03a0..8bf19339d09 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -278,7 +278,7 @@ public class TransformIndexerTests extends ESTestCase { Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - TransformAuditor auditor = new TransformAuditor(client, "node_1"); + TransformAuditor auditor = MockTransformAuditor.createMockAuditor(); TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); MockedTransformIndexer indexer = createMockIndexer( @@ -419,7 +419,7 @@ public class TransformIndexerTests extends ESTestCase { failureMessage.compareAndSet(null, message); }; - MockTransformAuditor auditor = new MockTransformAuditor(); + MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor(); TransformContext.Listener contextListener = mock(TransformContext.Listener.class); TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index 0ca0ac62745..de7db226198 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -71,7 +71,7 @@ public class TransformTaskTests extends ESTestCase { when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class)); TransformConfig transformConfig = TransformConfigTests.randomTransformConfigWithoutHeaders(); - TransformAuditor auditor = new MockTransformAuditor(); + TransformAuditor auditor = MockTransformAuditor.createMockAuditor(); TransformConfigManager transformsConfigManager = new InMemoryTransformConfigManager(); TransformCheckpointService transformsCheckpointService = new TransformCheckpointService( Settings.EMPTY, @@ -159,7 +159,7 @@ public class TransformTaskTests extends ESTestCase { when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class)); TransformConfig transformConfig = TransformConfigTests.randomTransformConfigWithoutHeaders(); - TransformAuditor auditor = new MockTransformAuditor(); + TransformAuditor auditor = MockTransformAuditor.createMockAuditor(); TransformState transformState = new TransformState( TransformTaskState.FAILED,