Add job config option to set the index name (elastic/elasticsearch#560)

* Add job config option to set the index name

* Check index does not already exist if ’index_name’ is set

* Don’t create alias if ‘index_name’ is the same as ‘job_id’

* Default index_name value

Set it to job_id if null and only create the index alias if job_id != index_name

* Fix compile errors after rebasing

* Address review comments

* Test if the index exists by checking the cluster state

* Update comment

Original commit: elastic/x-pack-elasticsearch@a3e7f1a5bb
This commit is contained in:
David Kyle 2016-12-20 14:03:24 +00:00 committed by GitHub
parent 3383c67cf2
commit 537fea313f
11 changed files with 293 additions and 16 deletions

View File

@ -23,6 +23,7 @@ import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.transform.TransformConfig;
import org.elasticsearch.xpack.prelert.job.transform.TransformConfigs;
import org.elasticsearch.xpack.prelert.job.transform.verification.TransformConfigsVerifier;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import org.elasticsearch.xpack.prelert.utils.PrelertStrings;
import org.elasticsearch.xpack.prelert.utils.time.TimeUtils;
@ -47,9 +48,7 @@ import java.util.TreeSet;
public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent {
public static final Job PROTO =
new Job(null, null, null, null, null, 0L, null, null, null, null, null, null, null, null, null, null, null, null);
public static final long DEFAULT_BUCKETSPAN = 300;
new Job(null, null, null, null, null, 0L, null, null, null, null, null, null, null, null, null, null, null, null, null);
public static final String TYPE = "job";
@ -74,6 +73,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField TRANSFORMS = new ParseField("transforms");
public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id");
public static final ParseField INDEX_NAME = new ParseField("index_name");
// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("jobs");
@ -124,6 +124,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
PARSER.declareLong(Builder::setModelSnapshotRetentionDays, MODEL_SNAPSHOT_RETENTION_DAYS);
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), CUSTOM_SETTINGS, ValueType.OBJECT);
PARSER.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID);
PARSER.declareString(Builder::setIndexName, INDEX_NAME);
}
private final String jobId;
@ -145,12 +146,13 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
private final Long resultsRetentionDays;
private final Map<String, Object> customSettings;
private final String modelSnapshotId;
private final String indexName;
public Job(String jobId, String description, Date createTime, Date finishedTime, Date lastDataTime, long timeout,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
List<TransformConfig> transforms, ModelDebugConfig modelDebugConfig, IgnoreDowntime ignoreDowntime,
Long renormalizationWindowDays, Long backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays,
Map<String, Object> customSettings, String modelSnapshotId) {
Map<String, Object> customSettings, String modelSnapshotId, String indexName) {
this.jobId = jobId;
this.description = description;
this.createTime = createTime;
@ -169,6 +171,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
this.resultsRetentionDays = resultsRetentionDays;
this.customSettings = customSettings;
this.modelSnapshotId = modelSnapshotId;
this.indexName = indexName;
}
public Job(StreamInput in) throws IOException {
@ -190,6 +193,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
resultsRetentionDays = in.readOptionalLong();
customSettings = in.readMap();
modelSnapshotId = in.readOptionalString();
indexName = in.readString();
}
@Override
@ -206,6 +210,15 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
return jobId;
}
/**
* The name of the index storing the job's results and state.
* This defaults to {@link #getId()} if a specific index name is not set.
* @return The job's index name
*/
public String getIndexName() {
return indexName;
}
/**
* The job description
*
@ -407,6 +420,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
out.writeOptionalLong(resultsRetentionDays);
out.writeMap(customSettings);
out.writeOptionalString(modelSnapshotId);
out.writeString(indexName);
}
@Override
@ -461,9 +475,10 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
if (customSettings != null) {
builder.field(CUSTOM_SETTINGS.getPreferredName(), customSettings);
}
if (modelSnapshotId != null){
if (modelSnapshotId != null) {
builder.field(MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshotId);
}
builder.field(INDEX_NAME.getPreferredName(), indexName);
return builder;
}
@ -492,7 +507,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
&& Objects.equals(this.modelSnapshotRetentionDays, that.modelSnapshotRetentionDays)
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId);
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.indexName, that.indexName);
}
@Override
@ -500,7 +516,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
return Objects.hash(jobId, description, createTime, finishedTime, lastDataTime, timeout, analysisConfig,
analysisLimits, dataDescription, modelDebugConfig, transforms, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, ignoreDowntime, customSettings,
modelSnapshotId);
modelSnapshotId, indexName);
}
// Class alreadt extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
@ -543,6 +559,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
private IgnoreDowntime ignoreDowntime;
private Map<String, Object> customSettings;
private String modelSnapshotId;
private String indexName;
public Builder() {
}
@ -659,6 +676,10 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
this.modelSnapshotId = modelSnapshotId;
}
public void setIndexName(String indexName) {
this.indexName = indexName;
}
public Job build() {
return build(false);
}
@ -720,10 +741,20 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
if (!PrelertStrings.isValidId(id)) {
throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, ID.getPreferredName()));
}
if (Strings.isNullOrEmpty(indexName)) {
indexName = id;
}
if (!PrelertStrings.isValidId(indexName)) {
throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, INDEX_NAME.getPreferredName()));
}
return new Job(
id, description, createTime, finishedTime, lastDataTime, timeout, analysisConfig, analysisLimits,
dataDescription, transforms, modelDebugConfig, ignoreDowntime, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId,
indexName
);
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.prelert.job.manager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@ -25,6 +26,7 @@ import org.elasticsearch.xpack.prelert.job.IgnoreDowntime;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
@ -162,6 +164,7 @@ public class JobManager extends AbstractComponent {
*/
public void putJob(PutJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
Job job = request.getJob();
ActionListener<Boolean> delegateListener = ActionListener.wrap(jobSaved ->
jobProvider.createJobRelatedIndices(job, new ActionListener<Boolean>() {
@Override
@ -191,13 +194,16 @@ public class JobManager extends AbstractComponent {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return innerPutJob(job, request.isOverwrite(), currentState);
if (currentState.metaData().index(AnomalyDetectorsIndex.getJobIndexName(job.getIndexName())) != null) {
throw new ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_INDEX_ALREADY_EXISTS, job.getIndexName()));
}
return updateClusterState(job, request.isOverwrite(), currentState);
}
});
}
ClusterState innerPutJob(Job job, boolean overwrite, ClusterState currentState) {
ClusterState updateClusterState(Job job, boolean overwrite, ClusterState currentState) {
PrelertMetadata.Builder builder = createPrelertMetadataBuilder(currentState);
builder.putJob(job, overwrite);
return buildNewClusterState(currentState, builder);
@ -313,7 +319,7 @@ public class JobManager extends AbstractComponent {
builder.setIgnoreDowntime(IgnoreDowntime.ONCE);
}
return innerPutJob(builder.build(), true, currentState);
return updateClusterState(builder.build(), true, currentState);
}
});
}

