If a list or map value gets set on ingest document a deep copy needs to be made.

If this is not done this can lead to processor configuration being changed by an bulk or index request.
This commit is contained in:
Martijn van Groningen 2015-11-30 14:48:54 +01:00
parent 4402da1af0
commit 99a4295330
4 changed files with 94 additions and 1 deletions

View File

@ -20,6 +20,8 @@
package org.elasticsearch.ingest; package org.elasticsearch.ingest;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.HppcMaps;
import org.elasticsearch.search.aggregations.support.format.ValueParser;
import java.util.*; import java.util.*;
@ -258,6 +260,8 @@ public final class IngestDocument {
String[] pathElements = Strings.splitStringToArray(path, '.'); String[] pathElements = Strings.splitStringToArray(path, '.');
assert pathElements.length > 0; assert pathElements.length > 0;
value = deepCopy(value);
Object context = source; Object context = source;
for (int i = 0; i < pathElements.length - 1; i++) { for (int i = 0; i < pathElements.length - 1; i++) {
String pathElement = pathElements[i]; String pathElement = pathElements[i];
@ -354,6 +358,32 @@ public final class IngestDocument {
return sourceModified; return sourceModified;
} }
static Object deepCopy(Object value) {
if (value instanceof Map) {
@SuppressWarnings("unchecked")
Map<Object, Object> mapValue = (Map<Object, Object>) value;
Map<Object, Object> copy = new HashMap<>(mapValue.size());
for (Map.Entry<Object, Object> entry : mapValue.entrySet()) {
copy.put(entry.getKey(), deepCopy(entry.getValue()));
}
return copy;
} else if (value instanceof List) {
@SuppressWarnings("unchecked")
List<Object> listValue = (List<Object>) value;
List<Object> copy = new ArrayList<>(listValue.size());
for (Object itemValue : listValue) {
copy.add(deepCopy(itemValue));
}
return copy;
} else if (value == null || value instanceof String || value instanceof Integer ||
value instanceof Long || value instanceof Float ||
value instanceof Double || value instanceof Boolean) {
return value;
} else {
throw new IllegalArgumentException("unexpected value type [" + value.getClass() + "]");
}
}
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (obj == this) { return true; } if (obj == this) { return true; }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest; package org.elasticsearch.ingest;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
@ -540,4 +541,14 @@ public class IngestDocumentTests extends ESTestCase {
assertThat(ingestDocument.hashCode(), equalTo(thirdIngestDocument.hashCode())); assertThat(ingestDocument.hashCode(), equalTo(thirdIngestDocument.hashCode()));
} }
} }
public void testDeepCopy() {
int iterations = scaledRandomIntBetween(8, 64);
for (int i = 0; i < iterations; i++) {
Map<String, Object> map = RandomDocumentPicks.randomDocument(random());
Object copy = IngestDocument.deepCopy(map);
assertThat("iteration: " + i, copy, equalTo(map));
assertThat("iteration: " + i, copy, not(sameInstance(map)));
}
}
} }

View File

@ -0,0 +1,52 @@
package org.elasticsearch.ingest;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.set.SetProcessor;
import org.elasticsearch.ingest.processor.remove.RemoveProcessor;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class PipelineTests extends ESTestCase {
public void testProcessorSettingsRemainUntouched() throws Exception {
Map<String, Object> subField = new HashMap<>();
subField.put("_subfield", "value");
Map<String, Object> fieldSettings = new HashMap<>();
fieldSettings.put("_field", subField);
Map<String, Object> addSettings = new HashMap<>();
addSettings.put("fields", fieldSettings);
Map<String, Object> removeSettings = new HashMap<>();
removeSettings.put("fields", Collections.singletonList("_field._subfield"));
Pipeline pipeline = createPipeline(processorConfig(SetProcessor.TYPE, addSettings), processorConfig(RemoveProcessor.TYPE, removeSettings));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", new HashMap<>());
pipeline.execute(ingestDocument);
assertThat(ingestDocument.getSource().get("_field"), Matchers.notNullValue());
assertThat(((Map) ingestDocument.getSource().get("_field")).get("_subfield"), Matchers.nullValue());
assertThat(((Map) fieldSettings.get("_field")).get("_subfield"), Matchers.equalTo("value"));
}
private Pipeline createPipeline(Map<String, Object>... processorConfigs) throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("processors", Arrays.asList(processorConfigs));
Map<String, Processor.Factory> factoryRegistry = new HashMap<>();
factoryRegistry.put(SetProcessor.TYPE, new SetProcessor.Factory());
factoryRegistry.put(RemoveProcessor.TYPE, new RemoveProcessor.Factory());
Pipeline.Factory factory = new Pipeline.Factory();
return factory.create("_id", config, factoryRegistry);
}
private Map<String, Object> processorConfig(String type, Map<String, Object> settings) {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put(type, settings);
return processorConfig;
}
}

View File

@ -135,7 +135,7 @@ public final class RandomDocumentPicks {
return new IngestDocument(index, type, id, document); return new IngestDocument(index, type, id, document);
} }
private static Map<String, Object> randomDocument(Random random) { public static Map<String, Object> randomDocument(Random random) {
Map<String, Object> document = new HashMap<>(); Map<String, Object> document = new HashMap<>();
addRandomFields(random, document, 0); addRandomFields(random, document, 0);
return document; return document;