diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java index 0c839ba7995..2892637a5c3 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig; import java.math.BigDecimal; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -249,12 +250,11 @@ public final class SchemaUtil { /* * Very "magic" helper method to extract the source mappings */ - private static void getSourceFieldMappings( - Client client, - String[] index, - String[] fields, - ActionListener> listener - ) { + static void getSourceFieldMappings(Client client, String[] index, String[] fields, ActionListener> listener) { + if (index == null || index.length == 0 || fields == null || fields.length == 0) { + listener.onResponse(Collections.emptyMap()); + return; + } FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index) .fields(fields) .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtilTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtilTests.java index 1aa803e109e..84aa2e55bf5 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtilTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtilTests.java @@ -6,11 +6,27 @@ package org.elasticsearch.xpack.transform.transforms.pivot; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.fieldcaps.FieldCapabilities; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; import java.math.BigInteger; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import static org.hamcrest.CoreMatchers.instanceOf; @@ -69,4 +85,114 @@ public class SchemaUtilTests extends ESTestCase { assertEquals(new BigInteger("18446744073709551615").doubleValue(), ((BigInteger) value).doubleValue(), 0.0); } + public void testGetSourceFieldMappings() throws InterruptedException { + try (Client client = new FieldCapsMockClient(getTestName())) { + // fields is null + this.>assertAsync( + listener -> SchemaUtil.getSourceFieldMappings(client, new String[] { "index-1", "index-2" }, null, listener), + mappings -> { + assertNotNull(mappings); + assertTrue(mappings.isEmpty()); + } + ); + + // fields is empty + this.>assertAsync( + listener -> SchemaUtil.getSourceFieldMappings(client, new String[] { "index-1", "index-2" }, new String[] {}, listener), + mappings -> { + assertNotNull(mappings); + assertTrue(mappings.isEmpty()); + } + ); + + // indices is null + this.>assertAsync( + listener -> SchemaUtil.getSourceFieldMappings(client, null, new String[] { "field-1", "field-2" }, listener), + mappings -> { + assertNotNull(mappings); + assertTrue(mappings.isEmpty()); + } + ); + + // indices is empty + this.>assertAsync( + listener -> SchemaUtil.getSourceFieldMappings(client, new String[] {}, new String[] { "field-1", "field-2" }, listener), + mappings -> { + assertNotNull(mappings); + assertTrue(mappings.isEmpty()); + } + ); + + // good use + this.>assertAsync( + listener -> SchemaUtil.getSourceFieldMappings( + client, + new String[] { "index-1", "index-2" }, + new String[] { "field-1", "field-2" }, + listener + ), + mappings -> { + assertNotNull(mappings); + assertEquals(2, mappings.size()); + assertEquals("long", mappings.get("field-1")); + assertEquals("long", mappings.get("field-2")); + } + ); + } + } + + private static class FieldCapsMockClient extends NoOpClient { + FieldCapsMockClient(String testName) { + super(testName); + } + + @SuppressWarnings("unchecked") + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + if (request instanceof FieldCapabilitiesRequest) { + FieldCapabilitiesRequest fieldCapsRequest = (FieldCapabilitiesRequest) request; + Map> responseMap = new HashMap<>(); + for (String field : fieldCapsRequest.fields()) { + responseMap.put(field, Collections.singletonMap(field, createFieldCapabilities(field, "long"))); + } + + final FieldCapabilitiesResponse response = new FieldCapabilitiesResponse(fieldCapsRequest.indices(), responseMap); + listener.onResponse((Response) response); + return; + } + + super.doExecute(action, request, listener); + } + } + + private static FieldCapabilities createFieldCapabilities(String name, String type) { + return new FieldCapabilities( + name, + type, + true, + true, + Strings.EMPTY_ARRAY, + Strings.EMPTY_ARRAY, + Strings.EMPTY_ARRAY, + Collections.emptyMap() + ); + } + + private void assertAsync(Consumer> function, Consumer furtherTests) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean listenerCalled = new AtomicBoolean(false); + + LatchedActionListener listener = new LatchedActionListener<>(ActionListener.wrap(r -> { + assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true)); + furtherTests.accept(r); + }, e -> { fail("got unexpected exception: " + e); }), latch); + + function.accept(listener); + assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS)); + } + }