From 32b64fc9a364a9e2a2ff3ff0deb0faaf46893a95 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 30 Aug 2011 19:53:07 +0300 Subject: [PATCH] externalize get logic into a shard level get service --- .../elasticsearch/action/get/GetResponse.java | 211 ++--------- .../action/get/TransportGetAction.java | 255 +------------ .../get/TransportShardMultiGetAction.java | 5 +- .../mlt/TransportMoreLikeThisAction.java | 2 +- .../{action => index}/get/GetField.java | 2 +- .../elasticsearch/index/get/GetResult.java | 348 ++++++++++++++++++ .../index/get/ShardGetModule.java | 31 ++ .../index/get/ShardGetService.java | 309 ++++++++++++++++ .../index/service/InternalIndexService.java | 2 + .../index/shard/service/IndexShard.java | 3 + .../shard/service/InternalIndexShard.java | 10 +- 11 files changed, 738 insertions(+), 440 deletions(-) rename modules/elasticsearch/src/main/java/org/elasticsearch/{action => index}/get/GetField.java (98%) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/get/GetResult.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/get/ShardGetModule.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/get/ShardGetService.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java index 93162d9c7fa..bb2ff5d9fb6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java @@ -22,27 +22,18 @@ package org.elasticsearch.action.get; import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.BytesHolder; -import org.elasticsearch.common.Unicode; -import org.elasticsearch.common.collect.ImmutableMap; -import org.elasticsearch.common.compress.lzf.LZF; -import org.elasticsearch.common.compress.lzf.LZFDecoder; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentBuilderString; -import org.elasticsearch.rest.action.support.RestXContentBuilder; -import org.elasticsearch.search.lookup.SourceLookup; +import org.elasticsearch.index.get.GetField; +import org.elasticsearch.index.get.GetResult; import java.io.IOException; import java.util.Iterator; import java.util.Map; -import static org.elasticsearch.action.get.GetField.*; -import static org.elasticsearch.common.collect.Iterators.*; -import static org.elasticsearch.common.collect.Maps.*; - /** * The response of a get action. * @@ -52,155 +43,111 @@ import static org.elasticsearch.common.collect.Maps.*; */ public class GetResponse implements ActionResponse, Streamable, Iterable, ToXContent { - private String index; - - private String type; - - private String id; - - private long version; - - private boolean exists; - - private Map fields; - - private Map sourceAsMap; - - private BytesHolder source; - - private byte[] sourceAsBytes; + private GetResult getResult; GetResponse() { } - GetResponse(String index, String type, String id, long version, boolean exists, BytesHolder source, Map fields) { - this.index = index; - this.type = type; - this.id = id; - this.version = version; - this.exists = exists; - this.source = source; - this.fields = fields; - if (this.fields == null) { - this.fields = ImmutableMap.of(); - } + GetResponse(GetResult getResult) { + this.getResult = getResult; } /** * Does the document exists. */ public boolean exists() { - return exists; + return getResult.exists(); } /** * Does the document exists. */ public boolean isExists() { - return exists; + return exists(); } /** * The index the document was fetched from. */ public String index() { - return this.index; + return getResult.index(); } /** * The index the document was fetched from. */ public String getIndex() { - return index; + return index(); } /** * The type of the document. */ public String type() { - return type; + return getResult.type(); } /** * The type of the document. */ public String getType() { - return type; + return type(); } /** * The id of the document. */ public String id() { - return id; + return getResult.id(); } /** * The id of the document. */ public String getId() { - return id; + return id(); } /** * The version of the doc. */ public long version() { - return this.version; + return getResult.version(); } /** * The version of the doc. */ public long getVersion() { - return this.version; + return version(); } /** * The source of the document if exists. */ public byte[] source() { - if (source == null) { - return null; - } - if (sourceAsBytes != null) { - return sourceAsBytes; - } - this.sourceAsBytes = sourceRef().copyBytes(); - return this.sourceAsBytes; + return getResult.source(); } /** * Returns bytes reference, also un compress the source if needed. */ public BytesHolder sourceRef() { - if (LZF.isCompressed(source.bytes(), source.offset(), source.length())) { - try { - // TODO decompress without doing an extra copy! - this.source = new BytesHolder(LZFDecoder.decode(source.copyBytes())); - } catch (IOException e) { - throw new ElasticSearchParseException("failed to decompress source", e); - } - } - return this.source; + return getResult.sourceRef(); } /** * Is the source empty (not available) or not. */ public boolean isSourceEmpty() { - return source == null; + return getResult.isSourceEmpty(); } /** * The source of the document (as a string). */ public String sourceAsString() { - if (source == null) { - return null; - } - BytesHolder source = sourceRef(); - return Unicode.fromBytes(source.bytes(), source.offset(), source.length()); + return getResult.sourceAsString(); } /** @@ -208,140 +155,38 @@ public class GetResponse implements ActionResponse, Streamable, Iterable sourceAsMap() throws ElasticSearchParseException { - if (source == null) { - return null; - } - if (sourceAsMap != null) { - return sourceAsMap; - } - - sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length()); - return sourceAsMap; + return getResult.sourceAsMap(); } public Map getSource() { - return sourceAsMap(); + return getResult.getSource(); } public Map fields() { - return this.fields; + return getResult.fields(); } public Map getFields() { - return fields; + return fields(); } public GetField field(String name) { - return fields.get(name); + return getResult.field(name); } @Override public Iterator iterator() { - if (fields == null) { - return emptyIterator(); - } - return fields.values().iterator(); - } - - static final class Fields { - static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); - static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); - static final XContentBuilderString _ID = new XContentBuilderString("_id"); - static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); - static final XContentBuilderString EXISTS = new XContentBuilderString("exists"); - static final XContentBuilderString FIELDS = new XContentBuilderString("fields"); + return getResult.iterator(); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - if (!exists()) { - builder.startObject(); - builder.field(Fields._INDEX, index); - builder.field(Fields._TYPE, type); - builder.field(Fields._ID, id); - builder.field(Fields.EXISTS, false); - builder.endObject(); - } else { - builder.startObject(); - builder.field(Fields._INDEX, index); - builder.field(Fields._TYPE, type); - builder.field(Fields._ID, id); - if (version != -1) { - builder.field(Fields._VERSION, version); - } - builder.field(Fields.EXISTS, true); - if (source != null) { - RestXContentBuilder.restDocumentSource(source.bytes(), source.offset(), source.length(), builder, params); - } - - if (fields != null && !fields.isEmpty()) { - builder.startObject(Fields.FIELDS); - for (GetField field : fields.values()) { - if (field.values().isEmpty()) { - continue; - } - if (field.values().size() == 1) { - builder.field(field.name(), field.values().get(0)); - } else { - builder.field(field.name()); - builder.startArray(); - for (Object value : field.values()) { - builder.value(value); - } - builder.endArray(); - } - } - builder.endObject(); - } - - - builder.endObject(); - } - return builder; + return getResult.toXContent(builder, params); } @Override public void readFrom(StreamInput in) throws IOException { - index = in.readUTF(); - type = in.readUTF(); - id = in.readUTF(); - version = in.readLong(); - exists = in.readBoolean(); - if (exists) { - if (in.readBoolean()) { - source = BytesHolder.readBytesHolder(in); - } - int size = in.readVInt(); - if (size == 0) { - fields = ImmutableMap.of(); - } else { - fields = newHashMapWithExpectedSize(size); - for (int i = 0; i < size; i++) { - GetField field = readGetField(in); - fields.put(field.name(), field); - } - } - } + getResult = GetResult.readGetResult(in); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeUTF(index); - out.writeUTF(type); - out.writeUTF(id); - out.writeLong(version); - out.writeBoolean(exists); - if (exists) { - if (source == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - source.writeTo(out); - } - if (fields == null) { - out.writeVInt(0); - } else { - out.writeVInt(fields.size()); - for (GetField field : fields.values()) { - field.writeTo(out); - } - } - } + getResult.writeTo(out); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 880a94e0eb5..744f855d79b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -19,8 +19,6 @@ package org.elasticsearch.action.get; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Fieldable; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.TransportActions; @@ -30,35 +28,17 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.lucene.document.ResetFieldSelector; -import org.elasticsearch.common.lucene.uid.UidField; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.mapper.DocumentMapper; -import org.elasticsearch.index.mapper.FieldMapper; -import org.elasticsearch.index.mapper.FieldMappers; -import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.mapper.internal.UidFieldMapper; -import org.elasticsearch.index.mapper.selector.FieldMappersFieldSelector; +import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.script.ScriptService; -import org.elasticsearch.script.SearchScript; -import org.elasticsearch.search.lookup.SearchLookup; -import org.elasticsearch.search.lookup.SourceLookup; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Map; - -import static org.elasticsearch.common.collect.Maps.*; - /** * Performs the get operation. * @@ -121,237 +101,8 @@ public class TransportGetAction extends TransportShardSingleOperationAction 0; - Engine.GetResult get = null; - if (type == null || type.equals("_all")) { - for (String typeX : indexService.mapperService().types()) { - get = indexShard.get(new Engine.Get(realtime, UidFieldMapper.TERM_FACTORY.createTerm(Uid.createUid(typeX, id))).loadSource(loadSource)); - if (get.exists()) { - type = typeX; - break; - } else { - get.release(); - } - } - if (get == null) { - return new GetResponse(index, type, id, -1, false, null, null); - } - if (!get.exists()) { - // no need to release here as well..., we release in the for loop for non exists - return new GetResponse(index, type, id, -1, false, null, null); - } - } else { - get = indexShard.get(new Engine.Get(realtime, UidFieldMapper.TERM_FACTORY.createTerm(Uid.createUid(type, id))).loadSource(loadSource)); - if (!get.exists()) { - get.release(); - return new GetResponse(index, type, id, -1, false, null, null); - } - } - - DocumentMapper docMapper = indexService.mapperService().documentMapper(type); - if (docMapper == null) { - get.release(); - return new GetResponse(index, type, id, -1, false, null, null); - } - - try { - // break between having loaded it from translog (so we only have _source), and having a document to load - if (get.docIdAndVersion() != null) { - Map fields = null; - byte[] source = null; - UidField.DocIdAndVersion docIdAndVersion = get.docIdAndVersion(); - ResetFieldSelector fieldSelector = buildFieldSelectors(docMapper, gFields); - if (fieldSelector != null) { - fieldSelector.reset(); - Document doc; - try { - doc = docIdAndVersion.reader.document(docIdAndVersion.docId, fieldSelector); - } catch (IOException e) { - throw new ElasticSearchException("Failed to get type [" + type + "] and id [" + id + "]", e); - } - source = extractSource(doc, docMapper); - - for (Object oField : doc.getFields()) { - Fieldable field = (Fieldable) oField; - String name = field.name(); - Object value = null; - FieldMappers fieldMappers = docMapper.mappers().indexName(field.name()); - if (fieldMappers != null) { - FieldMapper mapper = fieldMappers.mapper(); - if (mapper != null) { - name = mapper.names().fullName(); - value = mapper.valueForSearch(field); - } - } - if (value == null) { - if (field.isBinary()) { - value = field.getBinaryValue(); - } else { - value = field.stringValue(); - } - } - - if (fields == null) { - fields = newHashMapWithExpectedSize(2); - } - - GetField getField = fields.get(name); - if (getField == null) { - getField = new GetField(name, new ArrayList(2)); - fields.put(name, getField); - } - getField.values().add(value); - } - } - - // now, go and do the script thingy if needed - if (gFields != null && gFields.length > 0) { - SearchLookup searchLookup = null; - for (String field : gFields) { - String script = null; - if (field.contains("_source.") || field.contains("doc[")) { - script = field; - } else { - FieldMappers x = docMapper.mappers().smartName(field); - if (x != null && !x.mapper().stored()) { - script = "_source." + x.mapper().names().fullName(); - } - } - if (script != null) { - if (searchLookup == null) { - searchLookup = new SearchLookup(indexService.mapperService(), indexService.cache().fieldData()); - } - SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null); - searchScript.setNextReader(docIdAndVersion.reader); - searchScript.setNextDocId(docIdAndVersion.docId); - - try { - Object value = searchScript.run(); - if (fields == null) { - fields = newHashMapWithExpectedSize(2); - } - GetField getField = fields.get(field); - if (getField == null) { - getField = new GetField(field, new ArrayList(2)); - fields.put(field, getField); - } - getField.values().add(value); - } catch (RuntimeException e) { - if (logger.isTraceEnabled()) { - logger.trace("failed to execute get request script field [{}]", e, script); - } - // ignore - } - } - } - } - - return new GetResponse(index, type, id, get.version(), get.exists(), source == null ? null : new BytesHolder(source), fields); - } else { - BytesHolder source = get.source(); - - Map fields = null; - boolean sourceRequested = false; - - // we can only load scripts that can run against the source - if (gFields == null) { - sourceRequested = true; - } else if (gFields.length == 0) { - // no fields, and no source - sourceRequested = false; - } else { - Map sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length()); - SearchLookup searchLookup = null; - for (String field : gFields) { - if (field.equals("_source")) { - sourceRequested = true; - continue; - } - String script = null; - if (field.contains("_source.")) { - script = field; - } else { - FieldMappers x = docMapper.mappers().smartName(field); - if (x != null) { - script = "_source." + x.mapper().names().fullName(); - } - } - if (script != null) { - if (searchLookup == null) { - searchLookup = new SearchLookup(indexService.mapperService(), indexService.cache().fieldData()); - } - SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null); - // we can't do this, only allow to run scripts against the source - //searchScript.setNextReader(docIdAndVersion.reader); - //searchScript.setNextDocId(docIdAndVersion.docId); - - // but, we need to inject the parsed source into the script, so it will be used... - searchScript.setNextSource(sourceAsMap); - - try { - Object value = searchScript.run(); - if (fields == null) { - fields = newHashMapWithExpectedSize(2); - } - GetField getField = fields.get(field); - if (getField == null) { - getField = new GetField(field, new ArrayList(2)); - fields.put(field, getField); - } - getField.values().add(value); - } catch (RuntimeException e) { - if (logger.isTraceEnabled()) { - logger.trace("failed to execute get request script field [{}]", e, script); - } - // ignore - } - } - } - } - - return new GetResponse(index, type, id, get.version(), get.exists(), sourceRequested ? source : null, fields); - } - } finally { - get.release(); - } - } - - private static ResetFieldSelector buildFieldSelectors(DocumentMapper docMapper, String... fields) { - if (fields == null) { - return docMapper.sourceMapper().fieldSelector(); - } - - // don't load anything - if (fields.length == 0) { - return null; - } - - FieldMappersFieldSelector fieldSelector = null; - for (String fieldName : fields) { - FieldMappers x = docMapper.mappers().smartName(fieldName); - if (x != null && x.mapper().stored()) { - if (fieldSelector == null) { - fieldSelector = new FieldMappersFieldSelector(); - } - fieldSelector.add(x); - } - } - - return fieldSelector; - } - - private static byte[] extractSource(Document doc, DocumentMapper documentMapper) { - byte[] source = null; - Fieldable sourceField = doc.getFieldable(documentMapper.sourceMapper().names().indexName()); - if (sourceField != null) { - source = documentMapper.sourceMapper().nativeValue(sourceField); - doc.removeField(documentMapper.sourceMapper().names().indexName()); - } - return source; + GetResult result = indexShard.getService().get(request.type(), request.id(), request.fields(), request.realtime()); + return new GetResponse(result); } @Override protected GetRequest newRequest() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index f4ce59a9e7c..bb7958c554c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; @@ -105,8 +106,8 @@ public class TransportShardMultiGetAction extends TransportShardSingleOperationA String[] fields = request.fields.get(i); try { - GetResponse getResponse = TransportGetAction.load(logger, scriptService, indexService, indexShard, request.index(), type, id, fields, request.realtime()); - response.add(request.locations.get(i), getResponse); + GetResult getResult = indexShard.getService().get(type, id, fields, request.realtime()); + response.add(request.locations.get(i), new GetResponse(getResult)); } catch (Exception e) { logger.debug("[{}][{}] failed to execute multi_get for [{}]/[{}]", e, request.index(), shardId, type, id); response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), type, id, ExceptionsHelper.detailedMessage(e))); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java index 7f70c3a62d5..c0e690e7b46 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java @@ -24,7 +24,6 @@ import org.apache.lucene.index.Term; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.TransportActions; -import org.elasticsearch.action.get.GetField; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.TransportGetAction; @@ -36,6 +35,7 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.get.GetField; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.FieldMappers; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetField.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/GetField.java similarity index 98% rename from modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetField.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/index/get/GetField.java index 646b249fa43..a2ab35e9e9d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetField.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/GetField.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.action.get; +package org.elasticsearch.index.get; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/GetResult.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/GetResult.java new file mode 100644 index 00000000000..816b910df44 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/GetResult.java @@ -0,0 +1,348 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.get; + +import org.elasticsearch.ElasticSearchParseException; +import org.elasticsearch.common.BytesHolder; +import org.elasticsearch.common.Unicode; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.compress.lzf.LZF; +import org.elasticsearch.common.compress.lzf.LZFDecoder; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.rest.action.support.RestXContentBuilder; +import org.elasticsearch.search.lookup.SourceLookup; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import static org.elasticsearch.common.collect.Iterators.*; +import static org.elasticsearch.common.collect.Maps.*; +import static org.elasticsearch.index.get.GetField.*; + +/** + */ +public class GetResult implements Streamable, Iterable, ToXContent { + + private String index; + + private String type; + + private String id; + + private long version; + + private boolean exists; + + private Map fields; + + private Map sourceAsMap; + + private BytesHolder source; + + private byte[] sourceAsBytes; + + GetResult() { + } + + GetResult(String index, String type, String id, long version, boolean exists, BytesHolder source, Map fields) { + this.index = index; + this.type = type; + this.id = id; + this.version = version; + this.exists = exists; + this.source = source; + this.fields = fields; + if (this.fields == null) { + this.fields = ImmutableMap.of(); + } + } + + /** + * Does the document exists. + */ + public boolean exists() { + return exists; + } + + /** + * Does the document exists. + */ + public boolean isExists() { + return exists; + } + + /** + * The index the document was fetched from. + */ + public String index() { + return this.index; + } + + /** + * The index the document was fetched from. + */ + public String getIndex() { + return index; + } + + /** + * The type of the document. + */ + public String type() { + return type; + } + + /** + * The type of the document. + */ + public String getType() { + return type; + } + + /** + * The id of the document. + */ + public String id() { + return id; + } + + /** + * The id of the document. + */ + public String getId() { + return id; + } + + /** + * The version of the doc. + */ + public long version() { + return this.version; + } + + /** + * The version of the doc. + */ + public long getVersion() { + return this.version; + } + + /** + * The source of the document if exists. + */ + public byte[] source() { + if (source == null) { + return null; + } + if (sourceAsBytes != null) { + return sourceAsBytes; + } + this.sourceAsBytes = sourceRef().copyBytes(); + return this.sourceAsBytes; + } + + /** + * Returns bytes reference, also un compress the source if needed. + */ + public BytesHolder sourceRef() { + if (LZF.isCompressed(source.bytes(), source.offset(), source.length())) { + try { + // TODO decompress without doing an extra copy! + this.source = new BytesHolder(LZFDecoder.decode(source.copyBytes())); + } catch (IOException e) { + throw new ElasticSearchParseException("failed to decompress source", e); + } + } + return this.source; + } + + /** + * Is the source empty (not available) or not. + */ + public boolean isSourceEmpty() { + return source == null; + } + + /** + * The source of the document (as a string). + */ + public String sourceAsString() { + if (source == null) { + return null; + } + BytesHolder source = sourceRef(); + return Unicode.fromBytes(source.bytes(), source.offset(), source.length()); + } + + /** + * The source of the document (As a map). + */ + @SuppressWarnings({"unchecked"}) + public Map sourceAsMap() throws ElasticSearchParseException { + if (source == null) { + return null; + } + if (sourceAsMap != null) { + return sourceAsMap; + } + + sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length()); + return sourceAsMap; + } + + public Map getSource() { + return sourceAsMap(); + } + + public Map fields() { + return this.fields; + } + + public Map getFields() { + return fields; + } + + public GetField field(String name) { + return fields.get(name); + } + + @Override public Iterator iterator() { + if (fields == null) { + return emptyIterator(); + } + return fields.values().iterator(); + } + + static final class Fields { + static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); + static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); + static final XContentBuilderString _ID = new XContentBuilderString("_id"); + static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); + static final XContentBuilderString EXISTS = new XContentBuilderString("exists"); + static final XContentBuilderString FIELDS = new XContentBuilderString("fields"); + } + + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (!exists()) { + builder.startObject(); + builder.field(Fields._INDEX, index); + builder.field(Fields._TYPE, type); + builder.field(Fields._ID, id); + builder.field(Fields.EXISTS, false); + builder.endObject(); + } else { + builder.startObject(); + builder.field(Fields._INDEX, index); + builder.field(Fields._TYPE, type); + builder.field(Fields._ID, id); + if (version != -1) { + builder.field(Fields._VERSION, version); + } + builder.field(Fields.EXISTS, true); + if (source != null) { + RestXContentBuilder.restDocumentSource(source.bytes(), source.offset(), source.length(), builder, params); + } + + if (fields != null && !fields.isEmpty()) { + builder.startObject(Fields.FIELDS); + for (GetField field : fields.values()) { + if (field.values().isEmpty()) { + continue; + } + if (field.values().size() == 1) { + builder.field(field.name(), field.values().get(0)); + } else { + builder.field(field.name()); + builder.startArray(); + for (Object value : field.values()) { + builder.value(value); + } + builder.endArray(); + } + } + builder.endObject(); + } + + + builder.endObject(); + } + return builder; + } + + public static GetResult readGetResult(StreamInput in) throws IOException { + GetResult result = new GetResult(); + result.readFrom(in); + return result; + } + + @Override public void readFrom(StreamInput in) throws IOException { + index = in.readUTF(); + type = in.readUTF(); + id = in.readUTF(); + version = in.readLong(); + exists = in.readBoolean(); + if (exists) { + if (in.readBoolean()) { + source = BytesHolder.readBytesHolder(in); + } + int size = in.readVInt(); + if (size == 0) { + fields = ImmutableMap.of(); + } else { + fields = newHashMapWithExpectedSize(size); + for (int i = 0; i < size; i++) { + GetField field = readGetField(in); + fields.put(field.name(), field); + } + } + } + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeUTF(index); + out.writeUTF(type); + out.writeUTF(id); + out.writeLong(version); + out.writeBoolean(exists); + if (exists) { + if (source == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + source.writeTo(out); + } + if (fields == null) { + out.writeVInt(0); + } else { + out.writeVInt(fields.size()); + for (GetField field : fields.values()) { + field.writeTo(out); + } + } + } + } +} + diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/ShardGetModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/ShardGetModule.java new file mode 100644 index 00000000000..5356a095f76 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/ShardGetModule.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.get; + +import org.elasticsearch.common.inject.AbstractModule; + +/** + */ +public class ShardGetModule extends AbstractModule { + + @Override protected void configure() { + bind(ShardGetService.class).asEagerSingleton(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/ShardGetService.java new file mode 100644 index 00000000000..795a9d8ae83 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -0,0 +1,309 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.get; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Fieldable; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.common.BytesHolder; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lucene.document.ResetFieldSelector; +import org.elasticsearch.common.lucene.uid.UidField; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.cache.IndexCache; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.FieldMapper; +import org.elasticsearch.index.mapper.FieldMappers; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.internal.UidFieldMapper; +import org.elasticsearch.index.mapper.selector.FieldMappersFieldSelector; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.SearchScript; +import org.elasticsearch.search.lookup.SearchLookup; +import org.elasticsearch.search.lookup.SourceLookup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; + +import static org.elasticsearch.common.collect.Maps.*; + +/** + */ +public class ShardGetService extends AbstractIndexShardComponent { + + private final ScriptService scriptService; + + private final MapperService mapperService; + + private final IndexCache indexCache; + + private IndexShard indexShard; + + @Inject public ShardGetService(ShardId shardId, @IndexSettings Settings indexSettings, ScriptService scriptService, + MapperService mapperService, IndexCache indexCache) { + super(shardId, indexSettings); + this.scriptService = scriptService; + this.mapperService = mapperService; + this.indexCache = indexCache; + } + + // sadly, to overcome cyclic dep, we need to do this and inject it ourselves... + public ShardGetService setIndexShard(IndexShard indexShard) { + this.indexShard = indexShard; + return this; + } + + public GetResult get(String type, String id, String[] gFields, boolean realtime) throws ElasticSearchException { + boolean loadSource = gFields == null || gFields.length > 0; + Engine.GetResult get = null; + if (type == null || type.equals("_all")) { + for (String typeX : mapperService.types()) { + get = indexShard.get(new Engine.Get(realtime, UidFieldMapper.TERM_FACTORY.createTerm(Uid.createUid(typeX, id))).loadSource(loadSource)); + if (get.exists()) { + type = typeX; + break; + } else { + get.release(); + } + } + if (get == null) { + return new GetResult(shardId.index().name(), type, id, -1, false, null, null); + } + if (!get.exists()) { + // no need to release here as well..., we release in the for loop for non exists + return new GetResult(shardId.index().name(), type, id, -1, false, null, null); + } + } else { + get = indexShard.get(new Engine.Get(realtime, UidFieldMapper.TERM_FACTORY.createTerm(Uid.createUid(type, id))).loadSource(loadSource)); + if (!get.exists()) { + get.release(); + return new GetResult(shardId.index().name(), type, id, -1, false, null, null); + } + } + + DocumentMapper docMapper = mapperService.documentMapper(type); + if (docMapper == null) { + get.release(); + return new GetResult(shardId.index().name(), type, id, -1, false, null, null); + } + + try { + // break between having loaded it from translog (so we only have _source), and having a document to load + if (get.docIdAndVersion() != null) { + Map fields = null; + byte[] source = null; + UidField.DocIdAndVersion docIdAndVersion = get.docIdAndVersion(); + ResetFieldSelector fieldSelector = buildFieldSelectors(docMapper, gFields); + if (fieldSelector != null) { + fieldSelector.reset(); + Document doc; + try { + doc = docIdAndVersion.reader.document(docIdAndVersion.docId, fieldSelector); + } catch (IOException e) { + throw new ElasticSearchException("Failed to get type [" + type + "] and id [" + id + "]", e); + } + source = extractSource(doc, docMapper); + + for (Object oField : doc.getFields()) { + Fieldable field = (Fieldable) oField; + String name = field.name(); + Object value = null; + FieldMappers fieldMappers = docMapper.mappers().indexName(field.name()); + if (fieldMappers != null) { + FieldMapper mapper = fieldMappers.mapper(); + if (mapper != null) { + name = mapper.names().fullName(); + value = mapper.valueForSearch(field); + } + } + if (value == null) { + if (field.isBinary()) { + value = field.getBinaryValue(); + } else { + value = field.stringValue(); + } + } + + if (fields == null) { + fields = newHashMapWithExpectedSize(2); + } + + GetField getField = fields.get(name); + if (getField == null) { + getField = new GetField(name, new ArrayList(2)); + fields.put(name, getField); + } + getField.values().add(value); + } + } + + // now, go and do the script thingy if needed + if (gFields != null && gFields.length > 0) { + SearchLookup searchLookup = null; + for (String field : gFields) { + String script = null; + if (field.contains("_source.") || field.contains("doc[")) { + script = field; + } else { + FieldMappers x = docMapper.mappers().smartName(field); + if (x != null && !x.mapper().stored()) { + script = "_source." + x.mapper().names().fullName(); + } + } + if (script != null) { + if (searchLookup == null) { + searchLookup = new SearchLookup(mapperService, indexCache.fieldData()); + } + SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null); + searchScript.setNextReader(docIdAndVersion.reader); + searchScript.setNextDocId(docIdAndVersion.docId); + + try { + Object value = searchScript.run(); + if (fields == null) { + fields = newHashMapWithExpectedSize(2); + } + GetField getField = fields.get(field); + if (getField == null) { + getField = new GetField(field, new ArrayList(2)); + fields.put(field, getField); + } + getField.values().add(value); + } catch (RuntimeException e) { + if (logger.isTraceEnabled()) { + logger.trace("failed to execute get request script field [{}]", e, script); + } + // ignore + } + } + } + } + + return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), source == null ? null : new BytesHolder(source), fields); + } else { + BytesHolder source = get.source(); + + Map fields = null; + boolean sourceRequested = false; + + // we can only load scripts that can run against the source + if (gFields == null) { + sourceRequested = true; + } else if (gFields.length == 0) { + // no fields, and no source + sourceRequested = false; + } else { + Map sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length()); + SearchLookup searchLookup = null; + for (String field : gFields) { + if (field.equals("_source")) { + sourceRequested = true; + continue; + } + String script = null; + if (field.contains("_source.")) { + script = field; + } else { + FieldMappers x = docMapper.mappers().smartName(field); + if (x != null) { + script = "_source." + x.mapper().names().fullName(); + } + } + if (script != null) { + if (searchLookup == null) { + searchLookup = new SearchLookup(mapperService, indexCache.fieldData()); + } + SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null); + // we can't do this, only allow to run scripts against the source + //searchScript.setNextReader(docIdAndVersion.reader); + //searchScript.setNextDocId(docIdAndVersion.docId); + + // but, we need to inject the parsed source into the script, so it will be used... + searchScript.setNextSource(sourceAsMap); + + try { + Object value = searchScript.run(); + if (fields == null) { + fields = newHashMapWithExpectedSize(2); + } + GetField getField = fields.get(field); + if (getField == null) { + getField = new GetField(field, new ArrayList(2)); + fields.put(field, getField); + } + getField.values().add(value); + } catch (RuntimeException e) { + if (logger.isTraceEnabled()) { + logger.trace("failed to execute get request script field [{}]", e, script); + } + // ignore + } + } + } + } + + return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), sourceRequested ? source : null, fields); + } + } finally { + get.release(); + } + } + + private static ResetFieldSelector buildFieldSelectors(DocumentMapper docMapper, String... fields) { + if (fields == null) { + return docMapper.sourceMapper().fieldSelector(); + } + + // don't load anything + if (fields.length == 0) { + return null; + } + + FieldMappersFieldSelector fieldSelector = null; + for (String fieldName : fields) { + FieldMappers x = docMapper.mappers().smartName(fieldName); + if (x != null && x.mapper().stored()) { + if (fieldSelector == null) { + fieldSelector = new FieldMappersFieldSelector(); + } + fieldSelector.add(x); + } + } + + return fieldSelector; + } + + private static byte[] extractSource(Document doc, DocumentMapper documentMapper) { + byte[] source = null; + Fieldable sourceField = doc.getFieldable(documentMapper.sourceMapper().names().indexName()); + if (sourceField != null) { + source = documentMapper.sourceMapper().nativeValue(sourceField); + doc.removeField(documentMapper.sourceMapper().names().indexName()); + } + return source; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index 8b6fc653d16..35f351d8cb8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.engine.IndexEngine; import org.elasticsearch.index.gateway.IndexGateway; import org.elasticsearch.index.gateway.IndexShardGatewayModule; import org.elasticsearch.index.gateway.IndexShardGatewayService; +import org.elasticsearch.index.get.ShardGetModule; import org.elasticsearch.index.indexing.ShardIndexingModule; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.merge.policy.MergePolicyModule; @@ -281,6 +282,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde modules.add(new ShardsPluginsModule(indexSettings, pluginsService)); modules.add(new IndexShardModule(shardId)); modules.add(new ShardIndexingModule()); + modules.add(new ShardGetModule()); modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class))); modules.add(new DeletionPolicyModule(indexSettings)); modules.add(new MergePolicyModule(indexSettings)); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java index 90052970437..04a59a7d693 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.util.concurrent.ThreadSafe; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.flush.FlushStats; +import org.elasticsearch.index.get.ShardGetService; import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.ParsedDocument; @@ -45,6 +46,8 @@ public interface IndexShard extends IndexShardComponent { ShardIndexingService indexingService(); + ShardGetService getService(); + ShardRouting routingEntry(); DocsStats docStats(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index d740620efbf..b98101ccb4b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -47,6 +47,7 @@ import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.OptimizeFailedEngineException; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.flush.FlushStats; +import org.elasticsearch.index.get.ShardGetService; import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.DocumentMapper; @@ -107,6 +108,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I private final ShardIndexingService indexingService; + private final ShardGetService getService; + private final Object mutex = new Object(); @@ -131,7 +134,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I private final MeanMetric flushMetric = new MeanMetric(); @Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog, - ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService) { + ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService) { super(shardId, indexSettings); this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; this.indexSettingsService = indexSettingsService; @@ -145,6 +148,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I this.indexCache = indexCache; this.indexAliasesService = indexAliasesService; this.indexingService = indexingService; + this.getService = getService.setIndexShard(this); state = IndexShardState.CREATED; this.refreshInterval = indexSettings.getAsTime("engine.robin.refresh_interval", indexSettings.getAsTime("index.refresh_interval", engine.defaultRefreshInterval())); @@ -177,6 +181,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I return this.indexingService; } + @Override public ShardGetService getService() { + return this.getService; + } + @Override public ShardRouting routingEntry() { return this.shardRouting; }