[7.10][Transform] fix transform failure for configs with no field parameter… (#67380)
avoid illegal argument exception when fields is empty, which is a valid case fixes #67333
This commit is contained in:
parent
0d3a486141
commit
a251d26339
|
@ -24,6 +24,7 @@ import org.elasticsearch.xpack.core.ClientHelper;
|
|||
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
@ -249,12 +250,11 @@ public final class SchemaUtil {
|
|||
/*
|
||||
* Very "magic" helper method to extract the source mappings
|
||||
*/
|
||||
private static void getSourceFieldMappings(
|
||||
Client client,
|
||||
String[] index,
|
||||
String[] fields,
|
||||
ActionListener<Map<String, String>> listener
|
||||
) {
|
||||
static void getSourceFieldMappings(Client client, String[] index, String[] fields, ActionListener<Map<String, String>> listener) {
|
||||
if (index == null || index.length == 0 || fields == null || fields.length == 0) {
|
||||
listener.onResponse(Collections.emptyMap());
|
||||
return;
|
||||
}
|
||||
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index)
|
||||
.fields(fields)
|
||||
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
||||
|
|
|
@ -6,11 +6,27 @@
|
|||
|
||||
package org.elasticsearch.xpack.transform.transforms.pivot;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.ActionType;
|
||||
import org.elasticsearch.action.LatchedActionListener;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.client.NoOpClient;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
|
||||
|
@ -69,4 +85,114 @@ public class SchemaUtilTests extends ESTestCase {
|
|||
assertEquals(new BigInteger("18446744073709551615").doubleValue(), ((BigInteger) value).doubleValue(), 0.0);
|
||||
}
|
||||
|
||||
public void testGetSourceFieldMappings() throws InterruptedException {
|
||||
try (Client client = new FieldCapsMockClient(getTestName())) {
|
||||
// fields is null
|
||||
this.<Map<String, String>>assertAsync(
|
||||
listener -> SchemaUtil.getSourceFieldMappings(client, new String[] { "index-1", "index-2" }, null, listener),
|
||||
mappings -> {
|
||||
assertNotNull(mappings);
|
||||
assertTrue(mappings.isEmpty());
|
||||
}
|
||||
);
|
||||
|
||||
// fields is empty
|
||||
this.<Map<String, String>>assertAsync(
|
||||
listener -> SchemaUtil.getSourceFieldMappings(client, new String[] { "index-1", "index-2" }, new String[] {}, listener),
|
||||
mappings -> {
|
||||
assertNotNull(mappings);
|
||||
assertTrue(mappings.isEmpty());
|
||||
}
|
||||
);
|
||||
|
||||
// indices is null
|
||||
this.<Map<String, String>>assertAsync(
|
||||
listener -> SchemaUtil.getSourceFieldMappings(client, null, new String[] { "field-1", "field-2" }, listener),
|
||||
mappings -> {
|
||||
assertNotNull(mappings);
|
||||
assertTrue(mappings.isEmpty());
|
||||
}
|
||||
);
|
||||
|
||||
// indices is empty
|
||||
this.<Map<String, String>>assertAsync(
|
||||
listener -> SchemaUtil.getSourceFieldMappings(client, new String[] {}, new String[] { "field-1", "field-2" }, listener),
|
||||
mappings -> {
|
||||
assertNotNull(mappings);
|
||||
assertTrue(mappings.isEmpty());
|
||||
}
|
||||
);
|
||||
|
||||
// good use
|
||||
this.<Map<String, String>>assertAsync(
|
||||
listener -> SchemaUtil.getSourceFieldMappings(
|
||||
client,
|
||||
new String[] { "index-1", "index-2" },
|
||||
new String[] { "field-1", "field-2" },
|
||||
listener
|
||||
),
|
||||
mappings -> {
|
||||
assertNotNull(mappings);
|
||||
assertEquals(2, mappings.size());
|
||||
assertEquals("long", mappings.get("field-1"));
|
||||
assertEquals("long", mappings.get("field-2"));
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private static class FieldCapsMockClient extends NoOpClient {
|
||||
FieldCapsMockClient(String testName) {
|
||||
super(testName);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
|
||||
ActionType<Response> action,
|
||||
Request request,
|
||||
ActionListener<Response> listener
|
||||
) {
|
||||
if (request instanceof FieldCapabilitiesRequest) {
|
||||
FieldCapabilitiesRequest fieldCapsRequest = (FieldCapabilitiesRequest) request;
|
||||
Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();
|
||||
for (String field : fieldCapsRequest.fields()) {
|
||||
responseMap.put(field, Collections.singletonMap(field, createFieldCapabilities(field, "long")));
|
||||
}
|
||||
|
||||
final FieldCapabilitiesResponse response = new FieldCapabilitiesResponse(fieldCapsRequest.indices(), responseMap);
|
||||
listener.onResponse((Response) response);
|
||||
return;
|
||||
}
|
||||
|
||||
super.doExecute(action, request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
private static FieldCapabilities createFieldCapabilities(String name, String type) {
|
||||
return new FieldCapabilities(
|
||||
name,
|
||||
type,
|
||||
true,
|
||||
true,
|
||||
Strings.EMPTY_ARRAY,
|
||||
Strings.EMPTY_ARRAY,
|
||||
Strings.EMPTY_ARRAY,
|
||||
Collections.emptyMap()
|
||||
);
|
||||
}
|
||||
|
||||
private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> furtherTests) throws InterruptedException {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicBoolean listenerCalled = new AtomicBoolean(false);
|
||||
|
||||
LatchedActionListener<T> listener = new LatchedActionListener<>(ActionListener.wrap(r -> {
|
||||
assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true));
|
||||
furtherTests.accept(r);
|
||||
}, e -> { fail("got unexpected exception: " + e); }), latch);
|
||||
|
||||
function.accept(listener);
|
||||
assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue