[ML] Auditor ensures template is installed before writes (#63286)

The ML auditors should not write if the latest template is not present. 
Instead a PUT template request is made and the writes queued up
This commit is contained in:
David Kyle 2020-10-06 11:20:37 +01:00 committed by GitHub
parent 7405af8060
commit 8f4ef40f78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 461 additions and 96 deletions

View File

@ -8,39 +8,77 @@ package org.elasticsearch.xpack.core.common.notifications;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
import java.io.IOException;
import java.util.Date;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public abstract class AbstractAuditor<T extends AbstractAuditMessage> {
private static final Logger logger = LogManager.getLogger(AbstractAuditor.class);
private final Client client;
static final int MAX_BUFFER_SIZE = 1000;
private final OriginSettingClient client;
private final String nodeName;
private final String auditIndex;
private final String executionOrigin;
private final String templateName;
private final Supplier<PutIndexTemplateRequest> templateSupplier;
private final AbstractAuditMessageFactory<T> messageFactory;
private final AtomicBoolean hasLatestTemplate;
protected AbstractAuditor(Client client,
String nodeName,
private Queue<ToXContent> backlog;
private final ClusterService clusterService;
private final AtomicBoolean putTemplateInProgress;
protected AbstractAuditor(OriginSettingClient client,
String auditIndex,
String executionOrigin,
AbstractAuditMessageFactory<T> messageFactory) {
IndexTemplateConfig templateConfig,
String nodeName,
AbstractAuditMessageFactory<T> messageFactory,
ClusterService clusterService) {
this(client, auditIndex, templateConfig.getTemplateName(),
() -> new PutIndexTemplateRequest(templateConfig.getTemplateName()).source(templateConfig.loadBytes(), XContentType.JSON),
nodeName, messageFactory, clusterService);
}
protected AbstractAuditor(OriginSettingClient client,
String auditIndex,
String templateName,
Supplier<PutIndexTemplateRequest> templateSupplier,
String nodeName,
AbstractAuditMessageFactory<T> messageFactory,
ClusterService clusterService) {
this.client = Objects.requireNonNull(client);
this.nodeName = Objects.requireNonNull(nodeName);
this.auditIndex = auditIndex;
this.executionOrigin = executionOrigin;
this.auditIndex = Objects.requireNonNull(auditIndex);
this.templateName = Objects.requireNonNull(templateName);
this.templateSupplier = Objects.requireNonNull(templateSupplier);
this.messageFactory = Objects.requireNonNull(messageFactory);
this.clusterService = Objects.requireNonNull(clusterService);
this.nodeName = Objects.requireNonNull(nodeName);
this.backlog = new ConcurrentLinkedQueue<>();
this.hasLatestTemplate = new AtomicBoolean();
this.putTemplateInProgress = new AtomicBoolean();
}
public void info(String resourceId, String message) {
@ -64,16 +102,74 @@ public abstract class AbstractAuditor<T extends AbstractAuditMessage> {
}
private void indexDoc(ToXContent toXContent) {
if (hasLatestTemplate.get()) {
writeDoc(toXContent);
return;
}
if (MlIndexAndAlias.hasIndexTemplate(clusterService.state(), templateName)) {
synchronized (this) {
// synchronized so nothing can be added to backlog while this value changes
hasLatestTemplate.set(true);
}
writeDoc(toXContent);
return;
}
ActionListener<Boolean> putTemplateListener = ActionListener.wrap(
r -> {
synchronized (this) {
// synchronized so nothing can be added to backlog while this value changes
hasLatestTemplate.set(true);
}
logger.info("Auditor template [{}] successfully installed", templateName);
writeBacklog();
putTemplateInProgress.set(false);
},
e -> {
logger.warn("Error putting latest template [{}]", templateName);
putTemplateInProgress.set(false);
}
);
synchronized (this) {
if (hasLatestTemplate.get() == false) {
// synchronized so that hasLatestTemplate does not change value
// between the read and adding to the backlog
assert backlog != null;
if (backlog != null) {
if (backlog.size() >= MAX_BUFFER_SIZE) {
backlog.remove();
}
backlog.add(toXContent);
} else {
logger.error("Latest audit template missing but the back log has been written");
}
// stop multiple invocations
if (putTemplateInProgress.compareAndSet(false, true)) {
MlIndexAndAlias.installIndexTemplateIfRequired(clusterService.state(), client, templateSupplier.get(),
putTemplateListener);
}
return;
}
}
indexDoc(toXContent);
}
private void writeDoc(ToXContent toXContent) {
client.index(indexRequest(toXContent), ActionListener.wrap(
this::onIndexResponse,
this::onIndexFailure
));
}
private IndexRequest indexRequest(ToXContent toXContent) {
IndexRequest indexRequest = new IndexRequest(auditIndex);
indexRequest.source(toXContentBuilder(toXContent));
indexRequest.timeout(TimeValue.timeValueSeconds(5));
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
executionOrigin,
indexRequest,
ActionListener.wrap(
this::onIndexResponse,
this::onIndexFailure
), client::index);
return indexRequest;
}
private XContentBuilder toXContentBuilder(ToXContent toXContent) {
@ -83,4 +179,36 @@ public abstract class AbstractAuditor<T extends AbstractAuditMessage> {
throw new RuntimeException(e);
}
}
private void writeBacklog() {
assert backlog != null;
if (backlog == null) {
logger.error("Message back log has already been written");
return;
}
BulkRequest bulkRequest = new BulkRequest();
ToXContent doc = backlog.poll();
while (doc != null) {
bulkRequest.add(indexRequest(doc));
doc = backlog.poll();
}
client.bulk(bulkRequest, ActionListener.wrap(
bulkItemResponses -> {
if (bulkItemResponses.hasFailures()) {
logger.warn("Failures bulk indexing the message back log: {}", bulkItemResponses.buildFailureMessage());
} else {
logger.trace("Successfully wrote audit message backlog after upgrading template");
}
backlog = null;
},
this::onIndexFailure
));
}
// for testing
int backLogSize() {
return backlog.size();
}
}

View File

@ -275,7 +275,35 @@ public final class MlIndexAndAlias {
PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName)
.source(templateConfig.loadBytes(), XContentType.JSON);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
installIndexTemplateIfRequired(clusterState, client, request, listener);
}
/**
* See {@link #installIndexTemplateIfRequired(ClusterState, Client, IndexTemplateConfig, ActionListener)}.
*
* Overload takes a {@code PutIndexTemplateRequest} instead of {@code IndexTemplateConfig}
*
* @param clusterState The cluster state
* @param client For putting the template
* @param templateRequest The Put template request
* @param listener Async listener
*/
public static void installIndexTemplateIfRequired(
ClusterState clusterState,
Client client,
PutIndexTemplateRequest templateRequest,
ActionListener<Boolean> listener
) {
String templateName = templateRequest.name();
// The check for existence of the template is against the cluster state, so very cheap
if (hasIndexTemplate(clusterState, templateRequest.name())) {
listener.onResponse(true);
return;
}
templateRequest.masterNodeTimeout(TimeValue.timeValueMinutes(1));
ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(
response -> {
@ -286,11 +314,11 @@ public final class MlIndexAndAlias {
},
listener::onFailure);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request, innerListener,
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, templateRequest, innerListener,
client.admin().indices()::putTemplate);
}
private static boolean hasIndexTemplate(ClusterState state, String templateName) {
public static boolean hasIndexTemplate(ClusterState state, String templateName) {
return state.getMetadata().getTemplates().containsKey(templateName);
}
}

View File

@ -5,23 +5,45 @@
*/
package org.elasticsearch.xpack.core.common.notifications;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.arrayContaining;
@ -29,7 +51,11 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -43,23 +69,33 @@ public class AbstractAuditorTests extends ESTestCase {
private ArgumentCaptor<IndexRequest> indexRequestCaptor;
private long startMillis;
private ThreadPool threadPool;
@Before
public void setUpMocks() {
client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
ThreadPool mockPool = mock(ThreadPool.class);
when(client.settings()).thenReturn(Settings.EMPTY);
when(client.threadPool()).thenReturn(mockPool);
when(mockPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
startMillis = System.currentTimeMillis();
threadPool = new TestThreadPool(getClass().getName());
}
@After
public void shutdownThreadPool() {
threadPool.generic().shutdownNow();
terminate(threadPool);
}
public void testInfo() throws IOException {
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new TestAuditor(client);
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = createTestAuditorWithTemplateInstalled(client);
auditor.info("foo", "Here is my info");
verify(client).index(indexRequestCaptor.capture(), any());
verify(client).execute(eq(IndexAction.INSTANCE), indexRequestCaptor.capture(), any());
IndexRequest indexRequest = indexRequestCaptor.getValue();
assertThat(indexRequest.indices(), arrayContaining(TEST_INDEX));
assertThat(indexRequest.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
@ -73,10 +109,10 @@ public class AbstractAuditorTests extends ESTestCase {
}
public void testWarning() throws IOException {
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new TestAuditor(client);
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = createTestAuditorWithTemplateInstalled(client);
auditor.warning("bar", "Here is my warning");
verify(client).index(indexRequestCaptor.capture(), any());
verify(client).execute(eq(IndexAction.INSTANCE), indexRequestCaptor.capture(), any());
IndexRequest indexRequest = indexRequestCaptor.getValue();
assertThat(indexRequest.indices(), arrayContaining(TEST_INDEX));
assertThat(indexRequest.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
@ -90,10 +126,10 @@ public class AbstractAuditorTests extends ESTestCase {
}
public void testError() throws IOException {
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new TestAuditor(client);
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = createTestAuditorWithTemplateInstalled(client);
auditor.error("foobar", "Here is my error");
verify(client).index(indexRequestCaptor.capture(), any());
verify(client).execute(eq(IndexAction.INSTANCE), indexRequestCaptor.capture(), any());
IndexRequest indexRequest = indexRequestCaptor.getValue();
assertThat(indexRequest.indices(), arrayContaining(TEST_INDEX));
assertThat(indexRequest.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
@ -106,16 +142,122 @@ public class AbstractAuditorTests extends ESTestCase {
assertThat(auditMessage.getNodeName(), equalTo(TEST_NODE_NAME));
}
public void testAuditingBeforeTemplateInstalled() throws Exception {
CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1);
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor =
createTestAuditorWithoutTemplate(client, writeSomeDocsBeforeTemplateLatch);
auditor.error("foobar", "Here is my error to queue");
auditor.warning("foobar", "Here is my warning to queue");
auditor.info("foobar", "Here is my info to queue");
verify(client, never()).execute(eq(IndexAction.INSTANCE), any(), any());
// fire the put template response
writeSomeDocsBeforeTemplateLatch.countDown();
// the back log will be written some point later
ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
assertBusy(() ->
verify(client, times(1)).execute(eq(BulkAction.INSTANCE), bulkCaptor.capture(), any())
);
BulkRequest bulkRequest = bulkCaptor.getValue();
assertThat(bulkRequest.numberOfActions(), equalTo(3));
auditor.info("foobar", "Here is another message");
verify(client, times(1)).execute(eq(IndexAction.INSTANCE), any(), any());
}
public void testMaxBufferSize() throws Exception {
CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1);
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor =
createTestAuditorWithoutTemplate(client, writeSomeDocsBeforeTemplateLatch);
int numThreads = 2;
int numMessagesToWrite = (AbstractAuditor.MAX_BUFFER_SIZE / numThreads) + 10;
Runnable messageWrites = () -> {
for (int i=0; i<numMessagesToWrite; i++ ) {
auditor.info("foobar", "filling the buffer");
}
};
Future<?> future1 = threadPool.generic().submit(messageWrites);
Future<?> future2 = threadPool.generic().submit(messageWrites);
future1.get();
future2.get();
assertThat(auditor.backLogSize(), equalTo(AbstractAuditor.MAX_BUFFER_SIZE));
}
private static AbstractAuditMessageTests.TestAuditMessage parseAuditMessage(BytesReference msg) throws IOException {
XContentParser parser = XContentFactory.xContent(XContentHelper.xContentType(msg))
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, msg.streamInput());
return AbstractAuditMessageTests.TestAuditMessage.PARSER.apply(parser, null);
}
private static class TestAuditor extends AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> {
private TestAuditor createTestAuditorWithTemplateInstalled(Client client) {
ImmutableOpenMap.Builder<String, IndexTemplateMetadata> templates = ImmutableOpenMap.builder(1);
templates.put(TEST_INDEX, mock(IndexTemplateMetadata.class));
Metadata metadata = mock(Metadata.class);
when(metadata.getTemplates()).thenReturn(templates.build());
ClusterState state = mock(ClusterState.class);
when(state.getMetadata()).thenReturn(metadata);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(state);
TestAuditor(Client client) {
super(client, TEST_NODE_NAME, TEST_INDEX, TEST_ORIGIN, AbstractAuditMessageTests.TestAuditMessage::new);
return new TestAuditor(client, TEST_NODE_NAME, clusterService);
}
@SuppressWarnings("unchecked")
private TestAuditor createTestAuditorWithoutTemplate(Client client, CountDownLatch latch) {
if (Mockito.mockingDetails(client).isMock() == false) {
throw new AssertionError("client should be a mock");
}
doAnswer(invocationOnMock -> {
ActionListener<AcknowledgedResponse> listener =
(ActionListener<AcknowledgedResponse>)invocationOnMock.getArguments()[2];
Runnable onPutTemplate = () -> {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
listener.onFailure(e);
return;
}
listener.onResponse(new AcknowledgedResponse(true));
};
threadPool.generic().submit(onPutTemplate);
return null;
}).when(client).execute(eq(PutIndexTemplateAction.INSTANCE), any(), any());
IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
AdminClient adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesAdminClient);
when(client.admin()).thenReturn(adminClient);
ImmutableOpenMap.Builder<String, IndexTemplateMetadata> templates = ImmutableOpenMap.builder(0);
Metadata metadata = mock(Metadata.class);
when(metadata.getTemplates()).thenReturn(templates.build());
ClusterState state = mock(ClusterState.class);
when(state.getMetadata()).thenReturn(metadata);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(state);
return new TestAuditor(client, TEST_NODE_NAME, clusterService);
}
public static class TestAuditor extends AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> {
TestAuditor(Client client, String nodeName, ClusterService clusterService) {
super(new OriginSettingClient(client, TEST_ORIGIN), TEST_INDEX,
new IndexTemplateConfig(TEST_INDEX,
"/org/elasticsearch/xpack/core/ml/notifications_index_template.json", Version.CURRENT.id, "xpack.ml.version",
Collections.singletonMap("xpack.ml.version.id", String.valueOf(Version.CURRENT.id))),
nodeName, AbstractAuditMessageTests.TestAuditMessage::new, clusterService);
}
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.integration;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.core.XPackSettings;
@ -46,8 +47,7 @@ public class AnnotationIndexIT extends MlSingleNodeTestCase {
}
public void testCreatedWhenAfterOtherMlIndex() throws Exception {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client(), "node_1");
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client(), getInstanceFromNode(ClusterService.class));
auditor.info("whatever", "blah");
// Creating a document in the .ml-notifications-000001 index should cause .ml-annotations

View File

@ -135,7 +135,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
public void createComponents() throws Exception {
Settings.Builder builder = Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client(), "test_node");
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client(), getInstanceFromNode(ClusterService.class));
jobResultsProvider = new JobResultsProvider(client(), builder.build(), new IndexNameExpressionResolver());
renormalizer = mock(Renormalizer.class);
process = mock(AutodetectProcess.class);
@ -159,7 +159,8 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
auditor,
JOB_ID,
renormalizer,
new JobResultsPersister(originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node")),
new JobResultsPersister(originSettingClient, resultsPersisterService,
new AnomalyDetectionAuditor(client(), getInstanceFromNode(ClusterService.class))),
new AnnotationPersister(resultsPersisterService, auditor),
process,
new ModelSizeStats.Builder(JOB_ID).build(),

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.license.License;
@ -79,7 +80,7 @@ public class ChunkedTrainedModelPersisterIT extends MlSingleNodeTestCase {
ChunkedTrainedModelPersister persister = new ChunkedTrainedModelPersister(trainedModelProvider,
analyticsConfig,
new DataFrameAnalyticsAuditor(client(), "test-node"),
new DataFrameAnalyticsAuditor(client(), getInstanceFromNode(ClusterService.class)),
(ex) -> { throw new ElasticsearchException(ex); },
new ExtractedFields(extractedFieldList, Collections.emptyList(), Collections.emptyMap())
);

View File

@ -12,6 +12,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -54,7 +55,7 @@ public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase {
@Before
public void createComponents() throws Exception {
configProvider = new DataFrameAnalyticsConfigProvider(client(), xContentRegistry(),
new DataFrameAnalyticsAuditor(client(), node().getNodeEnvironment().nodeId()));
new DataFrameAnalyticsAuditor(client(), getInstanceFromNode(ClusterService.class)));
waitForMlTemplates();
}

View File

@ -63,7 +63,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
jobResultsProvider = new JobResultsProvider(client(), settings, new IndexNameExpressionResolver());
jobResultsPersister = new JobResultsPersister(
originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node"));
originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), clusterService));
}
public void testEstablishedMem_givenNoResults() throws Exception {

View File

@ -127,7 +127,7 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, builder.build());
auditor = new AnomalyDetectionAuditor(client(), "test_node");
auditor = new AnomalyDetectionAuditor(client(), clusterService);
waitForMlTemplates();
}

View File

@ -78,7 +78,7 @@ public class JobStorageDeletionTaskIT extends BaseMlIntegTestCase {
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
jobResultsProvider = new JobResultsProvider(client(), settings, new IndexNameExpressionResolver());
jobResultsPersister = new JobResultsPersister(
originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node"));
originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), clusterService));
}
public void testUnrelatedIndexNotTouched() throws Exception {

View File

@ -573,9 +573,9 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
new MlIndexTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry);
AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName());
InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService.getNodeName());
AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService);
DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService);
InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService);
this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);
OriginSettingClient originSettingClient = new OriginSettingClient(client, ClientHelper.ML_ORIGIN);
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);