View File

@ -180,6 +180,8 @@ public final class Messages {
public static final String JOB_CONFIG_TRANSFORM_UNKNOWN_TYPE = "job.config.transform.unknown.type";
public static final String JOB_CONFIG_UNKNOWN_FUNCTION = "job.config.unknown.function";
public static final String JOB_INDEX_ALREADY_EXISTS = "job.index.already.exists";
public static final String JOB_DATA_CONCURRENT_USE_CLOSE = "job.data.concurrent.use.close";
public static final String JOB_DATA_CONCURRENT_USE_FLUSH = "job.data.concurrent.use.flush";
public static final String JOB_DATA_CONCURRENT_USE_PAUSE = "job.data.concurrent.use.pause";

View File

@ -11,10 +11,13 @@ import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
@ -192,8 +195,11 @@ public class JobProvider {
XContentBuilder auditActivityMapping = ElasticsearchMappings.auditActivityMapping();
String jobId = job.getId();
LOGGER.trace("ES API CALL: create index {}", job.getId());
CreateIndexRequest createIndexRequest = new CreateIndexRequest(AnomalyDetectorsIndex.getJobIndexName(jobId));
boolean createIndexAlias = !job.getIndexName().equals(job.getId());
String indexName = AnomalyDetectorsIndex.getJobIndexName(job.getIndexName());
LOGGER.trace("ES API CALL: create index {}", indexName);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
Settings.Builder settingsBuilder = prelertIndexSettings();
createIndexRequest.settings(settingsBuilder);
createIndexRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping);
@ -209,17 +215,40 @@ public class JobProvider {
createIndexRequest.mapping(AuditMessage.TYPE.getPreferredName(), auditMessageMapping);
createIndexRequest.mapping(AuditActivity.TYPE.getPreferredName(), auditActivityMapping);
if (createIndexAlias) {
final ActionListener<Boolean> responseListener = listener;
listener = ActionListener.wrap(aBoolean -> {
client.admin().indices().prepareAliases()
.addAlias(indexName, AnomalyDetectorsIndex.getJobIndexName(jobId))
.execute(new ActionListener<IndicesAliasesResponse>() {
@Override
public void onResponse(IndicesAliasesResponse indicesAliasesResponse) {
responseListener.onResponse(true);
}
@Override
public void onFailure(Exception e) {
responseListener.onFailure(e);
}
});
},
listener::onFailure);
}
final ActionListener<Boolean> createdListener = listener;
client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
listener.onResponse(true);
createdListener.onResponse(true);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
createdListener.onFailure(e);
}
});
} catch (Exception e) {
listener.onFailure(e);
}

