diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStatsTests.java index 7a2b9a2d09c..266967e27b9 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStatsTests.java @@ -13,6 +13,12 @@ import java.io.IOException; public class DataFrameTransformStateAndStatsTests extends AbstractSerializingDataFrameTestCase { + public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats(String id) { + return new DataFrameTransformStateAndStats(id, + DataFrameTransformStateTests.randomDataFrameTransformState(), + DataFrameIndexerTransformStatsTests.randomStats()); + } + public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats() { return new DataFrameTransformStateAndStats(randomAlphaOfLengthBetween(1, 10), DataFrameTransformStateTests.randomDataFrameTransformState(), diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java index 07b2bfa251c..188f9a02edc 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java @@ -21,10 +21,12 @@ import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsActio import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; import org.elasticsearch.xpack.core.indexing.IndexerState; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -78,31 +80,38 @@ public class DataFrameFeatureSet implements XPackFeatureSet { GetDataFrameTransformsAction.Request transformsRequest = new GetDataFrameTransformsAction.Request(MetaData.ALL); client.execute(GetDataFrameTransformsAction.INSTANCE, transformsRequest, ActionListener.wrap( - transforms -> { - Set transformIds = transforms.getTransformConfigurations() - .stream() - .map(DataFrameTransformConfig::getId) - .collect(Collectors.toSet()); - GetDataFrameTransformsStatsAction.Request transformStatsRequest = - new GetDataFrameTransformsStatsAction.Request(MetaData.ALL); - client.execute(GetDataFrameTransformsStatsAction.INSTANCE, - transformStatsRequest, - ActionListener.wrap(transformStatsResponse -> { - Map transformsCountByState = new HashMap<>(); - DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats(); - - transformStatsResponse.getTransformsStateAndStats().forEach(singleResult -> { - transformIds.remove(singleResult.getId()); - transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum); - accumulatedStats.merge(singleResult.getTransformStats()); - }); - // If there is no state returned, assumed stopped - transformIds.forEach(ignored -> transformsCountByState.merge(IndexerState.STOPPED.value(), 1L, Long::sum)); - - listener.onResponse(new DataFrameFeatureSetUsage(available(), enabled(), transformsCountByState, accumulatedStats)); - }, listener::onFailure)); - }, - listener::onFailure + transforms -> { + GetDataFrameTransformsStatsAction.Request transformStatsRequest = + new GetDataFrameTransformsStatsAction.Request(MetaData.ALL); + client.execute(GetDataFrameTransformsStatsAction.INSTANCE, transformStatsRequest, + ActionListener.wrap(transformStatsResponse -> { + listener.onResponse(createUsage(available(), enabled(), transforms.getTransformConfigurations(), + transformStatsResponse.getTransformsStateAndStats())); + }, listener::onFailure)); + }, + listener::onFailure )); } + + static DataFrameFeatureSetUsage createUsage(boolean available, boolean enabled, + List transforms, + List transformsStateAndStats) { + + Set transformIds = transforms.stream() + .map(DataFrameTransformConfig::getId) + .collect(Collectors.toSet()); + + Map transformsCountByState = new HashMap<>(); + DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats(); + + transformsStateAndStats.forEach(singleResult -> { + transformIds.remove(singleResult.getId()); + transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum); + accumulatedStats.merge(singleResult.getTransformStats()); + }); + // If there is no state returned, assumed stopped + transformIds.forEach(ignored -> transformsCountByState.merge(IndexerState.STOPPED.value(), 1L, Long::sum)); + + return new DataFrameFeatureSetUsage(available, enabled, transformsCountByState, accumulatedStats); + } } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java index 99ad498a63b..6c314adab74 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; @@ -19,14 +18,12 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.XPackFeatureSet; import org.elasticsearch.xpack.core.XPackFeatureSet.Usage; -import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction; +import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests; -import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction; -import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.junit.Before; @@ -39,9 +36,6 @@ import java.util.concurrent.ExecutionException; import static java.lang.Math.toIntExact; import static org.hamcrest.core.Is.is; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -73,21 +67,20 @@ public class DataFrameFeatureSetTests extends ESTestCase { assertTrue(featureSet.enabled()); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40022") - public void testUsage() throws InterruptedException, ExecutionException, IOException { - Client client = mock(Client.class); - when(licenseState.isDataFrameAllowed()).thenReturn(true); - - DataFrameFeatureSet featureSet = new DataFrameFeatureSet(Settings.EMPTY, client, licenseState); - + public void testUsage() throws IOException { List transformsStateAndStats = new ArrayList<>(); - for (int i = 0; i < randomIntBetween(0, 10); ++i) { - transformsStateAndStats.add(DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats()); + int count = randomIntBetween(0, 10); + int uniqueId = 0; + for (int i = 0; i < count; ++i) { + transformsStateAndStats.add( + DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats("df-" + Integer.toString(uniqueId++))); } + count = randomIntBetween(0, 10); List transformConfigWithoutTasks = new ArrayList<>(); - for (int i = 0; i < randomIntBetween(0, 10); ++i) { - transformConfigWithoutTasks.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig()); + for (int i = 0; i < count; ++i) { + transformConfigWithoutTasks.add( + DataFrameTransformConfigTests.randomDataFrameTransformConfig("df-" + Integer.toString(uniqueId++))); } List transformConfigWithTasks = new ArrayList<>(transformsStateAndStats.size()); @@ -98,35 +91,17 @@ public class DataFrameFeatureSetTests extends ESTestCase { allConfigs.addAll(transformConfigWithoutTasks); allConfigs.addAll(transformConfigWithTasks); - GetDataFrameTransformsStatsAction.Response mockResponse = new GetDataFrameTransformsStatsAction.Response(transformsStateAndStats); - GetDataFrameTransformsAction.Response mockTransformsResponse = new GetDataFrameTransformsAction.Response(allConfigs); + boolean enabled = randomBoolean(); + boolean available = randomBoolean(); + DataFrameFeatureSetUsage usage = DataFrameFeatureSet.createUsage(available, enabled, allConfigs, transformsStateAndStats); - doAnswer(invocationOnMock -> { - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - listener.onResponse(mockResponse); - return Void.TYPE; - }).when(client).execute(same(GetDataFrameTransformsStatsAction.INSTANCE), any(), any()); - - doAnswer(invocationOnMock -> { - @SuppressWarnings("unchecked") - ActionListener listener = - (ActionListener) invocationOnMock.getArguments()[2]; - listener.onResponse(mockTransformsResponse); - return Void.TYPE; - }).when(client).execute(same(GetDataFrameTransformsAction.INSTANCE), any(), any()); - - PlainActionFuture future = new PlainActionFuture<>(); - featureSet.usage(future); - XPackFeatureSet.Usage usage = future.get(); - - assertTrue(usage.enabled()); + assertEquals(enabled, usage.enabled()); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { usage.toXContent(builder, ToXContent.EMPTY_PARAMS); XContentParser parser = createParser(builder); Map usageAsMap = parser.map(); - assertTrue((boolean) XContentMapValues.extractValue("available", usageAsMap)); + assertEquals(available, (boolean) XContentMapValues.extractValue("available", usageAsMap)); if (transformsStateAndStats.isEmpty() && transformConfigWithoutTasks.isEmpty()) { // no transforms, no stats @@ -142,12 +117,16 @@ public class DataFrameFeatureSetTests extends ESTestCase { transformConfigWithoutTasks.forEach(ignored -> stateCounts.merge(IndexerState.STOPPED.value(), 1, Integer::sum)); stateCounts.forEach((k, v) -> assertEquals(v, XContentMapValues.extractValue("transforms." + k, usageAsMap))); - DataFrameIndexerTransformStats combinedStats = transformsStateAndStats.stream().map(x -> x.getTransformStats()) - .reduce((l, r) -> l.merge(r)).get(); + // use default constructed stats object for assertions if transformsStateAndStats is empty + DataFrameIndexerTransformStats combinedStats = new DataFrameIndexerTransformStats(); + if (transformsStateAndStats.isEmpty() == false) { + combinedStats = transformsStateAndStats.stream().map(x -> x.getTransformStats()).reduce((l, r) -> l.merge(r)).get(); + } assertEquals(toIntExact(combinedStats.getIndexFailures()), XContentMapValues.extractValue("stats.index_failures", usageAsMap)); - assertEquals(toIntExact(combinedStats.getIndexTotal()), XContentMapValues.extractValue("stats.index_total", usageAsMap)); + assertEquals(toIntExact(combinedStats.getIndexTotal()), + XContentMapValues.extractValue("stats.index_total", usageAsMap)); assertEquals(toIntExact(combinedStats.getSearchTime()), XContentMapValues.extractValue("stats.search_time_in_ms", usageAsMap)); assertEquals(toIntExact(combinedStats.getNumDocuments()),