View File

@ -46,7 +46,7 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry {
ROOT_RESOURCE_PATH + "meta_index_template.json", Version.CURRENT.id, VERSION_PATTERN,
Collections.singletonMap(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id)));
private static final IndexTemplateConfig NOTIFICATIONS_TEMPLATE = new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX,
public static final IndexTemplateConfig NOTIFICATIONS_TEMPLATE = new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX,
ROOT_RESOURCE_PATH + "notifications_index_template.json", Version.CURRENT.id, VERSION_PATTERN,
Collections.singletonMap(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id)));

View File

@ -95,7 +95,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService);
if (Strings.isNullOrEmpty(request.getJobId()) || Strings.isAllOrWildcard(new String[]{request.getJobId()})) {
List<MlDataRemover> dataRemovers = createDataRemovers(client, taskId, auditor);

View File

@ -185,7 +185,7 @@ public class InferenceProcessor extends AbstractProcessor {
public Factory(Client client, ClusterService clusterService, Settings settings) {
this.client = client;
this.maxIngestProcessors = MAX_INFERENCE_PROCESSORS.get(settings);
this.auditor = new InferenceAuditor(client, clusterService.getNodeName());
this.auditor = new InferenceAuditor(client, clusterService);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_INFERENCE_PROCESSORS, this::setMaxIngestProcessors);
}

View File

@ -6,15 +6,21 @@
package org.elasticsearch.xpack.ml.notifications;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
import org.elasticsearch.xpack.core.ml.notifications.AnomalyDetectionAuditMessage;
import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
public class AnomalyDetectionAuditor extends AbstractAuditor<AnomalyDetectionAuditMessage> {
public AnomalyDetectionAuditor(Client client, String nodeName) {
super(client, nodeName, NotificationsIndex.NOTIFICATIONS_INDEX, ML_ORIGIN, AnomalyDetectionAuditMessage::new);
public AnomalyDetectionAuditor(Client client, ClusterService clusterService) {
super(new OriginSettingClient(client, ML_ORIGIN), NotificationsIndex.NOTIFICATIONS_INDEX,
MlIndexTemplateRegistry.NOTIFICATIONS_TEMPLATE,
clusterService.getNodeName(),
AnomalyDetectionAuditMessage::new, clusterService);
}
}

View File

@ -6,15 +6,21 @@
package org.elasticsearch.xpack.ml.notifications;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
import org.elasticsearch.xpack.core.ml.notifications.DataFrameAnalyticsAuditMessage;
import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
public class DataFrameAnalyticsAuditor extends AbstractAuditor<DataFrameAnalyticsAuditMessage> {
public DataFrameAnalyticsAuditor(Client client, String nodeName) {
super(client, nodeName, NotificationsIndex.NOTIFICATIONS_INDEX, ML_ORIGIN, DataFrameAnalyticsAuditMessage::new);
public DataFrameAnalyticsAuditor(Client client, ClusterService clusterService) {
super(new OriginSettingClient(client, ML_ORIGIN), NotificationsIndex.NOTIFICATIONS_INDEX,
MlIndexTemplateRegistry.NOTIFICATIONS_TEMPLATE,
clusterService.getNodeName(),
DataFrameAnalyticsAuditMessage::new, clusterService);
}
}

View File

@ -6,15 +6,19 @@
package org.elasticsearch.xpack.ml.notifications;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
import org.elasticsearch.xpack.core.ml.notifications.InferenceAuditMessage;
import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
public class InferenceAuditor extends AbstractAuditor<InferenceAuditMessage> {
public InferenceAuditor(Client client, String nodeName) {
super(client, nodeName, NotificationsIndex.NOTIFICATIONS_INDEX, ML_ORIGIN, InferenceAuditMessage::new);
public InferenceAuditor(Client client, ClusterService clusterService) {
super(new OriginSettingClient(client, ML_ORIGIN), NotificationsIndex.NOTIFICATIONS_INDEX,
MlIndexTemplateRegistry.NOTIFICATIONS_TEMPLATE, clusterService.getNodeName(), InferenceAuditMessage::new, clusterService);
}
}

View File

@ -7,19 +7,17 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.ml.action.ForecastJobAction;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.notifications.AnomalyDetectionAuditMessage;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import java.util.Collections;
import java.util.Date;
@ -66,7 +64,7 @@ public class TransportForecastJobActionRequestTests extends ESTestCase {
public void testAdjustLimit() {
Job.Builder jobBuilder = createTestJob("forecast-adjust-limit");
NullAuditor auditor = new NullAuditor();
AnomalyDetectionAuditor auditor = mock(AnomalyDetectionAuditor.class);
{
assertThat(TransportForecastJobAction.getAdjustedMemoryLimit(jobBuilder.build(), null, auditor), is(nullValue()));
assertThat(TransportForecastJobAction.getAdjustedMemoryLimit(
@ -109,8 +107,6 @@ public class TransportForecastJobActionRequestTests extends ESTestCase {
auditor),
equalTo(new ByteSizeValue(80, ByteSizeUnit.MB).getBytes() - 1L));
}
}
private Job.Builder createTestJob(String jobId) {
@ -126,23 +122,4 @@ public class TransportForecastJobActionRequestTests extends ESTestCase {
jobBuilder.setDataDescription(dataDescription);
return jobBuilder;
}
static class NullAuditor extends AbstractAuditor<AnomalyDetectionAuditMessage> {
protected NullAuditor() {
super(mock(Client.class), "test", "null", "foo", AnomalyDetectionAuditMessage::new);
}
@Override
public void info(String resourceId, String message) {
}
@Override
public void warning(String resourceId, String message) {
}
@Override
public void error(String resourceId, String message) {
}
}
}

View File

@ -117,8 +117,8 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase {
ExecutorService executorService = EsExecutors.newDirectExecutorService();
when(tp.generic()).thenReturn(executorService);
client = mock(Client.class);
clusterService = mock(ClusterService.class);
Settings settings = Settings.builder().put("node.name", "InferenceProcessorFactoryTests_node").build();
when(client.settings()).thenReturn(Settings.EMPTY);
ClusterSettings clusterSettings = new ClusterSettings(settings,
new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS,
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,

View File

@ -61,6 +61,7 @@ public class InferenceProcessorFactoryTests extends ESTestCase {
when(tp.generic()).thenReturn(executorService);
client = mock(Client.class);
Settings settings = Settings.builder().put("node.name", "InferenceProcessorFactoryTests_node").build();
when(client.settings()).thenReturn(settings);
ClusterSettings clusterSettings = new ClusterSettings(settings,
new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS,
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,

View File

@ -296,7 +296,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
}
TransformConfigManager configManager = new IndexBasedTransformConfigManager(client, xContentRegistry);
TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName());
TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName(), clusterService);
TransformCheckpointService checkpointService = new TransformCheckpointService(settings, clusterService, configManager, auditor);
SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC());

View File

@ -5,10 +5,21 @@
*/
package org.elasticsearch.xpack.transform.notifications;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.transform.notifications.TransformAuditMessage;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
import java.io.IOException;
import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
@ -17,7 +28,40 @@ import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
*/
public class TransformAuditor extends AbstractAuditor<TransformAuditMessage> {
public TransformAuditor(Client client, String nodeName) {
super(client, nodeName, TransformInternalIndexConstants.AUDIT_INDEX, TRANSFORM_ORIGIN, TransformAuditMessage::new);
public TransformAuditor(Client client, String nodeName, ClusterService clusterService) {
super(new OriginSettingClient(client, TRANSFORM_ORIGIN), TransformInternalIndexConstants.AUDIT_INDEX,
TransformInternalIndexConstants.AUDIT_INDEX,
() -> {
try {
IndexTemplateMetadata templateMeta = TransformInternalIndex.getAuditIndexTemplateMetadata(
clusterService.state().nodes().getMinNodeVersion());
PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateMeta.name())
.patterns(templateMeta.patterns())
.version(templateMeta.version())
.settings(templateMeta.settings())
.mapping(templateMeta.mappings().iterator().next().key,
templateMeta.mappings().iterator().next().value.uncompressed(), XContentType.JSON);
for (ObjectObjectCursor<String, AliasMetadata> cursor : templateMeta.getAliases()) {
AliasMetadata meta = cursor.value;
Alias alias = new Alias(meta.alias())
.indexRouting(meta.indexRouting())
.searchRouting(meta.searchRouting())
.isHidden(meta.isHidden())
.writeIndex(meta.writeIndex());
if (meta.filter() != null) {
alias.filter(meta.getFilter().string());
}
request.alias(alias);
}
return request;
} catch (IOException e) {
return null;
}
},
nodeName, TransformAuditMessage::new, clusterService);
}
}

View File

@ -38,10 +38,10 @@ public class DefaultCheckpointProviderTests extends ESTestCase {
private Logger checkpointProviderlogger = LogManager.getLogger(DefaultCheckpointProvider.class);
@Before
public void setUpMocks() throws IllegalAccessException {
public void setUpMocks() {
client = mock(Client.class);
transformConfigManager = mock(IndexBasedTransformConfigManager.class);
transformAuditor = new MockTransformAuditor();
transformAuditor = MockTransformAuditor.createMockAuditor();
}
public void testReportSourceIndexChangesRunsEmpty() throws Exception {

View File

@ -7,8 +7,15 @@
package org.elasticsearch.xpack.transform.notifications;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@ -16,6 +23,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/*
* Test mock auditor to verify audit expectations.
@ -26,10 +34,28 @@ import static org.mockito.Mockito.mock;
*/
public class MockTransformAuditor extends TransformAuditor {
private List<AuditExpectation> expectations;
private static final String MOCK_NODE_NAME = "mock_node_name";
public MockTransformAuditor() {
super(mock(Client.class), "mock_node_name");
@SuppressWarnings("unchecked")
public static MockTransformAuditor createMockAuditor() {
ImmutableOpenMap.Builder<String, IndexTemplateMetadata> templates = ImmutableOpenMap.builder(1);
templates.put(TransformInternalIndexConstants.AUDIT_INDEX, mock(IndexTemplateMetadata.class));
Metadata metadata = mock(Metadata.class);
when(metadata.getTemplates()).thenReturn(templates.build());
ClusterState state = mock(ClusterState.class);
when(state.getMetadata()).thenReturn(metadata);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(state);
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
return new MockTransformAuditor(client, clusterService);
}
private final List<AuditExpectation> expectations;
private MockTransformAuditor(Client client, ClusterService clusterService) {
super(client, MOCK_NODE_NAME, clusterService);
expectations = new CopyOnWriteArrayList<>();
}

View File

@ -231,7 +231,7 @@ public class TransformIndexerStateTests extends ESTestCase {
@Before
public void setUpMocks() {
auditor = new MockTransformAuditor();
auditor = MockTransformAuditor.createMockAuditor();
transformConfigManager = new InMemoryTransformConfigManager();
client = new NoOpClient(getTestName());
threadPool = new TestThreadPool(ThreadPool.Names.GENERIC);

View File

@ -278,7 +278,7 @@ public class TransformIndexerTests extends ESTestCase {
Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
TransformAuditor auditor = new TransformAuditor(client, "node_1");
TransformAuditor auditor = MockTransformAuditor.createMockAuditor();
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
MockedTransformIndexer indexer = createMockIndexer(
@ -419,7 +419,7 @@ public class TransformIndexerTests extends ESTestCase {
failureMessage.compareAndSet(null, message);
};
MockTransformAuditor auditor = new MockTransformAuditor();
MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
TransformContext.Listener contextListener = mock(TransformContext.Listener.class);
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);

View File

@ -71,7 +71,7 @@ public class TransformTaskTests extends ESTestCase {
when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));
TransformConfig transformConfig = TransformConfigTests.randomTransformConfigWithoutHeaders();
TransformAuditor auditor = new MockTransformAuditor();
TransformAuditor auditor = MockTransformAuditor.createMockAuditor();
TransformConfigManager transformsConfigManager = new InMemoryTransformConfigManager();
TransformCheckpointService transformsCheckpointService = new TransformCheckpointService(
Settings.EMPTY,
@ -159,7 +159,7 @@ public class TransformTaskTests extends ESTestCase {
when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));
TransformConfig transformConfig = TransformConfigTests.randomTransformConfigWithoutHeaders();
TransformAuditor auditor = new MockTransformAuditor();
TransformAuditor auditor = MockTransformAuditor.createMockAuditor();
TransformState transformState = new TransformState(
TransformTaskState.FAILED,