View File

@ -137,6 +137,8 @@ job.config.transform.output.name.used.more.than.once = Transform output name ''{
job.config.transform.unknown.type = Unknown TransformType ''{0}''
job.config.unknown.function = Unknown function ''{0}''
job.index.already.exists = Cannot create index ''{0}'' as it already exists
scheduler.config.invalid.option.value = Invalid {0} value ''{1}'' in scheduler configuration
scheduler.does.not.support.job.with.latency = A job configured with scheduler cannot support latency

View File

@ -57,10 +57,11 @@ public class GetJobsActionResponseTests extends AbstractStreamableTestCase<GetJo
Map<String, Object> customConfig = randomBoolean() ? Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10))
: null;
String modelSnapshotId = randomBoolean() ? randomAsciiOfLength(10) : null;
String indexName = randomAsciiOfLength(10);
Job job = new Job(jobId, description, createTime, finishedTime, lastDataTime,
timeout, analysisConfig, analysisLimits, dataDescription, transformConfigList,
modelDebugConfig, ignoreDowntime, normalizationWindowDays, backgroundPersistInterval,
modelSnapshotRetentionDays, resultsRetentionDays, customConfig, modelSnapshotId);
modelSnapshotRetentionDays, resultsRetentionDays, customConfig, modelSnapshotId, indexName);
jobList.add(job);
}

View File

@ -173,6 +173,63 @@ public class PrelertJobIT extends ESRestTestCase {
assertThat(responseAsString, containsString("\"count\":1"));
}
public void testCreateJobWithIndexNameOption() throws Exception {
String jobTemplate = "{\"job_id\":\"%s\",\n" +
" \"analysis_config\" : {\n" +
" \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\"}]\n" +
" },\n" +
" \"index_name\" : \"%s\"}";
String jobId = "aliased-job";
String indexName = "non-default-index";
String jobConfig = String.format(Locale.ROOT, jobTemplate, jobId, indexName);
Response response = client().performRequest("put", PrelertPlugin.BASE_PATH + "anomaly_detectors", Collections.emptyMap(),
new StringEntity(jobConfig));
assertEquals(200, response.getStatusLine().getStatusCode());
response = client().performRequest("get", "_aliases");
assertEquals(200, response.getStatusLine().getStatusCode());
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"prelertresults-" + indexName + "\":{\"aliases\":{\"prelertresults-" + jobId + "\""));
response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(indexName));
addBucketResult(indexName, "1234", 1);
addBucketResult(indexName, "1236", 1);
response = client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/results/buckets");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":2"));
response = client().performRequest("get", "prelertresults-" + indexName + "/result/_search");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"total\":2"));
// test that we can't create another job with the same index_name
String jobConfigSameIndexName = String.format(Locale.ROOT, jobTemplate, "new-job-id", indexName);
expectThrows(ResponseException.class, () -> client().performRequest("put",
PrelertPlugin.BASE_PATH + "anomaly_detectors", Collections.emptyMap(), new StringEntity(jobConfigSameIndexName)));
response = client().performRequest("delete", PrelertPlugin.BASE_PATH + "anomaly_detectors/" + jobId);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
// check index and alias were deleted
response = client().performRequest("get", "_aliases");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, not(containsString("prelertresults-" + jobId )));
response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, not(containsString(indexName)));
}
private Response addBucketResult(String jobId, String timestamp, long bucketSpan) throws Exception {
try {
client().performRequest("put", "prelertresults-" + jobId, Collections.emptyMap(), new StringEntity(RESULT_MAPPING));

View File

@ -425,6 +425,26 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
assertEquals(errorMessage, e.getMessage());
}
public void testBuilder_setsDefaultIndexName() {
Job.Builder builder = buildJobBuilder("foo");
Job job = builder.build();
assertEquals("foo", job.getIndexName());
}
public void testBuilder_setsIndexName() {
Job.Builder builder = buildJobBuilder("foo");
builder.setIndexName("carol");
Job job = builder.build();
assertEquals("carol", job.getIndexName());
}
public void testBuilder_withInvalidIndexNameThrows () {
Job.Builder builder = buildJobBuilder("foo");
builder.setIndexName("_bad^name");
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> builder.build());
assertEquals(Messages.getMessage(Messages.INVALID_ID, Job.INDEX_NAME.getPreferredName()), e.getMessage());
}
public static Job.Builder buildJobBuilder(String id) {
Job.Builder builder = new Job.Builder(id);
builder.setCreateTime(new Date());
@ -511,6 +531,9 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
if (randomBoolean()) {
builder.setModelSnapshotId(randomAsciiOfLength(10));
}
if (randomBoolean()) {
builder.setIndexName(randomValidJobId());
}
return builder.build();
}
}

