Original commit: elastic/x-pack-elasticsearch@701407ecf5
This commit is contained in:
parent
99db6013ea
commit
fca0feb02d
|
@ -6,6 +6,7 @@
|
||||||
package org.elasticsearch.xpack.ml;
|
package org.elasticsearch.xpack.ml;
|
||||||
|
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
||||||
|
@ -207,6 +208,8 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
|
||||||
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
|
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.warn("Error putting the template for the notification message index", e);
|
logger.warn("Error putting the template for the notification message index", e);
|
||||||
|
listener.accept(false,
|
||||||
|
new ElasticsearchException("Error creating the template mappings for the notification message indices", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -241,6 +244,8 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
|
||||||
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
|
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.error("Error creating template mappings for the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e);
|
logger.error("Error creating template mappings for the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e);
|
||||||
|
listener.accept(false, new ElasticsearchException("Error creating template mappings for the " +
|
||||||
|
AnomalyDetectorsIndex.jobStateIndexName() + " indices", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,6 +269,8 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
|
||||||
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
|
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.error("Error creating template mappings for the " + AnomalyDetectorsIndex.jobResultsIndexPrefix() + " indices", e);
|
logger.error("Error creating template mappings for the " + AnomalyDetectorsIndex.jobResultsIndexPrefix() + " indices", e);
|
||||||
|
listener.accept(false, new ElasticsearchException("Error creating template mappings for the "
|
||||||
|
+ AnomalyDetectorsIndex.jobResultsIndexPrefix() + " index", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,9 +9,11 @@ import org.elasticsearch.ResourceNotFoundException;
|
||||||
import org.elasticsearch.action.Action;
|
import org.elasticsearch.action.Action;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
|
import org.elasticsearch.action.bulk.TransportBulkAction;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.index.IndexResponse;
|
import org.elasticsearch.action.index.IndexResponse;
|
||||||
import org.elasticsearch.action.index.TransportIndexAction;
|
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
|
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
|
||||||
|
@ -128,6 +130,7 @@ public class PutFilterAction extends Action<PutFilterAction.Request, PutFilterAc
|
||||||
super(client, action, new Request());
|
super(client, action, new Request());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Response extends AcknowledgedResponse {
|
public static class Response extends AcknowledgedResponse {
|
||||||
|
|
||||||
public Response() {
|
public Response() {
|
||||||
|
@ -151,16 +154,16 @@ public class PutFilterAction extends Action<PutFilterAction.Request, PutFilterAc
|
||||||
// extends TransportMasterNodeAction, because we will store in cluster state.
|
// extends TransportMasterNodeAction, because we will store in cluster state.
|
||||||
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
|
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
|
||||||
|
|
||||||
private final TransportIndexAction transportIndexAction;
|
private final TransportBulkAction transportBulkAction;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||||
ThreadPool threadPool, ActionFilters actionFilters,
|
ThreadPool threadPool, ActionFilters actionFilters,
|
||||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||||
TransportIndexAction transportIndexAction) {
|
TransportBulkAction transportBulkAction) {
|
||||||
super(settings, PutFilterAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
super(settings, PutFilterAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||||
indexNameExpressionResolver, Request::new);
|
indexNameExpressionResolver, Request::new);
|
||||||
this.transportIndexAction = transportIndexAction;
|
this.transportBulkAction = transportBulkAction;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -180,16 +183,17 @@ public class PutFilterAction extends Action<PutFilterAction.Request, PutFilterAc
|
||||||
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId);
|
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId);
|
||||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||||
indexRequest.source(filter.toXContent(builder, ToXContent.EMPTY_PARAMS));
|
indexRequest.source(filter.toXContent(builder, ToXContent.EMPTY_PARAMS));
|
||||||
transportIndexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
|
BulkRequest bulkRequest = new BulkRequest().add(indexRequest);
|
||||||
|
|
||||||
|
transportBulkAction.execute(bulkRequest, new ActionListener<BulkResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(IndexResponse indexResponse) {
|
public void onResponse(BulkResponse indexResponse) {
|
||||||
listener.onResponse(new Response());
|
listener.onResponse(new Response());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
logger.error("Could not create filter with ID [" + filterId + "]", e);
|
listener.onFailure(new ResourceNotFoundException("Could not create filter with ID [" + filterId + "]", e));
|
||||||
throw new ResourceNotFoundException("Could not create filter with ID [" + filterId + "]", e);
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -131,7 +131,7 @@ public class JobProvider {
|
||||||
final ActionListener<Boolean> createAliasListener = ActionListener.wrap(success -> {
|
final ActionListener<Boolean> createAliasListener = ActionListener.wrap(success -> {
|
||||||
client.admin().indices().prepareAliases()
|
client.admin().indices().prepareAliases()
|
||||||
.addAlias(indexName, aliasName, QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId()))
|
.addAlias(indexName, aliasName, QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId()))
|
||||||
// we could return 'sucess && r.isAcknowledged()' instead of 'true', but that makes
|
// we could return 'success && r.isAcknowledged()' instead of 'true', but that makes
|
||||||
// testing not possible as we can't create IndicesAliasesResponse instance or
|
// testing not possible as we can't create IndicesAliasesResponse instance or
|
||||||
// mock IndicesAliasesResponse#isAcknowledged()
|
// mock IndicesAliasesResponse#isAcknowledged()
|
||||||
.execute(ActionListener.wrap(r -> finalListener.onResponse(true),
|
.execute(ActionListener.wrap(r -> finalListener.onResponse(true),
|
||||||
|
|
|
@ -86,7 +86,7 @@ public class JobProviderTests extends ESTestCase {
|
||||||
|
|
||||||
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
|
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
|
||||||
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
||||||
clientBuilder.createIndexRequest(resultsIndexName, captor);
|
clientBuilder.createIndexRequest(captor);
|
||||||
clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.jobResultsAliasedName("foo"), jobFilter);
|
clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.jobResultsAliasedName("foo"), jobFilter);
|
||||||
|
|
||||||
Job.Builder job = buildJobBuilder("foo");
|
Job.Builder job = buildJobBuilder("foo");
|
||||||
|
@ -195,7 +195,7 @@ public class JobProviderTests extends ESTestCase {
|
||||||
|
|
||||||
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
|
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
|
||||||
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
||||||
clientBuilder.createIndexRequest(indexName, captor);
|
clientBuilder.createIndexRequest(captor);
|
||||||
clientBuilder.prepareAlias(indexName, aliasName, jobFilter);
|
clientBuilder.prepareAlias(indexName, aliasName, jobFilter);
|
||||||
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
|
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
|
||||||
|
|
||||||
|
|
|
@ -175,7 +175,7 @@ public class MockClientBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||||
public MockClientBuilder createIndexRequest(String index, ArgumentCaptor<CreateIndexRequest> requestCapture) {
|
public MockClientBuilder createIndexRequest(ArgumentCaptor<CreateIndexRequest> requestCapture) {
|
||||||
|
|
||||||
doAnswer(invocation -> {
|
doAnswer(invocation -> {
|
||||||
CreateIndexResponse response = new CreateIndexResponse(true, true) {};
|
CreateIndexResponse response = new CreateIndexResponse(true, true) {};
|
||||||
|
|
Loading…
Reference in New Issue