Search API: Allow script fields to extract parts of the stored `_source`, closes #349.
This commit is contained in:
parent
1e75638b31
commit
cd28afe950
|
@ -34,6 +34,10 @@ import org.elasticsearch.index.analysis.AnalyzerScope;
|
|||
import org.elasticsearch.index.analysis.NamedAnalyzer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
@ -278,7 +282,9 @@ public class Lucene {
|
|||
|
||||
public static Object readFieldValue(StreamInput in) throws IOException {
|
||||
byte type = in.readByte();
|
||||
if (type == 0) {
|
||||
if (type == -1) {
|
||||
return null;
|
||||
} else if (type == 0) {
|
||||
return in.readUTF();
|
||||
} else if (type == 1) {
|
||||
return in.readInt();
|
||||
|
@ -295,12 +301,30 @@ public class Lucene {
|
|||
byte[] value = new byte[bytesSize];
|
||||
in.readFully(value);
|
||||
return value;
|
||||
} else if (type == 7) {
|
||||
int size = in.readVInt();
|
||||
List list = new ArrayList(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
list.add(readFieldValue(in));
|
||||
}
|
||||
return list;
|
||||
} else if (type == 8) {
|
||||
int size = in.readVInt();
|
||||
Map map = new HashMap();
|
||||
for (int i = 0; i < size; i++) {
|
||||
map.put(in.readUTF(), readFieldValue(in));
|
||||
}
|
||||
return map;
|
||||
} else {
|
||||
throw new IOException("Can't read unknown type [" + type + "]");
|
||||
}
|
||||
}
|
||||
|
||||
public static void writeFieldValue(StreamOutput out, Object value) throws IOException {
|
||||
if (value == null) {
|
||||
out.writeByte((byte) -1);
|
||||
return;
|
||||
}
|
||||
Class type = value.getClass();
|
||||
if (type == String.class) {
|
||||
out.writeByte((byte) 0);
|
||||
|
@ -324,6 +348,21 @@ public class Lucene {
|
|||
out.writeByte((byte) 6);
|
||||
out.writeVInt(((byte[]) value).length);
|
||||
out.writeBytes(((byte[]) value));
|
||||
} else if (value instanceof List) {
|
||||
out.writeByte((byte) 7);
|
||||
List list = (List) value;
|
||||
out.writeVInt(list.size());
|
||||
for (Object o : list) {
|
||||
writeFieldValue(out, o);
|
||||
}
|
||||
} else if (value instanceof Map) {
|
||||
out.writeByte((byte) 8);
|
||||
Map<String, Object> map = (Map<String, Object>) value;
|
||||
out.writeVInt(map.size());
|
||||
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
||||
out.writeUTF(entry.getKey());
|
||||
writeFieldValue(out, entry.getValue());
|
||||
}
|
||||
} else {
|
||||
throw new IOException("Can't write type [" + type + "]");
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.common.xcontent.support.XContentMapConverter;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -210,6 +211,15 @@ public abstract class XContentBuilder<T extends XContentBuilder> {
|
|||
return builder;
|
||||
}
|
||||
|
||||
public T field(String name, List<Object> value) throws IOException {
|
||||
startArray(name);
|
||||
for (Object o : value) {
|
||||
value(o);
|
||||
}
|
||||
endArray();
|
||||
return builder;
|
||||
}
|
||||
|
||||
public T field(String name, Object value) throws IOException {
|
||||
if (value == null) {
|
||||
nullField(name);
|
||||
|
@ -236,6 +246,8 @@ public abstract class XContentBuilder<T extends XContentBuilder> {
|
|||
field(name, (ReadableInstant) value);
|
||||
} else if (value instanceof Map) {
|
||||
field(name, (Map<String, Object>) value);
|
||||
} else if (value instanceof List) {
|
||||
field(name, (List) value);
|
||||
} else {
|
||||
field(name, value.toString());
|
||||
}
|
||||
|
|
|
@ -31,8 +31,15 @@ public interface FieldsFunction {
|
|||
void setNextReader(IndexReader reader);
|
||||
|
||||
/**
|
||||
* @param docId
|
||||
* @param docId The doc id
|
||||
* @param vars The vars providing additional parameters, should be reused and has values added to it in execute
|
||||
*/
|
||||
Object execute(int docId, Map<String, Object> vars);
|
||||
|
||||
/**
|
||||
* @param docId The doc id
|
||||
* @param vars The vars providing additional parameters, should be reused and has values added to it in execute
|
||||
* @param sameDocCache If executing against the same doc id several times (possibly with different scripts), pass this across the invocations
|
||||
*/
|
||||
Object execute(int docId, Map<String, Object> vars, Map<String, Object> sameDocCache);
|
||||
}
|
||||
|
|
|
@ -19,17 +19,29 @@
|
|||
|
||||
package org.elasticsearch.index.field.function.script;
|
||||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Fieldable;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
||||
import org.elasticsearch.ElasticSearchParseException;
|
||||
import org.elasticsearch.common.compress.lzf.LZFDecoder;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamInput;
|
||||
import org.elasticsearch.common.io.stream.CachedStreamInput;
|
||||
import org.elasticsearch.common.io.stream.LZFStreamInput;
|
||||
import org.elasticsearch.common.thread.ThreadLocals;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.cache.field.data.FieldDataCache;
|
||||
import org.elasticsearch.index.field.data.FieldData;
|
||||
import org.elasticsearch.index.field.function.FieldsFunction;
|
||||
import org.elasticsearch.index.mapper.FieldMapper;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.SourceFieldMapper;
|
||||
import org.elasticsearch.index.mapper.SourceFieldSelector;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
|
@ -59,24 +71,139 @@ public class ScriptFieldsFunction implements FieldsFunction {
|
|||
|
||||
final DocMap docMap;
|
||||
|
||||
final SourceMap sourceMap;
|
||||
|
||||
    /**
     * Compiles the given script and prepares the lookup maps exposed to it
     * ({@code doc} field data and the parsed {@code _source}).
     *
     * @param script         the script source to compile
     * @param scriptService  service used to compile (and later execute) the script
     * @param mapperService  mapper service backing the doc field data lookups
     * @param fieldDataCache cache backing the doc field data lookups
     */
    public ScriptFieldsFunction(String script, ScriptService scriptService, MapperService mapperService, FieldDataCache fieldDataCache) {
        this.scriptService = scriptService;
        this.script = scriptService.compile(script);
        // cachedFieldData appears to be a reusable thread-local map declared outside this view — TODO confirm
        this.docMap = new DocMap(cachedFieldData.get().get(), mapperService, fieldDataCache);
        this.sourceMap = new SourceMap();
    }
||||
|
||||
    @Override public void setNextReader(IndexReader reader) {
        // Propagate the reader change so per-reader cached state (field data,
        // parsed source) is invalidated in both lookup maps.
        docMap.setNextReader(reader);
        sourceMap.setNextReader(reader);
    }
||||
|
||||
    /**
     * Executes the script for the given doc without a cross-invocation
     * same-doc cache.
     */
    @Override public Object execute(int docId, Map<String, Object> vars) {
        return execute(docId, vars, null);
    }
|
||||
|
||||
@Override public Object execute(int docId, Map<String, Object> vars, @Nullable Map<String, Object> sameDocCache) {
|
||||
docMap.setNextDocId(docId);
|
||||
sourceMap.setNextDocId(docId);
|
||||
if (sameDocCache != null) {
|
||||
sourceMap.parsedSource((Map<String, Object>) sameDocCache.get("parsedSource"));
|
||||
}
|
||||
if (vars == null) {
|
||||
vars = cachedVars.get().get();
|
||||
vars.clear();
|
||||
}
|
||||
vars.put("doc", docMap);
|
||||
return scriptService.execute(script, vars);
|
||||
vars.put("_source", sourceMap);
|
||||
Object retVal = scriptService.execute(script, vars);
|
||||
if (sameDocCache != null) {
|
||||
sameDocCache.put("parsedSource", sourceMap.parsedSource());
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
|
||||
    /**
     * Read-only {@link Map} view over the parsed {@code _source} of the
     * current doc. The source is loaded and parsed lazily on first access,
     * and is invalidated whenever the reader or doc id changes.
     */
    static class SourceMap implements Map {

        private IndexReader reader;

        private int docId;

        // Parsed source for the current docId; null until loaded (or injected
        // via parsedSource(Map)).
        private Map<String, Object> source;

        /**
         * Returns the already-parsed source, or null if it has not been
         * loaded for the current doc yet.
         */
        public Map<String, Object> parsedSource() {
            return source;
        }

        /**
         * Injects an externally parsed source (e.g. cached from a previous
         * script executed against the same doc) to avoid re-parsing.
         */
        public void parsedSource(Map<String, Object> source) {
            this.source = source;
        }

        /**
         * Loads and parses the stored {@code _source} field for the current
         * doc if it has not been parsed yet.
         *
         * @throws ElasticSearchParseException if loading or parsing fails
         */
        private Map<String, Object> loadSourceIfNeeded() {
            if (source != null) {
                return source;
            }
            try {
                // Load only the _source stored field for this doc.
                Document doc = reader.document(docId, SourceFieldSelector.INSTANCE);
                Fieldable sourceField = doc.getFieldable(SourceFieldMapper.NAME);
                byte[] source = sourceField.getBinaryValue();
                if (LZFDecoder.isCompressed(source)) {
                    // Decompress through a cached LZF stream, sniff the content
                    // type from the head of the stream, then rewind and parse.
                    BytesStreamInput siBytes = new BytesStreamInput(source);
                    LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
                    XContentType contentType = XContentFactory.xContentType(siLzf);
                    siLzf.resetToBufferStart();
                    this.source = XContentFactory.xContent(contentType).createParser(siLzf).map();
                } else {
                    this.source = XContentFactory.xContent(source).createParser(source).map();
                }
            } catch (Exception e) {
                throw new ElasticSearchParseException("failed to parse source", e);
            }
            return this.source;
        }

        public void setNextReader(IndexReader reader) {
            this.reader = reader;
            this.source = null; // invalidate source cached for the previous reader
        }

        public void setNextDocId(int docId) {
            this.docId = docId;
            this.source = null; // invalidate source cached for the previous doc
        }

        @Override public Object get(Object key) {
            return loadSourceIfNeeded().get(key);
        }

        @Override public int size() {
            return loadSourceIfNeeded().size();
        }

        @Override public boolean isEmpty() {
            return loadSourceIfNeeded().isEmpty();
        }

        @Override public boolean containsKey(Object key) {
            return loadSourceIfNeeded().containsKey(key);
        }

        @Override public boolean containsValue(Object value) {
            return loadSourceIfNeeded().containsValue(value);
        }

        @Override public Set keySet() {
            return loadSourceIfNeeded().keySet();
        }

        @Override public Collection values() {
            return loadSourceIfNeeded().values();
        }

        @Override public Set entrySet() {
            return loadSourceIfNeeded().entrySet();
        }

        // Mutating operations are unsupported: the source map is read-only.
        @Override public Object put(Object key, Object value) {
            throw new UnsupportedOperationException();
        }

        @Override public Object remove(Object key) {
            throw new UnsupportedOperationException();
        }

        @Override public void putAll(Map m) {
            throw new UnsupportedOperationException();
        }

        @Override public void clear() {
            throw new UnsupportedOperationException();
        }
    }
|
||||
|
||||
// --- Map implementation for doc field data lookup
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.mapper;
|
||||
|
||||
import org.apache.lucene.document.FieldSelector;
|
||||
import org.apache.lucene.document.FieldSelectorResult;
|
||||
|
||||
/**
|
||||
* An optimized field selector that loads just the uid.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class SourceFieldSelector implements FieldSelector {
|
||||
|
||||
public static SourceFieldSelector INSTANCE = new SourceFieldSelector();
|
||||
|
||||
private SourceFieldSelector() {
|
||||
|
||||
}
|
||||
|
||||
@Override public FieldSelectorResult accept(String fieldName) {
|
||||
if (SourceFieldMapper.NAME.equals(fieldName)) {
|
||||
return FieldSelectorResult.LOAD_AND_BREAK;
|
||||
}
|
||||
return FieldSelectorResult.NO_LOAD;
|
||||
}
|
||||
}
|
|
@ -29,6 +29,12 @@ import org.apache.lucene.document.FieldSelectorResult;
|
|||
*/
|
||||
public class UidFieldSelector implements FieldSelector {
|
||||
|
||||
public static UidFieldSelector INSTANCE = new UidFieldSelector();
|
||||
|
||||
private UidFieldSelector() {
|
||||
|
||||
}
|
||||
|
||||
@Override public FieldSelectorResult accept(String fieldName) {
|
||||
if (UidFieldMapper.NAME.equals(fieldName)) {
|
||||
return FieldSelectorResult.LOAD_AND_BREAK;
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.lucene.document.Fieldable;
|
|||
import org.apache.lucene.index.IndexReader;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.thread.ThreadLocals;
|
||||
import org.elasticsearch.index.mapper.*;
|
||||
import org.elasticsearch.search.SearchHitField;
|
||||
import org.elasticsearch.search.SearchParseElement;
|
||||
|
@ -47,6 +48,12 @@ import java.util.Map;
|
|||
*/
|
||||
public class FetchPhase implements SearchPhase {
|
||||
|
||||
private static ThreadLocal<ThreadLocals.CleanableValue<Map<String, Object>>> cachedSameDocScriptCache = new ThreadLocal<ThreadLocals.CleanableValue<Map<String, Object>>>() {
|
||||
@Override protected ThreadLocals.CleanableValue<Map<String, Object>> initialValue() {
|
||||
return new ThreadLocals.CleanableValue<java.util.Map<java.lang.String, java.lang.Object>>(new HashMap<String, Object>());
|
||||
}
|
||||
};
|
||||
|
||||
private final HighlightPhase highlightPhase;
|
||||
|
||||
@Inject public FetchPhase(HighlightPhase highlightPhase) {
|
||||
|
@ -70,6 +77,8 @@ public class FetchPhase implements SearchPhase {
|
|||
public void execute(SearchContext context) {
|
||||
FieldSelector fieldSelector = buildFieldSelectors(context);
|
||||
|
||||
Map<String, Object> sameDocCache = cachedSameDocScriptCache.get().get();
|
||||
|
||||
InternalSearchHit[] hits = new InternalSearchHit[context.docIdsToLoadSize()];
|
||||
for (int index = 0; index < context.docIdsToLoadSize(); index++) {
|
||||
int docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];
|
||||
|
@ -127,13 +136,14 @@ public class FetchPhase implements SearchPhase {
|
|||
}
|
||||
|
||||
if (context.scriptFields() != null) {
|
||||
sameDocCache.clear();
|
||||
int readerIndex = context.searcher().readerIndex(docId);
|
||||
IndexReader subReader = context.searcher().subReaders()[readerIndex];
|
||||
int subDoc = docId - context.searcher().docStarts()[readerIndex];
|
||||
for (ScriptFieldsContext.ScriptField scriptField : context.scriptFields().fields()) {
|
||||
scriptField.scriptFieldsFunction().setNextReader(subReader);
|
||||
|
||||
Object value = scriptField.scriptFieldsFunction().execute(subDoc, scriptField.params());
|
||||
Object value = scriptField.scriptFieldsFunction().execute(subDoc, scriptField.params(), sameDocCache);
|
||||
|
||||
if (searchHit.fields() == null) {
|
||||
searchHit.fields(new HashMap<String, SearchHitField>(2));
|
||||
|
@ -146,6 +156,7 @@ public class FetchPhase implements SearchPhase {
|
|||
}
|
||||
hitField.values().add(value);
|
||||
}
|
||||
sameDocCache.clear();
|
||||
}
|
||||
|
||||
doExplanation(context, docId, searchHit);
|
||||
|
@ -194,7 +205,7 @@ public class FetchPhase implements SearchPhase {
|
|||
private FieldSelector buildFieldSelectors(SearchContext context) {
|
||||
if (context.scriptFields() != null && context.fieldNames() == null) {
|
||||
// we ask for script fields, and no field names, don't load the source
|
||||
return new UidFieldSelector();
|
||||
return UidFieldSelector.INSTANCE;
|
||||
}
|
||||
|
||||
if (context.fieldNames() == null) {
|
||||
|
@ -202,7 +213,7 @@ public class FetchPhase implements SearchPhase {
|
|||
}
|
||||
|
||||
if (context.fieldNames().isEmpty()) {
|
||||
return new UidFieldSelector();
|
||||
return UidFieldSelector.INSTANCE;
|
||||
}
|
||||
|
||||
// asked for all stored fields, just return null so all of them will be loaded
|
||||
|
|
|
@ -22,12 +22,14 @@ package org.elasticsearch.test.integration.search.scriptfield;
|
|||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.test.integration.AbstractNodesTests;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.client.Requests.*;
|
||||
|
@ -46,20 +48,21 @@ public class ScriptFieldSearchTests extends AbstractNodesTests {
|
|||
|
||||
    @BeforeMethod public void createNodes() throws Exception {
        // One data node plus a pure client node; tests talk to the cluster
        // through the client node.
        startNode("server1");
        startNode("client1", ImmutableSettings.settingsBuilder().put("node.client", true).build());
        client = getClient();
    }
|
||||
|
||||
    @AfterMethod public void closeNodes() {
        // Close the client first, then shut down the client node and every
        // remaining started node.
        client.close();
        closeNode("client1");
        closeAllNodes();
    }
|
||||
|
||||
protected Client getClient() {
|
||||
return client("server1");
|
||||
return client("client1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomScriptBoost() throws Exception {
|
||||
@Test public void testCustomScriptBoost() throws Exception {
|
||||
client.admin().indices().prepareCreate("test").execute().actionGet();
|
||||
client.prepareIndex("test", "type1", "1")
|
||||
.setSource(jsonBuilder().startObject().field("test", "value beck").field("num1", 1.0f).field("date", "1970-01-01T00:00:00").endObject())
|
||||
|
@ -110,4 +113,38 @@ public class ScriptFieldSearchTests extends AbstractNodesTests {
|
|||
assertThat(response.hits().getAt(2).id(), equalTo("3"));
|
||||
assertThat((Double) response.hits().getAt(2).fields().get("sNum1").values().get(0), equalTo(6.0));
|
||||
}
|
||||
|
||||
    /**
     * Verifies that script fields can extract parts of the stored _source:
     * whole sub-objects, nested scalar values, and arrays are all reachable
     * via {@code _source.path} scripts.
     */
    @Test public void testScriptFieldUsingSource() throws Exception {
        client.admin().indices().prepareCreate("test").execute().actionGet();
        client.prepareIndex("test", "type1", "1")
                .setSource(jsonBuilder().startObject()
                        .startObject("obj1").field("test", "something").endObject()
                        .startObject("obj2").startArray("arr2").value("arr_value1").value("arr_value2").endArray().endObject()
                        .endObject())
                .execute().actionGet();
        client.admin().indices().refresh(refreshRequest()).actionGet();

        SearchResponse response = client.prepareSearch()
                .setQuery(matchAllQuery())
                .addScriptField("s_obj1", "_source.obj1")
                .addScriptField("s_obj1_test", "_source.obj1.test")
                .addScriptField("s_obj2", "_source.obj2")
                .addScriptField("s_obj2_arr2", "_source.obj2.arr2")
                .execute().actionGet();

        // Whole sub-object extraction.
        Map<String, Object> sObj1 = (Map<String, Object>) response.hits().getAt(0).field("s_obj1").value();
        assertThat(sObj1.get("test").toString(), equalTo("something"));
        // Nested scalar extraction.
        assertThat(response.hits().getAt(0).field("s_obj1_test").value().toString(), equalTo("something"));

        // Array reached through its parent object...
        Map<String, Object> sObj2 = (Map<String, Object>) response.hits().getAt(0).field("s_obj2").value();
        List sObj2Arr2 = (List) sObj2.get("arr2");
        assertThat(sObj2Arr2.size(), equalTo(2));
        assertThat(sObj2Arr2.get(0).toString(), equalTo("arr_value1"));
        assertThat(sObj2Arr2.get(1).toString(), equalTo("arr_value2"));

        // ...and extracted directly as a script field.
        sObj2Arr2 = (List) response.hits().getAt(0).field("s_obj2_arr2").value();
        assertThat(sObj2Arr2.size(), equalTo(2));
        assertThat(sObj2Arr2.get(0).toString(), equalTo("arr_value1"));
        assertThat(sObj2Arr2.get(1).toString(), equalTo("arr_value2"));
    }
|
||||
}
|
Loading…
Reference in New Issue