View File

@ -5,21 +5,32 @@
*/
package org.elasticsearch.xpack.prelert.job.manager;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.action.PutJobAction;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Arrays;
import java.util.HashSet;
@ -28,7 +39,10 @@ import java.util.stream.Collectors;
import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -135,6 +149,50 @@ public class JobManagerTests extends ESTestCase {
assertThat(result.results().get(9).getId(), equalTo("9"));
}
@SuppressWarnings("unchecked")
public void testPutJobFailsIfIndexExists() {
JobManager jobManager = createJobManager();
Job.Builder jobBuilder = buildJobBuilder("foo");
jobBuilder.setIndexName("my-special-place");
PutJobAction.Request request = new PutJobAction.Request(jobBuilder.build());
Index index = mock(Index.class);
when(index.getName()).thenReturn(AnomalyDetectorsIndex.getJobIndexName("my-special-place"));
IndexMetaData indexMetaData = mock(IndexMetaData.class);
when(indexMetaData.getIndex()).thenReturn(index);
ImmutableOpenMap<String, AliasMetaData> aliases = ImmutableOpenMap.of();
when(indexMetaData.getAliases()).thenReturn(aliases);
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder()
.fPut(AnomalyDetectorsIndex.getJobIndexName("my-special-place"), indexMetaData).build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().indices(indexMap)).build();
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs);
return null;
}
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class));
ResourceAlreadyExistsException e = expectThrows(ResourceAlreadyExistsException.class, () -> jobManager.putJob(request,
new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
}
@Override
public void onFailure(Exception e) {
fail(e.toString());
}
}));
assertEquals("Cannot create index 'my-special-place' as it already exists", e.getMessage());
}
private JobManager createJobManager() {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class);

View File

@ -59,6 +59,8 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -190,6 +192,53 @@ public class JobProviderTests extends ESTestCase {
});
}
public void testCreateJobRelatedIndicies_createsAliasIfIndexNameIsSet() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.getJobIndexName("foo"), captor);
clientBuilder.prepareAlias(AnomalyDetectorsIndex.getJobIndexName("bar"), AnomalyDetectorsIndex.getJobIndexName("foo"));
Job.Builder job = buildJobBuilder("foo");
job.setIndexName("bar");
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
provider.createJobRelatedIndices(job.build(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
verify(client.admin().indices(), times(1)).prepareAliases();
}
@Override
public void onFailure(Exception e) {
fail(e.toString());
}
});
}
public void testCreateJobRelatedIndicies_doesntCreateAliasIfIndexNameIsSameAsJobId() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.getJobIndexName("foo"), captor);
Job.Builder job = buildJobBuilder("foo");
job.setIndexName("foo");
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
provider.createJobRelatedIndices(job.build(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
verify(client.admin().indices(), never()).prepareAliases();
}
@Override
public void onFailure(Exception e) {
fail(e.toString());
}
});
}
public void testCreateJob() throws InterruptedException, ExecutionException {
Job.Builder job = buildJobBuilder("marscapone");

View File

@ -10,6 +10,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@ -47,6 +49,7 @@ import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.prelert.action.DeleteJobAction;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -381,6 +384,22 @@ public class MockClientBuilder {
return this;
}
@SuppressWarnings("unchecked")
public MockClientBuilder prepareAlias(String indexName, String alias) {
IndicesAliasesRequestBuilder aliasesRequestBuilder = mock(IndicesAliasesRequestBuilder.class);
when(aliasesRequestBuilder.addAlias(eq(indexName), eq(alias))).thenReturn(aliasesRequestBuilder);
when(indicesAdminClient.prepareAliases()).thenReturn(aliasesRequestBuilder);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
ActionListener<IndicesAliasesResponse> listener = (ActionListener<IndicesAliasesResponse>) invocationOnMock.getArguments()[0];
listener.onResponse(mock(IndicesAliasesResponse.class));
return null;
}
}).when(aliasesRequestBuilder).execute(any());
return this;
}
@SuppressWarnings("unchecked")
public MockClientBuilder prepareBulk(BulkResponse response) {
ListenableActionFuture<BulkResponse> actionFuture = mock(ListenableActionFuture.class);