[ML] Fix datafeed checks when a concrete remote index is present (#43923)

A bug was introduced in 6.6.0 when we added support for
rollup indices. Rollup caps do NOT support looking at
remote indices, and since we always look up rollup caps,
the datafeed fails with an error if its config includes a
concrete remote index. (When all the remote indices in a
datafeed config are wildcards the problem does not occur.)

The rollups feature does not support remote indices, so if
there is any remote index in a datafeed config (wildcarded
or not), we can skip the rollup cap checks.  This PR
implements that change.
Benjamin Trent authored on 2019-07-04 07:28:47 -05:00; committed by David Roberts
parent 2a70df424d
commit 36f7259737
3 changed files with 73 additions and 9 deletions
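
The fix hinges on RemoteClusterLicenseChecker.containsRemoteIndex, which reports
whether any index name in a datafeed config refers to a remote cluster. A rough
sketch of that check, assuming the usual "cluster_alias:index" naming convention
for cross-cluster search (the real implementation lives in the license checker
and may differ in detail):

    // Sketch only (assumption): a remote index name is "<cluster_alias>:<index_name>",
    // so any entry containing the separator marks the datafeed as cross-cluster.
    static boolean containsRemoteIndex(List<String> indexNames) {
        return indexNames.stream().anyMatch(name -> name.contains(":"));
    }

With that check available, both call sites below skip the rollup caps lookup
entirely whenever a remote index, wildcarded or concrete, is present.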

TransportPutDatafeedAction.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.license.LicenseUtils;
+import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -133,12 +134,16 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDat
                         }
                     }
                 );
-                executeAsyncWithOrigin(client,
-                    ML_ORIGIN,
-                    GetRollupIndexCapsAction.INSTANCE,
-                    new GetRollupIndexCapsAction.Request(indices),
-                    getRollupIndexCapsActionHandler);
+                if (RemoteClusterLicenseChecker.containsRemoteIndex(request.getDatafeed().getIndices())) {
+                    getRollupIndexCapsActionHandler.onResponse(new GetRollupIndexCapsAction.Response());
+                } else {
+                    executeAsyncWithOrigin(client,
+                        ML_ORIGIN,
+                        GetRollupIndexCapsAction.INSTANCE,
+                        new GetRollupIndexCapsAction.Request(indices),
+                        getRollupIndexCapsActionHandler);
+                }
             } else {
                 putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener);
             }

DataExtractorFactory.java

@@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
@@ -63,13 +64,17 @@ public interface DataExtractorFactory {
                 }
             );
-            GetRollupIndexCapsAction.Request request = new GetRollupIndexCapsAction.Request(datafeed.getIndices().toArray(new String[0]));
-            ClientHelper.executeAsyncWithOrigin(
+            if (RemoteClusterLicenseChecker.containsRemoteIndex(datafeed.getIndices())) {
+                // If we have remote indices in the data feed, don't bother checking for rollup support
+                // Rollups + CCS is not supported
+                getRollupIndexCapsActionHandler.onResponse(new GetRollupIndexCapsAction.Response());
+            } else {
+                ClientHelper.executeAsyncWithOrigin(
                 client,
                 ClientHelper.ML_ORIGIN,
                 GetRollupIndexCapsAction.INSTANCE,
-                request,
+                new GetRollupIndexCapsAction.Request(datafeed.getIndices().toArray(new String[0])),
                 getRollupIndexCapsActionHandler);
+            }
         }
     }
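
Both call sites use the same short-circuit: rather than restructuring the
surrounding logic, they complete the existing rollup-caps handler with an empty
GetRollupIndexCapsAction.Response, so everything downstream behaves exactly as
if no rollup-capable indices had been found. A minimal sketch of the pattern
(the handler body is illustrative only; datafeedIndices, client and listener
are assumed to be in scope, as in the diffs above):

    // The handler that would normally receive the rollup caps lookup result.
    ActionListener<GetRollupIndexCapsAction.Response> rollupCapsHandler = ActionListener.wrap(
        response -> { /* pick a rollup-aware or plain extractor based on the caps */ },
        listener::onFailure
    );

    if (RemoteClusterLicenseChecker.containsRemoteIndex(datafeedIndices)) {
        // Remote indices present: rollup caps cannot be fetched across clusters,
        // so answer immediately with an empty response ("no rollup caps here").
        rollupCapsHandler.onResponse(new GetRollupIndexCapsAction.Response());
    } else {
        executeAsyncWithOrigin(client, ML_ORIGIN, GetRollupIndexCapsAction.INSTANCE,
            new GetRollupIndexCapsAction.Request(datafeedIndices.toArray(new String[0])), rollupCapsHandler);
    }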

DataExtractorFactoryTests.java

@@ -223,6 +223,60 @@ public class DataExtractorFactoryTests extends ESTestCase {
         DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
     }
+
+    public void testCreateDataExtractorFactoryGivenRollupAndRemoteIndex() {
+        givenAggregatableRollup("myField", "max", 5, "termField");
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setTimeField("time");
+        Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
+        jobBuilder.setDataDescription(dataDescription);
+        DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
+        datafeedConfig.setIndices(Collections.singletonList("cluster_two:my_index"));
+        datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
+        MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
+        MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField");
+        TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField);
+        datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator(
+            AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time")));
+
+        // Test with remote index, aggregation, and no chunking
+        ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
+            dataExtractorFactory -> {
+                assertThat(dataExtractorFactory, instanceOf(AggregationDataExtractorFactory.class));
+                assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future.");
+            },
+            e -> fail()
+        );
+        DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
+
+        // Test with remote index, aggregation, and chunking
+        datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
+        listener = ActionListener.wrap(
+            dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)),
+            e -> fail()
+        );
+        DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
+
+        // Test with remote index, no aggregation, and no chunking
+        datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
+        datafeedConfig.setIndices(Collections.singletonList("cluster_two:my_index"));
+        datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
+        listener = ActionListener.wrap(
+            dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class)),
+            e -> fail()
+        );
+        DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
+
+        // Test with remote index, no aggregation, and chunking
+        datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
+        listener = ActionListener.wrap(
+            dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)),
+            e -> fail()
+        );
+        DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
+    }
+
     public void testCreateDataExtractorFactoryGivenRollupAndValidAggregationAndAutoChunk() {
         givenAggregatableRollup("myField", "max", 5, "termField");
         DataDescription.Builder dataDescription = new DataDescription.Builder();