externalize get logic into a shard level get service

Shay Banon 2011-08-30 19:53:07 +03:00
parent 48a26f286b
commit 32b64fc9a3
11 changed files with 738 additions and 440 deletions
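In short: the static TransportGetAction.load(...) helper is removed, and the get logic now lives in a per-shard ShardGetService reachable through IndexShard.getService(). A minimal sketch of the new call pattern, using only names that appear in this diff (the surrounding request/shard setup is assumed):

    // before: a static helper needing the logger, script service, index service and shard
    // GetResponse response = TransportGetAction.load(logger, scriptService, indexService, indexShard,
    //         request.index(), request.type(), request.id(), request.fields(), request.realtime());

    // after: the shard exposes its own get service, returning a transport-agnostic GetResult
    GetResult result = indexShard.getService().get(request.type(), request.id(), request.fields(), request.realtime());
    GetResponse response = new GetResponse(result);

Multi-get goes through the same service, so both actions now wrap a shared GetResult instead of duplicating the load logic.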

View File

@@ -22,27 +22,18 @@ package org.elasticsearch.action.get;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import org.elasticsearch.search.lookup.SourceLookup;
import org.elasticsearch.index.get.GetField;
import org.elasticsearch.index.get.GetResult;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import static org.elasticsearch.action.get.GetField.*;
import static org.elasticsearch.common.collect.Iterators.*;
import static org.elasticsearch.common.collect.Maps.*;
/**
* The response of a get action.
*
@@ -52,155 +43,111 @@ import static org.elasticsearch.common.collect.Maps.*;
*/
public class GetResponse implements ActionResponse, Streamable, Iterable<GetField>, ToXContent {
private String index;
private String type;
private String id;
private long version;
private boolean exists;
private Map<String, GetField> fields;
private Map<String, Object> sourceAsMap;
private BytesHolder source;
private byte[] sourceAsBytes;
private GetResult getResult;
GetResponse() {
}
GetResponse(String index, String type, String id, long version, boolean exists, BytesHolder source, Map<String, GetField> fields) {
this.index = index;
this.type = type;
this.id = id;
this.version = version;
this.exists = exists;
this.source = source;
this.fields = fields;
if (this.fields == null) {
this.fields = ImmutableMap.of();
}
}
GetResponse(GetResult getResult) {
this.getResult = getResult;
}
/**
* Whether the document exists.
*/
public boolean exists() {
return exists;
return getResult.exists();
}
/**
* Whether the document exists.
*/
public boolean isExists() {
return exists;
return exists();
}
/**
* The index the document was fetched from.
*/
public String index() {
return this.index;
return getResult.index();
}
/**
* The index the document was fetched from.
*/
public String getIndex() {
return index;
return index();
}
/**
* The type of the document.
*/
public String type() {
return type;
return getResult.type();
}
/**
* The type of the document.
*/
public String getType() {
return type;
return type();
}
/**
* The id of the document.
*/
public String id() {
return id;
return getResult.id();
}
/**
* The id of the document.
*/
public String getId() {
return id;
return id();
}
/**
* The version of the doc.
*/
public long version() {
return this.version;
return getResult.version();
}
/**
* The version of the doc.
*/
public long getVersion() {
return this.version;
return version();
}
/**
* The source of the document, if it exists.
*/
public byte[] source() {
if (source == null) {
return null;
}
if (sourceAsBytes != null) {
return sourceAsBytes;
}
this.sourceAsBytes = sourceRef().copyBytes();
return this.sourceAsBytes;
return getResult.source();
}
/**
* Returns the bytes reference, uncompressing the source if needed.
*/
public BytesHolder sourceRef() {
if (LZF.isCompressed(source.bytes(), source.offset(), source.length())) {
try {
// TODO decompress without doing an extra copy!
this.source = new BytesHolder(LZFDecoder.decode(source.copyBytes()));
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
}
return this.source;
return getResult.sourceRef();
}
/**
* Whether the source is empty (not available).
*/
public boolean isSourceEmpty() {
return source == null;
return getResult.isSourceEmpty();
}
/**
* The source of the document (as a string).
*/
public String sourceAsString() {
if (source == null) {
return null;
}
BytesHolder source = sourceRef();
return Unicode.fromBytes(source.bytes(), source.offset(), source.length());
return getResult.sourceAsString();
}
/**
@@ -208,140 +155,38 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
*/
@SuppressWarnings({"unchecked"})
public Map<String, Object> sourceAsMap() throws ElasticSearchParseException {
if (source == null) {
return null;
}
if (sourceAsMap != null) {
return sourceAsMap;
}
sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length());
return sourceAsMap;
return getResult.sourceAsMap();
}
public Map<String, Object> getSource() {
return sourceAsMap();
return getResult.getSource();
}
public Map<String, GetField> fields() {
return this.fields;
return getResult.fields();
}
public Map<String, GetField> getFields() {
return fields;
return fields();
}
public GetField field(String name) {
return fields.get(name);
return getResult.field(name);
}
@Override public Iterator<GetField> iterator() {
if (fields == null) {
return emptyIterator();
}
return fields.values().iterator();
}
static final class Fields {
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
static final XContentBuilderString EXISTS = new XContentBuilderString("exists");
static final XContentBuilderString FIELDS = new XContentBuilderString("fields");
}
return getResult.iterator();
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (!exists()) {
builder.startObject();
builder.field(Fields._INDEX, index);
builder.field(Fields._TYPE, type);
builder.field(Fields._ID, id);
builder.field(Fields.EXISTS, false);
builder.endObject();
} else {
builder.startObject();
builder.field(Fields._INDEX, index);
builder.field(Fields._TYPE, type);
builder.field(Fields._ID, id);
if (version != -1) {
builder.field(Fields._VERSION, version);
}
builder.field(Fields.EXISTS, true);
if (source != null) {
RestXContentBuilder.restDocumentSource(source.bytes(), source.offset(), source.length(), builder, params);
}
if (fields != null && !fields.isEmpty()) {
builder.startObject(Fields.FIELDS);
for (GetField field : fields.values()) {
if (field.values().isEmpty()) {
continue;
}
if (field.values().size() == 1) {
builder.field(field.name(), field.values().get(0));
} else {
builder.field(field.name());
builder.startArray();
for (Object value : field.values()) {
builder.value(value);
}
builder.endArray();
}
}
builder.endObject();
}
builder.endObject();
}
return builder;
return getResult.toXContent(builder, params);
}
@Override public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
type = in.readUTF();
id = in.readUTF();
version = in.readLong();
exists = in.readBoolean();
if (exists) {
if (in.readBoolean()) {
source = BytesHolder.readBytesHolder(in);
}
int size = in.readVInt();
if (size == 0) {
fields = ImmutableMap.of();
} else {
fields = newHashMapWithExpectedSize(size);
for (int i = 0; i < size; i++) {
GetField field = readGetField(in);
fields.put(field.name(), field);
}
}
}
getResult = GetResult.readGetResult(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
out.writeUTF(type);
out.writeUTF(id);
out.writeLong(version);
out.writeBoolean(exists);
if (exists) {
if (source == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
source.writeTo(out);
}
if (fields == null) {
out.writeVInt(0);
} else {
out.writeVInt(fields.size());
for (GetField field : fields.values()) {
field.writeTo(out);
}
}
}
getResult.writeTo(out);
}
}

View File

@@ -19,8 +19,6 @@
package org.elasticsearch.action.get;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Fieldable;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions;
@@ -30,35 +28,17 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.document.ResetFieldSelector;
import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMappers;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.selector.FieldMappersFieldSelector;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.SearchScript;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.lookup.SourceLookup;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import static org.elasticsearch.common.collect.Maps.*;
/**
* Performs the get operation.
*
@@ -121,237 +101,8 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
indexShard.refresh(new Engine.Refresh(false));
}
return load(logger, scriptService, indexService, indexShard, request.index(), request.type(), request.id(), request.fields(), request.realtime());
}
public static GetResponse load(ESLogger logger, ScriptService scriptService, IndexService indexService, IndexShard indexShard, String index, String type, String id, String[] gFields, boolean realtime) throws ElasticSearchException {
boolean loadSource = gFields == null || gFields.length > 0;
Engine.GetResult get = null;
if (type == null || type.equals("_all")) {
for (String typeX : indexService.mapperService().types()) {
get = indexShard.get(new Engine.Get(realtime, UidFieldMapper.TERM_FACTORY.createTerm(Uid.createUid(typeX, id))).loadSource(loadSource));
if (get.exists()) {
type = typeX;
break;
} else {
get.release();
}
}
if (get == null) {
return new GetResponse(index, type, id, -1, false, null, null);
}
if (!get.exists()) {
// no need to release here either, we already release in the for loop when the doc does not exist
return new GetResponse(index, type, id, -1, false, null, null);
}
} else {
get = indexShard.get(new Engine.Get(realtime, UidFieldMapper.TERM_FACTORY.createTerm(Uid.createUid(type, id))).loadSource(loadSource));
if (!get.exists()) {
get.release();
return new GetResponse(index, type, id, -1, false, null, null);
}
}
DocumentMapper docMapper = indexService.mapperService().documentMapper(type);
if (docMapper == null) {
get.release();
return new GetResponse(index, type, id, -1, false, null, null);
}
try {
// distinguish between having loaded the doc from the translog (so we only have _source) and having a Lucene document to load
if (get.docIdAndVersion() != null) {
Map<String, GetField> fields = null;
byte[] source = null;
UidField.DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
ResetFieldSelector fieldSelector = buildFieldSelectors(docMapper, gFields);
if (fieldSelector != null) {
fieldSelector.reset();
Document doc;
try {
doc = docIdAndVersion.reader.document(docIdAndVersion.docId, fieldSelector);
} catch (IOException e) {
throw new ElasticSearchException("Failed to get type [" + type + "] and id [" + id + "]", e);
}
source = extractSource(doc, docMapper);
for (Object oField : doc.getFields()) {
Fieldable field = (Fieldable) oField;
String name = field.name();
Object value = null;
FieldMappers fieldMappers = docMapper.mappers().indexName(field.name());
if (fieldMappers != null) {
FieldMapper mapper = fieldMappers.mapper();
if (mapper != null) {
name = mapper.names().fullName();
value = mapper.valueForSearch(field);
}
}
if (value == null) {
if (field.isBinary()) {
value = field.getBinaryValue();
} else {
value = field.stringValue();
}
}
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
}
GetField getField = fields.get(name);
if (getField == null) {
getField = new GetField(name, new ArrayList<Object>(2));
fields.put(name, getField);
}
getField.values().add(value);
}
}
// now, run the script fields if needed
if (gFields != null && gFields.length > 0) {
SearchLookup searchLookup = null;
for (String field : gFields) {
String script = null;
if (field.contains("_source.") || field.contains("doc[")) {
script = field;
} else {
FieldMappers x = docMapper.mappers().smartName(field);
if (x != null && !x.mapper().stored()) {
script = "_source." + x.mapper().names().fullName();
}
}
if (script != null) {
if (searchLookup == null) {
searchLookup = new SearchLookup(indexService.mapperService(), indexService.cache().fieldData());
}
SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null);
searchScript.setNextReader(docIdAndVersion.reader);
searchScript.setNextDocId(docIdAndVersion.docId);
try {
Object value = searchScript.run();
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
}
GetField getField = fields.get(field);
if (getField == null) {
getField = new GetField(field, new ArrayList<Object>(2));
fields.put(field, getField);
}
getField.values().add(value);
} catch (RuntimeException e) {
if (logger.isTraceEnabled()) {
logger.trace("failed to execute get request script field [{}]", e, script);
}
// ignore
}
}
}
}
return new GetResponse(index, type, id, get.version(), get.exists(), source == null ? null : new BytesHolder(source), fields);
} else {
BytesHolder source = get.source();
Map<String, GetField> fields = null;
boolean sourceRequested = false;
// we can only load scripts that can run against the source
if (gFields == null) {
sourceRequested = true;
} else if (gFields.length == 0) {
// no fields, and no source
sourceRequested = false;
} else {
Map<String, Object> sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length());
SearchLookup searchLookup = null;
for (String field : gFields) {
if (field.equals("_source")) {
sourceRequested = true;
continue;
}
String script = null;
if (field.contains("_source.")) {
script = field;
} else {
FieldMappers x = docMapper.mappers().smartName(field);
if (x != null) {
script = "_source." + x.mapper().names().fullName();
}
}
if (script != null) {
if (searchLookup == null) {
searchLookup = new SearchLookup(indexService.mapperService(), indexService.cache().fieldData());
}
SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null);
// we can't do this, we only allow scripts to run against the source
//searchScript.setNextReader(docIdAndVersion.reader);
//searchScript.setNextDocId(docIdAndVersion.docId);
// but, we need to inject the parsed source into the script, so it will be used...
searchScript.setNextSource(sourceAsMap);
try {
Object value = searchScript.run();
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
}
GetField getField = fields.get(field);
if (getField == null) {
getField = new GetField(field, new ArrayList<Object>(2));
fields.put(field, getField);
}
getField.values().add(value);
} catch (RuntimeException e) {
if (logger.isTraceEnabled()) {
logger.trace("failed to execute get request script field [{}]", e, script);
}
// ignore
}
}
}
}
return new GetResponse(index, type, id, get.version(), get.exists(), sourceRequested ? source : null, fields);
}
} finally {
get.release();
}
}
private static ResetFieldSelector buildFieldSelectors(DocumentMapper docMapper, String... fields) {
if (fields == null) {
return docMapper.sourceMapper().fieldSelector();
}
// don't load anything
if (fields.length == 0) {
return null;
}
FieldMappersFieldSelector fieldSelector = null;
for (String fieldName : fields) {
FieldMappers x = docMapper.mappers().smartName(fieldName);
if (x != null && x.mapper().stored()) {
if (fieldSelector == null) {
fieldSelector = new FieldMappersFieldSelector();
}
fieldSelector.add(x);
}
}
return fieldSelector;
}
private static byte[] extractSource(Document doc, DocumentMapper documentMapper) {
byte[] source = null;
Fieldable sourceField = doc.getFieldable(documentMapper.sourceMapper().names().indexName());
if (sourceField != null) {
source = documentMapper.sourceMapper().nativeValue(sourceField);
doc.removeField(documentMapper.sourceMapper().names().indexName());
}
return source;
GetResult result = indexShard.getService().get(request.type(), request.id(), request.fields(), request.realtime());
return new GetResponse(result);
}
@Override protected GetRequest newRequest() {

View File

@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
@@ -105,8 +106,8 @@ public class TransportShardMultiGetAction extends TransportShardSingleOperationA
String[] fields = request.fields.get(i);
try {
GetResponse getResponse = TransportGetAction.load(logger, scriptService, indexService, indexShard, request.index(), type, id, fields, request.realtime());
response.add(request.locations.get(i), getResponse);
GetResult getResult = indexShard.getService().get(type, id, fields, request.realtime());
response.add(request.locations.get(i), new GetResponse(getResult));
} catch (Exception e) {
logger.debug("[{}][{}] failed to execute multi_get for [{}]/[{}]", e, request.index(), shardId, type, id);
response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), type, id, ExceptionsHelper.detailedMessage(e)));

View File

@@ -24,7 +24,6 @@ import org.apache.lucene.index.Term;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.get.GetField;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.TransportGetAction;
@@ -36,6 +35,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.get.GetField;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMappers;

View File

@@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.action.get;
package org.elasticsearch.index.get;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

View File

@@ -0,0 +1,348 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.get;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import static org.elasticsearch.common.collect.Iterators.*;
import static org.elasticsearch.common.collect.Maps.*;
import static org.elasticsearch.index.get.GetField.*;
/**
*/
public class GetResult implements Streamable, Iterable<GetField>, ToXContent {
private String index;
private String type;
private String id;
private long version;
private boolean exists;
private Map<String, GetField> fields;
private Map<String, Object> sourceAsMap;
private BytesHolder source;
private byte[] sourceAsBytes;
GetResult() {
}
GetResult(String index, String type, String id, long version, boolean exists, BytesHolder source, Map<String, GetField> fields) {
this.index = index;
this.type = type;
this.id = id;
this.version = version;
this.exists = exists;
this.source = source;
this.fields = fields;
if (this.fields == null) {
this.fields = ImmutableMap.of();
}
}
/**
* Whether the document exists.
*/
public boolean exists() {
return exists;
}
/**
* Whether the document exists.
*/
public boolean isExists() {
return exists;
}
/**
* The index the document was fetched from.
*/
public String index() {
return this.index;
}
/**
* The index the document was fetched from.
*/
public String getIndex() {
return index;
}
/**
* The type of the document.
*/
public String type() {
return type;
}
/**
* The type of the document.
*/
public String getType() {
return type;
}
/**
* The id of the document.
*/
public String id() {
return id;
}
/**
* The id of the document.
*/
public String getId() {
return id;
}
/**
* The version of the doc.
*/
public long version() {
return this.version;
}
/**
* The version of the doc.
*/
public long getVersion() {
return this.version;
}
/**
* The source of the document, if it exists.
*/
public byte[] source() {
if (source == null) {
return null;
}
if (sourceAsBytes != null) {
return sourceAsBytes;
}
this.sourceAsBytes = sourceRef().copyBytes();
return this.sourceAsBytes;
}
/**
* Returns the bytes reference, uncompressing the source if needed.
*/
public BytesHolder sourceRef() {
if (LZF.isCompressed(source.bytes(), source.offset(), source.length())) {
try {
// TODO decompress without doing an extra copy!
this.source = new BytesHolder(LZFDecoder.decode(source.copyBytes()));
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
}
return this.source;
}
/**
* Whether the source is empty (not available).
*/
public boolean isSourceEmpty() {
return source == null;
}
/**
* The source of the document (as a string).
*/
public String sourceAsString() {
if (source == null) {
return null;
}
BytesHolder source = sourceRef();
return Unicode.fromBytes(source.bytes(), source.offset(), source.length());
}
/**
* The source of the document (as a map).
*/
@SuppressWarnings({"unchecked"})
public Map<String, Object> sourceAsMap() throws ElasticSearchParseException {
if (source == null) {
return null;
}
if (sourceAsMap != null) {
return sourceAsMap;
}
sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length());
return sourceAsMap;
}
public Map<String, Object> getSource() {
return sourceAsMap();
}
public Map<String, GetField> fields() {
return this.fields;
}
public Map<String, GetField> getFields() {
return fields;
}
public GetField field(String name) {
return fields.get(name);
}
@Override public Iterator<GetField> iterator() {
if (fields == null) {
return emptyIterator();
}
return fields.values().iterator();
}
static final class Fields {
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
static final XContentBuilderString EXISTS = new XContentBuilderString("exists");
static final XContentBuilderString FIELDS = new XContentBuilderString("fields");
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (!exists()) {
builder.startObject();
builder.field(Fields._INDEX, index);
builder.field(Fields._TYPE, type);
builder.field(Fields._ID, id);
builder.field(Fields.EXISTS, false);
builder.endObject();
} else {
builder.startObject();
builder.field(Fields._INDEX, index);
builder.field(Fields._TYPE, type);
builder.field(Fields._ID, id);
if (version != -1) {
builder.field(Fields._VERSION, version);
}
builder.field(Fields.EXISTS, true);
if (source != null) {
RestXContentBuilder.restDocumentSource(source.bytes(), source.offset(), source.length(), builder, params);
}
if (fields != null && !fields.isEmpty()) {
builder.startObject(Fields.FIELDS);
for (GetField field : fields.values()) {
if (field.values().isEmpty()) {
continue;
}
if (field.values().size() == 1) {
builder.field(field.name(), field.values().get(0));
} else {
builder.field(field.name());
builder.startArray();
for (Object value : field.values()) {
builder.value(value);
}
builder.endArray();
}
}
builder.endObject();
}
builder.endObject();
}
return builder;
}
public static GetResult readGetResult(StreamInput in) throws IOException {
GetResult result = new GetResult();
result.readFrom(in);
return result;
}
@Override public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
type = in.readUTF();
id = in.readUTF();
version = in.readLong();
exists = in.readBoolean();
if (exists) {
if (in.readBoolean()) {
source = BytesHolder.readBytesHolder(in);
}
int size = in.readVInt();
if (size == 0) {
fields = ImmutableMap.of();
} else {
fields = newHashMapWithExpectedSize(size);
for (int i = 0; i < size; i++) {
GetField field = readGetField(in);
fields.put(field.name(), field);
}
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
out.writeUTF(type);
out.writeUTF(id);
out.writeLong(version);
out.writeBoolean(exists);
if (exists) {
if (source == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
source.writeTo(out);
}
if (fields == null) {
out.writeVInt(0);
} else {
out.writeVInt(fields.size());
for (GetField field : fields.values()) {
field.writeTo(out);
}
}
}
}
}
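GetResult now owns the wire format (GetResponse.readFrom/writeTo above simply delegate to it). A rough same-package round-trip sketch — assumptions: the BytesStreamOutput/BytesStreamInput helpers from org.elasticsearch.common.io.stream behave as named here, and the code sits in org.elasticsearch.index.get since the constructor is package-private:

    // serialize an existing doc; writeTo only emits source/fields when exists is true
    GetResult original = new GetResult("twitter", "tweet", "1", 1, true,
            new BytesHolder("{\"user\":\"kimchy\"}".getBytes()), null);
    BytesStreamOutput out = new BytesStreamOutput();
    original.writeTo(out);

    // deserialize through the same static reader that GetResponse.readFrom uses
    GetResult copy = GetResult.readGetResult(new BytesStreamInput(out.copiedByteArray()));
    assert copy.exists() && copy.version() == 1;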

View File

@@ -0,0 +1,31 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.get;
import org.elasticsearch.common.inject.AbstractModule;
/**
*/
public class ShardGetModule extends AbstractModule {
@Override protected void configure() {
bind(ShardGetService.class).asEagerSingleton();
}
}
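ShardGetModule is a plain Guice module: every shard gets its own injector, and asEagerSingleton makes the service one-per-shard, created as soon as that injector is built (the registration itself appears in InternalIndexService below). A toy illustration of the mechanism with made-up names, using standard Guice imports since Elasticsearch embeds a repackaged copy under org.elasticsearch.common.inject:

    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Injector;

    public class EagerSingletonDemo {
        static class GreetService {
            String greet() { return "hello"; }
        }

        static class GreetModule extends AbstractModule {
            @Override protected void configure() {
                // one instance per injector, constructed eagerly at injector creation
                bind(GreetService.class).asEagerSingleton();
            }
        }

        public static void main(String[] args) {
            Injector injector = Guice.createInjector(new GreetModule());
            System.out.println(injector.getInstance(GreetService.class).greet());
        }
    }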

View File

@@ -0,0 +1,309 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.get;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Fieldable;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.document.ResetFieldSelector;
import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMappers;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.selector.FieldMappersFieldSelector;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.SearchScript;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import static org.elasticsearch.common.collect.Maps.*;
/**
*/
public class ShardGetService extends AbstractIndexShardComponent {
private final ScriptService scriptService;
private final MapperService mapperService;
private final IndexCache indexCache;
private IndexShard indexShard;
@Inject public ShardGetService(ShardId shardId, @IndexSettings Settings indexSettings, ScriptService scriptService,
MapperService mapperService, IndexCache indexCache) {
super(shardId, indexSettings);
this.scriptService = scriptService;
this.mapperService = mapperService;
this.indexCache = indexCache;
}
// sadly, to overcome the cyclic dependency between the shard and this service, we need to set it ourselves after construction...
public ShardGetService setIndexShard(IndexShard indexShard) {
this.indexShard = indexShard;
return this;
}
public GetResult get(String type, String id, String[] gFields, boolean realtime) throws ElasticSearchException {
boolean loadSource = gFields == null || gFields.length > 0;
Engine.GetResult get = null;
if (type == null || type.equals("_all")) {
for (String typeX : mapperService.types()) {
get = indexShard.get(new Engine.Get(realtime, UidFieldMapper.TERM_FACTORY.createTerm(Uid.createUid(typeX, id))).loadSource(loadSource));
if (get.exists()) {
type = typeX;
break;
} else {
get.release();
}
}
if (get == null) {
return new GetResult(shardId.index().name(), type, id, -1, false, null, null);
}
if (!get.exists()) {
// no need to release here either, we already release in the for loop when the doc does not exist
return new GetResult(shardId.index().name(), type, id, -1, false, null, null);
}
} else {
get = indexShard.get(new Engine.Get(realtime, UidFieldMapper.TERM_FACTORY.createTerm(Uid.createUid(type, id))).loadSource(loadSource));
if (!get.exists()) {
get.release();
return new GetResult(shardId.index().name(), type, id, -1, false, null, null);
}
}
DocumentMapper docMapper = mapperService.documentMapper(type);
if (docMapper == null) {
get.release();
return new GetResult(shardId.index().name(), type, id, -1, false, null, null);
}
try {
// distinguish between having loaded the doc from the translog (so we only have _source) and having a Lucene document to load
if (get.docIdAndVersion() != null) {
Map<String, GetField> fields = null;
byte[] source = null;
UidField.DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
ResetFieldSelector fieldSelector = buildFieldSelectors(docMapper, gFields);
if (fieldSelector != null) {
fieldSelector.reset();
Document doc;
try {
doc = docIdAndVersion.reader.document(docIdAndVersion.docId, fieldSelector);
} catch (IOException e) {
throw new ElasticSearchException("Failed to get type [" + type + "] and id [" + id + "]", e);
}
source = extractSource(doc, docMapper);
for (Object oField : doc.getFields()) {
Fieldable field = (Fieldable) oField;
String name = field.name();
Object value = null;
FieldMappers fieldMappers = docMapper.mappers().indexName(field.name());
if (fieldMappers != null) {
FieldMapper mapper = fieldMappers.mapper();
if (mapper != null) {
name = mapper.names().fullName();
value = mapper.valueForSearch(field);
}
}
if (value == null) {
if (field.isBinary()) {
value = field.getBinaryValue();
} else {
value = field.stringValue();
}
}
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
}
GetField getField = fields.get(name);
if (getField == null) {
getField = new GetField(name, new ArrayList<Object>(2));
fields.put(name, getField);
}
getField.values().add(value);
}
}
// now, run the script fields if needed
if (gFields != null && gFields.length > 0) {
SearchLookup searchLookup = null;
for (String field : gFields) {
String script = null;
if (field.contains("_source.") || field.contains("doc[")) {
script = field;
} else {
FieldMappers x = docMapper.mappers().smartName(field);
if (x != null && !x.mapper().stored()) {
script = "_source." + x.mapper().names().fullName();
}
}
if (script != null) {
if (searchLookup == null) {
searchLookup = new SearchLookup(mapperService, indexCache.fieldData());
}
SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null);
searchScript.setNextReader(docIdAndVersion.reader);
searchScript.setNextDocId(docIdAndVersion.docId);
try {
Object value = searchScript.run();
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
}
GetField getField = fields.get(field);
if (getField == null) {
getField = new GetField(field, new ArrayList<Object>(2));
fields.put(field, getField);
}
getField.values().add(value);
} catch (RuntimeException e) {
if (logger.isTraceEnabled()) {
logger.trace("failed to execute get request script field [{}]", e, script);
}
// ignore
}
}
}
}
return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), source == null ? null : new BytesHolder(source), fields);
} else {
BytesHolder source = get.source();
Map<String, GetField> fields = null;
boolean sourceRequested = false;
// we can only load scripts that can run against the source
if (gFields == null) {
sourceRequested = true;
} else if (gFields.length == 0) {
// no fields, and no source
sourceRequested = false;
} else {
Map<String, Object> sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length());
SearchLookup searchLookup = null;
for (String field : gFields) {
if (field.equals("_source")) {
sourceRequested = true;
continue;
}
String script = null;
if (field.contains("_source.")) {
script = field;
} else {
FieldMappers x = docMapper.mappers().smartName(field);
if (x != null) {
script = "_source." + x.mapper().names().fullName();
}
}
if (script != null) {
if (searchLookup == null) {
searchLookup = new SearchLookup(mapperService, indexCache.fieldData());
}
SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null);
// we can't do this, we only allow scripts to run against the source
//searchScript.setNextReader(docIdAndVersion.reader);
//searchScript.setNextDocId(docIdAndVersion.docId);
// but, we need to inject the parsed source into the script, so it will be used...
searchScript.setNextSource(sourceAsMap);
try {
Object value = searchScript.run();
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
}
GetField getField = fields.get(field);
if (getField == null) {
getField = new GetField(field, new ArrayList<Object>(2));
fields.put(field, getField);
}
getField.values().add(value);
} catch (RuntimeException e) {
if (logger.isTraceEnabled()) {
logger.trace("failed to execute get request script field [{}]", e, script);
}
// ignore
}
}
}
}
return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), sourceRequested ? source : null, fields);
}
} finally {
get.release();
}
}
private static ResetFieldSelector buildFieldSelectors(DocumentMapper docMapper, String... fields) {
if (fields == null) {
return docMapper.sourceMapper().fieldSelector();
}
// don't load anything
if (fields.length == 0) {
return null;
}
FieldMappersFieldSelector fieldSelector = null;
for (String fieldName : fields) {
FieldMappers x = docMapper.mappers().smartName(fieldName);
if (x != null && x.mapper().stored()) {
if (fieldSelector == null) {
fieldSelector = new FieldMappersFieldSelector();
}
fieldSelector.add(x);
}
}
return fieldSelector;
}
private static byte[] extractSource(Document doc, DocumentMapper documentMapper) {
byte[] source = null;
Fieldable sourceField = doc.getFieldable(documentMapper.sourceMapper().names().indexName());
if (sourceField != null) {
source = documentMapper.sourceMapper().nativeValue(sourceField);
doc.removeField(documentMapper.sourceMapper().names().indexName());
}
return source;
}
}
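The setIndexShard hook above exists because constructor injection would be cyclic: ShardGetService needs the IndexShard to execute gets, while InternalIndexShard (further below) receives the service in its constructor and completes the link with getService.setIndexShard(this). The pattern in miniature, with toy names rather than the real classes:

    // break a constructor cycle by wiring one side through a fluent setter
    class Shard {
        private final Service service;
        Shard(Service service) {
            this.service = service.setShard(this); // complete the cycle, as InternalIndexShard does
        }
    }

    class Service {
        private Shard shard; // set once, right after construction
        Service setShard(Shard shard) { this.shard = shard; return this; }
    }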

View File

@@ -50,6 +50,7 @@ import org.elasticsearch.index.engine.IndexEngine;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayModule;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.get.ShardGetModule;
import org.elasticsearch.index.indexing.ShardIndexingModule;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
@@ -281,6 +282,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
modules.add(new ShardsPluginsModule(indexSettings, pluginsService));
modules.add(new IndexShardModule(shardId));
modules.add(new ShardIndexingModule());
modules.add(new ShardGetModule());
modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class)));
modules.add(new DeletionPolicyModule(indexSettings));
modules.add(new MergePolicyModule(indexSettings));

View File

@@ -26,6 +26,7 @@ import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.ParsedDocument;
@@ -45,6 +46,8 @@ public interface IndexShard extends IndexShardComponent {
ShardIndexingService indexingService();
ShardGetService getService();
ShardRouting routingEntry();
DocsStats docStats();

View File

@@ -47,6 +47,7 @@ import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.OptimizeFailedEngineException;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.DocumentMapper;
@@ -107,6 +108,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private final ShardIndexingService indexingService;
private final ShardGetService getService;
private final Object mutex = new Object();
@@ -131,7 +134,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private final MeanMetric flushMetric = new MeanMetric();
@Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService) {
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService) {
super(shardId, indexSettings);
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indexSettingsService = indexSettingsService;
@@ -145,6 +148,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
this.indexCache = indexCache;
this.indexAliasesService = indexAliasesService;
this.indexingService = indexingService;
this.getService = getService.setIndexShard(this);
state = IndexShardState.CREATED;
this.refreshInterval = indexSettings.getAsTime("engine.robin.refresh_interval", indexSettings.getAsTime("index.refresh_interval", engine.defaultRefreshInterval()));
@@ -177,6 +181,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return this.indexingService;
}
@Override public ShardGetService getService() {
return this.getService;
}
@Override public ShardRouting routingEntry() {
return this.shardRouting;
}