From 72ee0aaee7baa3f97cc5aeb970131352d70f99f8 Mon Sep 17 00:00:00 2001 From: kimchy Date: Fri, 24 Jun 2011 09:39:37 +0300 Subject: [PATCH] Realtime GET, closes #1060. --- .idea/dictionaries/kimchy.xml | 1 + .../get/SimpleGetActionBenchmark.java | 55 ++++++ .../elasticsearch/action/get/GetRequest.java | 24 +++ .../elasticsearch/action/get/GetResponse.java | 64 +++---- .../action/get/TransportGetAction.java | 120 ++++++++++--- .../client/action/get/GetRequestBuilder.java | 5 + .../org/elasticsearch/common/BytesHolder.java | 114 ++++++++++++ .../common/compress/lzf/LZF.java | 4 + .../common/io/stream/BytesStreamInput.java | 4 + .../common/xcontent/XContentBuilder.java | 5 + .../common/xcontent/XContentFactory.java | 3 +- .../common/xcontent/XContentGenerator.java | 2 + .../xcontent/json/JsonXContentGenerator.java | 14 +- .../smile/SmileXContentGenerator.java | 11 ++ .../elasticsearch/index/engine/Engine.java | 69 ++++++++ .../index/engine/robin/RobinEngine.java | 164 ++++++++++++------ .../index/shard/service/IndexShard.java | 2 +- .../shard/service/InternalIndexShard.java | 24 +-- .../index/translog/Translog.java | 49 +++++- .../index/translog/TranslogStreams.java | 26 +++ .../index/translog/fs/FsTranslog.java | 26 ++- .../index/translog/fs/FsTranslogFile.java | 10 +- .../rest/action/get/RestGetAction.java | 8 +- .../action/support/RestXContentBuilder.java | 10 +- .../script/AbstractSearchScript.java | 6 + .../elasticsearch/script/SearchScript.java | 4 + .../script/mvel/MvelScriptEngineService.java | 4 + .../search/lookup/SourceLookup.java | 39 +++-- .../engine/AbstractSimpleEngineTests.java | 41 ++++- .../integration/document/GetActionTests.java | 108 ++++++++++++ .../groovy/GroovyScriptEngineService.java | 4 + .../JavaScriptScriptEngineService.java | 10 +- .../python/PythonScriptEngineService.java | 4 + 33 files changed, 882 insertions(+), 152 deletions(-) create mode 100644 modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/get/SimpleGetActionBenchmark.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/BytesHolder.java create mode 100644 modules/test/integration/src/test/java/org/elasticsearch/test/integration/document/GetActionTests.java diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml index c04da9e1c0e..56220be22aa 100644 --- a/.idea/dictionaries/kimchy.xml +++ b/.idea/dictionaries/kimchy.xml @@ -122,6 +122,7 @@ queryparser rabbitmq rackspace + realtime rebalance rebalancing recycler diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/get/SimpleGetActionBenchmark.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/get/SimpleGetActionBenchmark.java new file mode 100644 index 00000000000..485a4f67c12 --- /dev/null +++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/get/SimpleGetActionBenchmark.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.benchmark.get; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.StopWatch; +import org.elasticsearch.common.unit.SizeValue; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; + +// simple test for embedded / single remote lookup +public class SimpleGetActionBenchmark { + + public static void main(String[] args) { + long OPERATIONS = SizeValue.parseSizeValue("300k").singles(); + + Node node = NodeBuilder.nodeBuilder().node(); + + Client client; + if (false) { + client = NodeBuilder.nodeBuilder().client(true).node().client(); + } else { + client = node.client(); + } + + client.prepareIndex("test", "type1", "1").setSource("field1", "value1").execute().actionGet(); + + StopWatch stopWatch = new StopWatch().start(); + for (long i = 0; i < OPERATIONS; i++) { + client.prepareGet("test", "type1", "1").execute().actionGet(); + } + stopWatch.stop(); + + System.out.println("Ran in " + stopWatch.totalTime() + ", per second: " + (((double) OPERATIONS) / stopWatch.totalTime().secondsFrac())); + + node.close(); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetRequest.java index e838d362219..2221e818c23 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetRequest.java @@ -44,6 +44,8 @@ public class GetRequest extends SingleShardOperationRequest { private boolean refresh = false; + Boolean realtime; + GetRequest() { } @@ -140,6 +142,15 @@ public class GetRequest extends SingleShardOperationRequest { return this.refresh; } + public boolean realtime() { + return this.realtime == null ? true : this.realtime; + } + + public GetRequest realtime(Boolean realtime) { + this.realtime = realtime; + return this; + } + /** * Should the listener be called on a separate thread if needed. */ @@ -166,6 +177,12 @@ public class GetRequest extends SingleShardOperationRequest { fields[i] = in.readUTF(); } } + byte realtime = in.readByte(); + if (realtime == 0) { + this.realtime = false; + } else if (realtime == 1) { + this.realtime = true; + } } @Override public void writeTo(StreamOutput out) throws IOException { @@ -179,6 +196,13 @@ public class GetRequest extends SingleShardOperationRequest { out.writeUTF(field); } } + if (realtime == null) { + out.writeByte((byte) -1); + } else if (realtime == false) { + out.writeByte((byte) 0); + } else { + out.writeByte((byte) 1); + } } @Override public String toString() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java index 1cb6121967f..2aeb88e2074 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.get; import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Unicode; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.compress.lzf.LZF; @@ -28,8 +29,11 @@ import org.elasticsearch.common.compress.lzf.LZFDecoder; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.rest.action.support.RestXContentBuilder; +import org.elasticsearch.search.lookup.SourceLookup; import java.io.IOException; import java.util.Iterator; @@ -62,12 +66,14 @@ public class GetResponse implements ActionResponse, Streamable, Iterable sourceAsMap; - private byte[] source; + private BytesHolder source; + + private byte[] sourceAsBytes; GetResponse() { } - GetResponse(String index, String type, String id, long version, boolean exists, byte[] source, Map fields) { + GetResponse(String index, String type, String id, long version, boolean exists, BytesHolder source, Map fields) { this.index = index; this.type = type; this.id = id; @@ -157,9 +163,21 @@ public class GetResponse implements ActionResponse, Streamable, Iterable getSource() { @@ -258,7 +266,7 @@ public class GetResponse implements ActionResponse, Streamable, Iterable 0) { - source = new byte[size]; - in.readFully(source); + if (in.readBoolean()) { + source = BytesHolder.readBytesHolder(in); } - size = in.readVInt(); + int size = in.readVInt(); if (size == 0) { fields = ImmutableMap.of(); } else { @@ -320,10 +326,10 @@ public class GetResponse implements ActionResponse, Streamable, Iterable listener) { + if (request.realtime == null) { + request.realtime = this.realtime; + } + super.doExecute(request, listener); + } + @Override protected GetResponse shardOperation(GetRequest request, int shardId) throws ElasticSearchException { IndexService indexService = indicesService.indexServiceSafe(request.index()); - BloomCache bloomCache = indexService.cache().bloomCache(); IndexShard indexShard = indexService.shardSafe(shardId); DocumentMapper docMapper = indexService.mapperService().documentMapper(request.type()); if (docMapper == null) { - throw new TypeMissingException(new Index(request.index()), request.type()); + return new GetResponse(request.index(), request.type(), request.id(), -1, false, null, null); } - if (request.refresh()) { + if (request.refresh() && !request.realtime()) { indexShard.refresh(new Engine.Refresh(false)); } - Engine.Searcher searcher = indexShard.searcher(); - boolean exists = false; - byte[] source = null; - Map fields = null; - long version = -1; + Engine.GetResult get = indexShard.get(new Engine.Get(request.realtime(), docMapper.uidMapper().term(request.type(), request.id()))); try { - UidField.DocIdAndVersion docIdAndVersion = loadCurrentVersionFromIndex(bloomCache, searcher, docMapper.uidMapper().term(request.type(), request.id())); - if (docIdAndVersion != null && docIdAndVersion.docId != Lucene.NO_DOC) { - if (docIdAndVersion.version > 0) { - version = docIdAndVersion.version; - } - exists = true; + if (!get.exists()) { + return new GetResponse(request.index(), request.type(), request.id(), -1, false, null, null); + } + + // break between having loaded it from translog (so we only have _source), and having a document to load + if (get.docIdAndVersion() != null) { + Map fields = null; + byte[] source = null; + UidField.DocIdAndVersion docIdAndVersion = get.docIdAndVersion(); FieldSelector fieldSelector = buildFieldSelectors(docMapper, request.fields()); if (fieldSelector != null) { - Document doc = docIdAndVersion.reader.document(docIdAndVersion.docId, fieldSelector); + Document doc; + try { + doc = docIdAndVersion.reader.document(docIdAndVersion.docId, fieldSelector); + } catch (IOException e) { + throw new ElasticSearchException("Failed to get type [" + request.type() + "] and id [" + request.id() + "]", e); + } source = extractSource(doc, docMapper); for (Object oField : doc.getFields()) { @@ -200,13 +214,75 @@ public class TransportGetAction extends TransportShardSingleOperationAction fields = null; + boolean sourceRequested = false; + + // we can only load scripts that can run against the source + if (request.fields() != null && request.fields().length > 0) { + Map sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length()); + SearchLookup searchLookup = null; + for (String field : request.fields()) { + if (field.equals("_source")) { + sourceRequested = true; + continue; + } + String script = null; + if (field.contains("_source.")) { + script = field; + } else { + FieldMappers x = docMapper.mappers().smartName(field); + if (x != null) { + script = "_source." + x.mapper().names().fullName(); + } + } + if (script != null) { + if (searchLookup == null) { + searchLookup = new SearchLookup(indexService.mapperService(), indexService.cache().fieldData()); + } + SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null); + // we can't do this, only allow to run scripts against the source + //searchScript.setNextReader(docIdAndVersion.reader); + //searchScript.setNextDocId(docIdAndVersion.docId); + + // but, we need to inject the parsed source into the script, so it will be used... + searchScript.setNextSource(sourceAsMap); + + try { + Object value = searchScript.run(); + if (fields == null) { + fields = newHashMapWithExpectedSize(2); + } + GetField getField = fields.get(field); + if (getField == null) { + getField = new GetField(field, new ArrayList(2)); + fields.put(field, getField); + } + getField.values().add(value); + } catch (RuntimeException e) { + if (logger.isTraceEnabled()) { + logger.trace("failed to execute get request script field [{}]", e, script); + } + // ignore + } + } + } + } else { + sourceRequested = true; + } + + return new GetResponse(request.index(), request.type(), request.id(), get.version(), get.exists(), sourceRequested ? source : null, fields); } - } catch (IOException e) { - throw new ElasticSearchException("Failed to get type [" + request.type() + "] and id [" + request.id() + "]", e); } finally { - searcher.release(); + if (get.searcher() != null) { + get.searcher().release(); + } } - return new GetResponse(request.index(), request.type(), request.id(), version, exists, source, fields); } private FieldSelector buildFieldSelectors(DocumentMapper docMapper, String... fields) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/get/GetRequestBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/get/GetRequestBuilder.java index 2627da1a23f..7e90744891a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/get/GetRequestBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/get/GetRequestBuilder.java @@ -99,6 +99,11 @@ public class GetRequestBuilder extends BaseRequestBuilder= 2 && buffer[0] == LZFChunk.BYTE_Z && buffer[1] == LZFChunk.BYTE_V; } + public static boolean isCompressed(final byte[] buffer, int offset, int length) { + return length >= 2 && buffer[offset] == LZFChunk.BYTE_Z && buffer[offset + 1] == LZFChunk.BYTE_V; + } + public final static String SUFFIX = ".lzf"; void process(String[] args) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java index 5870f1788b5..7c9b9d8c9eb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java @@ -72,6 +72,10 @@ public class BytesStreamInput extends StreamInput { return len; } + public byte[] underlyingBuffer() { + return buf; + } + @Override public byte readByte() throws IOException { if (pos >= count) { throw new EOFException(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java index 40e1fa190ce..1f45d98cbc2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java @@ -800,6 +800,11 @@ public final class XContentBuilder { return this; } + public XContentBuilder rawField(String fieldName, byte[] content, int offset, int length) throws IOException { + generator.writeRawField(fieldName, content, offset, length, bos); + return this; + } + public XContentBuilder rawField(String fieldName, InputStream content) throws IOException { generator.writeRawField(fieldName, content, bos); return this; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContentFactory.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContentFactory.java index 3ad23a1ec53..50497c069a0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContentFactory.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContentFactory.java @@ -231,7 +231,8 @@ public class XContentFactory { if (length > 2 && data[offset] == SmileConstants.HEADER_BYTE_1 && data[offset + 1] == SmileConstants.HEADER_BYTE_2 && data[offset + 2] == SmileConstants.HEADER_BYTE_3) { return XContentType.SMILE; } - for (int i = offset; i < length; i++) { + int size = offset + length; + for (int i = offset; i < size; i++) { if (data[i] == '{') { return XContentType.JSON; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContentGenerator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContentGenerator.java index 7a89871027f..b53d41bed13 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContentGenerator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContentGenerator.java @@ -107,6 +107,8 @@ public interface XContentGenerator { void writeRawField(String fieldName, byte[] content, OutputStream bos) throws IOException; + void writeRawField(String fieldName, byte[] content, int offset, int length, OutputStream bos) throws IOException; + void writeRawField(String fieldName, InputStream content, OutputStream bos) throws IOException; void copyCurrentStructure(XContentParser parser) throws IOException; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java index c7a6640760f..574b285884e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java @@ -21,7 +21,11 @@ package org.elasticsearch.common.xcontent.json; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.jackson.JsonGenerator; -import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.common.xcontent.XContentGenerator; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentString; +import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; import java.io.InputStream; @@ -208,6 +212,14 @@ public class JsonXContentGenerator implements XContentGenerator { bos.write(content); } + @Override public void writeRawField(String fieldName, byte[] content, int offset, int length, OutputStream bos) throws IOException { + generator.writeRaw(", \""); + generator.writeRaw(fieldName); + generator.writeRaw("\" : "); + flush(); + bos.write(content, offset, length); + } + @Override public void writeRawField(String fieldName, InputStream content, OutputStream bos) throws IOException { generator.writeRaw(", \""); generator.writeRaw(fieldName); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/smile/SmileXContentGenerator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/smile/SmileXContentGenerator.java index 69bf853a3b3..3cd9e331d27 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/smile/SmileXContentGenerator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/smile/SmileXContentGenerator.java @@ -62,4 +62,15 @@ public class SmileXContentGenerator extends JsonXContentGenerator { parser.close(); } } + + @Override public void writeRawField(String fieldName, byte[] content, int offset, int length, OutputStream bos) throws IOException { + writeFieldName(fieldName); + SmileParser parser = SmileXContent.smileFactory.createJsonParser(content, offset, length); + try { + parser.nextToken(); + generator.copyCurrentStructure(parser); + } finally { + parser.close(); + } + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java index 79876532d7b..fbf1581fd96 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -27,6 +27,7 @@ import org.apache.lucene.index.Term; import org.apache.lucene.search.Filter; import org.apache.lucene.search.Query; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.CloseableComponent; import org.elasticsearch.common.lease.Releasable; @@ -76,6 +77,8 @@ public interface Engine extends IndexShardComponent, CloseableComponent { void delete(DeleteByQuery delete) throws EngineException; + GetResult get(Get get) throws EngineException; + Searcher searcher() throws EngineException; /** @@ -585,4 +588,70 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return aliasFilter; } } + + + static class Get { + private final boolean realtime; + private final Term uid; + + public Get(boolean realtime, Term uid) { + this.realtime = realtime; + this.uid = uid; + } + + public boolean realtime() { + return this.realtime; + } + + public Term uid() { + return uid; + } + } + + static class GetResult { + private final boolean exists; + private final long version; + private final BytesHolder source; + private final UidField.DocIdAndVersion docIdAndVersion; + private final Searcher searcher; + + public static final GetResult NOT_EXISTS = new GetResult(false, -1, null); + + public GetResult(boolean exists, long version, BytesHolder source) { + this.source = source; + this.exists = exists; + this.version = version; + this.docIdAndVersion = null; + this.searcher = null; + } + + public GetResult(Searcher searcher, UidField.DocIdAndVersion docIdAndVersion) { + this.exists = true; + this.source = null; + this.version = docIdAndVersion.version; + this.docIdAndVersion = docIdAndVersion; + this.searcher = searcher; + } + + public boolean exists() { + return exists; + } + + public long version() { + return this.version; + } + + public BytesHolder source() { + return source; + } + + public Searcher searcher() { + return this.searcher; + } + + public UidField.DocIdAndVersion docIdAndVersion() { + return docIdAndVersion; + } + } + } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index bae8215095b..48097aa2e6a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -30,6 +30,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.UnicodeUtil; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.Unicode; import org.elasticsearch.common.bloom.BloomFilter; @@ -60,6 +61,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -282,6 +284,54 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { return new TimeValue(1, TimeUnit.SECONDS); } + public GetResult get(Get get) throws EngineException { + rwl.readLock().lock(); + try { + if (get.realtime()) { + VersionValue versionValue = versionMap.get(get.uid().text()); + if (versionValue != null) { + if (versionValue.delete()) { + return GetResult.NOT_EXISTS; + } + byte[] data = translog.read(versionValue.translogLocation()); + if (data != null) { + try { + BytesHolder source = TranslogStreams.readSource(data); + return new GetResult(true, versionValue.version(), source); + } catch (IOException e) { + // switched on us, read it from the reader + } + } + } + } + + // no version, get the version from the index, we know that we refresh on flush + Searcher searcher = searcher(); + try { + UnicodeUtil.UTF8Result utf8 = Unicode.fromStringAsUtf8(get.uid().text()); + for (IndexReader reader : searcher.searcher().subReaders()) { + BloomFilter filter = bloomCache.filter(reader, UidFieldMapper.NAME, asyncLoadBloomFilter); + // we know that its not there... + if (!filter.isPresent(utf8.result, 0, utf8.length)) { + continue; + } + UidField.DocIdAndVersion docIdAndVersion = UidField.loadDocIdAndVersion(reader, get.uid()); + if (docIdAndVersion != null && docIdAndVersion.docId != Lucene.NO_DOC) { + return new GetResult(searcher, docIdAndVersion); + } + } + } catch (Exception e) { + searcher.release(); + //TODO: A better exception goes here + throw new EngineException(shardId(), "failed to load document", e); + } + + return GetResult.NOT_EXISTS; + } finally { + rwl.readLock().unlock(); + } + } + @Override public void create(Create create) throws EngineException { rwl.readLock().lock(); try { @@ -307,14 +357,14 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { synchronized (dirtyLock(create.uid())) { UidField uidField = create.uidField(); if (create.origin() == Operation.Origin.RECOVERY) { - // on recovery, we get the actual version we want to use - if (create.version() != 0) { - versionMap.put(create.uid().text(), new VersionValue(create.version(), false, threadPool.estimatedTimeInMillis())); - } uidField.version(create.version()); // we use update doc and not addDoc since we might get duplicates when using transient translog writer.updateDocument(create.uid(), create.doc(), create.analyzer()); - translog.add(new Translog.Create(create)); + Translog.Location translogLocation = translog.add(new Translog.Create(create)); + // on recovery, we get the actual version we want to use + if (create.version() != 0) { + versionMap.put(create.uid().text(), new VersionValue(create.version(), false, threadPool.estimatedTimeInMillis(), translogLocation)); + } } else { long currentVersion; VersionValue versionValue = versionMap.get(create.uid().text()); @@ -381,12 +431,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { throw new DocumentAlreadyExistsEngineException(shardId, create.type(), create.id()); } - versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis())); uidField.version(updatedVersion); create.version(updatedVersion); writer.addDocument(create.doc(), create.analyzer()); - translog.add(new Translog.Create(create)); + Translog.Location translogLocation = translog.add(new Translog.Create(create)); + + versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation)); } } } @@ -417,13 +468,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { synchronized (dirtyLock(index.uid())) { UidField uidField = index.uidField(); if (index.origin() == Operation.Origin.RECOVERY) { - // on recovery, we get the actual version we want to use - if (index.version() != 0) { - versionMap.put(index.uid().text(), new VersionValue(index.version(), false, threadPool.estimatedTimeInMillis())); - } uidField.version(index.version()); writer.updateDocument(index.uid(), index.doc(), index.analyzer()); - translog.add(new Translog.Index(index)); + Translog.Location translogLocation = translog.add(new Translog.Index(index)); + // on recovery, we get the actual version we want to use + if (index.version() != 0) { + versionMap.put(index.uid().text(), new VersionValue(index.version(), false, threadPool.estimatedTimeInMillis(), translogLocation)); + } } else { long currentVersion; VersionValue versionValue = versionMap.get(index.uid().text()); @@ -479,7 +530,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { updatedVersion = index.version(); } - versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis())); uidField.version(updatedVersion); index.version(updatedVersion); @@ -489,7 +539,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } else { writer.updateDocument(index.uid(), index.doc(), index.analyzer()); } - translog.add(new Translog.Index(index)); + Translog.Location translogLocation = translog.add(new Translog.Index(index)); + + versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation)); } } } @@ -518,13 +570,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private void innerDelete(Delete delete, IndexWriter writer) throws IOException { synchronized (dirtyLock(delete.uid())) { if (delete.origin() == Operation.Origin.RECOVERY) { + writer.deleteDocuments(delete.uid()); + Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); // update the version with the exact version from recovery, assuming we have it if (delete.version() != 0) { - versionMap.put(delete.uid().text(), new VersionValue(delete.version(), true, threadPool.estimatedTimeInMillis())); + versionMap.put(delete.uid().text(), new VersionValue(delete.version(), true, threadPool.estimatedTimeInMillis(), translogLocation)); } - - writer.deleteDocuments(delete.uid()); - translog.add(new Translog.Delete(delete)); } else { long currentVersion; VersionValue versionValue = versionMap.get(delete.uid().text()); @@ -582,10 +633,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { // if its a delete on delete and we have the current delete version, return it delete.version(versionValue.version()).notFound(true); } else { - versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis())); delete.version(updatedVersion); writer.deleteDocuments(delete.uid()); - translog.add(new Translog.Delete(delete)); + Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); + versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation)); } } } @@ -614,6 +665,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } finally { rwl.readLock().unlock(); } + //TODO: This is heavy, since we refresh, but we really have to... + refreshVersioningTable(System.currentTimeMillis()); } @Override public Searcher searcher() throws EngineException { @@ -765,42 +818,43 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { rwl.readLock().unlock(); } } - - - // we need to refresh in order to clear older version values - long time = threadPool.estimatedTimeInMillis(); // mark time here, before we refresh, and then delete all older values - refresh(new Refresh(true).force(true)); - Searcher searcher = indexingSearcher.get(); - if (searcher != null) { - indexingSearcher.set(null); - } - for (Map.Entry entry : versionMap.entrySet()) { - String id = entry.getKey(); - synchronized (dirtyLock(id)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set? - VersionValue versionValue = versionMap.get(id); - if (versionValue == null) { - continue; - } - if (time - versionValue.time() <= 0) { - continue; // its a newer value, from after/during we refreshed, don't clear it - } - if (versionValue.delete()) { - if ((time - versionValue.time()) > gcDeletesInMillis) { - versionMap.remove(id); - } - } else { - versionMap.remove(id); - } - } - } - if (searcher != null) { - searcher.release(); - } + refreshVersioningTable(threadPool.estimatedTimeInMillis()); } finally { flushing.set(false); } } + private void refreshVersioningTable(long time) { + // we need to refresh in order to clear older version values + refresh(new Refresh(true).force(true)); + Searcher searcher = indexingSearcher.get(); + if (searcher != null) { + indexingSearcher.set(null); + } + for (Map.Entry entry : versionMap.entrySet()) { + String id = entry.getKey(); + synchronized (dirtyLock(id)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set? + VersionValue versionValue = versionMap.get(id); + if (versionValue == null) { + continue; + } + if (time - versionValue.time() <= 0) { + continue; // its a newer value, from after/during we refreshed, don't clear it + } + if (versionValue.delete()) { + if ((time - versionValue.time()) > gcDeletesInMillis) { + versionMap.remove(id); + } + } else { + versionMap.remove(id); + } + } + } + if (searcher != null) { + searcher.release(); + } + } + @Override public void maybeMerge() throws EngineException { if (!possibleMergeNeeded) { return; @@ -1160,11 +1214,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private final long version; private final boolean delete; private final long time; + private final Translog.Location translogLocation; - VersionValue(long version, boolean delete, long time) { + VersionValue(long version, boolean delete, long time, Translog.Location translogLocation) { this.version = version; this.delete = delete; this.time = time; + this.translogLocation = translogLocation; } public long time() { @@ -1178,5 +1234,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { public boolean delete() { return delete; } + + public Translog.Location translogLocation() { + return this.translogLocation; + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java index f87f50fc8fb..7ae6a3f2a2f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java @@ -61,7 +61,7 @@ public interface IndexShard extends IndexShardComponent { void deleteByQuery(byte[] querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException; - byte[] get(String type, String id) throws ElasticSearchException; + Engine.GetResult get(Engine.Get get) throws ElasticSearchException; long count(float minScore, byte[] querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 0fb9936956b..fe53e6e1f4f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.shard.service; -import org.apache.lucene.document.Document; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.Filter; @@ -344,28 +343,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I engine.delete(new Engine.DeleteByQuery(query, querySource, filteringAliases, aliasFilter, types)); } - @Override public byte[] get(String type, String id) throws ElasticSearchException { + @Override public Engine.GetResult get(Engine.Get get) throws ElasticSearchException { readAllowed(); - DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type); - Engine.Searcher searcher = engine.searcher(); - try { - int docId = Lucene.docId(searcher.reader(), docMapper.uidMapper().term(type, id)); - if (docId == Lucene.NO_DOC) { - if (logger.isTraceEnabled()) { - logger.trace("get for [{}#{}] returned no result", type, id); - } - return null; - } - Document doc = searcher.reader().document(docId, docMapper.sourceMapper().fieldSelector()); - if (logger.isTraceEnabled()) { - logger.trace("get for [{}#{}] returned [{}]", type, id, doc); - } - return docMapper.sourceMapper().value(doc); - } catch (IOException e) { - throw new ElasticSearchException("Failed to get type [" + type + "] and id [" + id + "]", e); - } finally { - searcher.release(); - } + return engine.get(get); } @Override public long count(float minScore, byte[] querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java index 27ea7a821fe..c739af308e7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -20,8 +20,11 @@ package org.elasticsearch.index.translog; import org.apache.lucene.index.Term; +import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -87,7 +90,9 @@ public interface Translog extends IndexShardComponent { /** * Adds a create operation to the transaction log. */ - void add(Operation operation) throws TranslogException; + Location add(Operation operation) throws TranslogException; + + byte[] read(Location location); /** * Snapshots the current transaction log allowing to safely iterate over the snapshot. @@ -120,6 +125,18 @@ public interface Translog extends IndexShardComponent { */ void close(boolean delete); + static class Location { + public final long translogId; + public final long translogLocation; + public final int size; + + public Location(long translogId, long translogLocation, int size) { + this.translogId = translogId; + this.translogLocation = translogLocation; + this.size = size; + } + } + /** * A snapshot of the transaction log, allows to iterate over all the transaction log operations. */ @@ -200,6 +217,8 @@ public interface Translog extends IndexShardComponent { Type opType(); long estimateSize(); + + BytesHolder readSource(BytesStreamInput in) throws IOException; } static class Create implements Operation { @@ -258,6 +277,16 @@ public interface Translog extends IndexShardComponent { return this.version; } + @Override public BytesHolder readSource(BytesStreamInput in) throws IOException { + int version = in.readVInt(); // version + id = in.readUTF(); + type = in.readUTF(); + + int length = in.readVInt(); + int offset = in.position(); + return new BytesHolder(in.underlyingBuffer(), offset, length); + } + @Override public void readFrom(StreamInput in) throws IOException { int version = in.readVInt(); // version id = in.readUTF(); @@ -357,6 +386,16 @@ public interface Translog extends IndexShardComponent { return this.version; } + @Override public BytesHolder readSource(BytesStreamInput in) throws IOException { + int version = in.readVInt(); // version + id = in.readUTF(); + type = in.readUTF(); + + int length = in.readVInt(); + int offset = in.position(); + return new BytesHolder(in.underlyingBuffer(), offset, length); + } + @Override public void readFrom(StreamInput in) throws IOException { int version = in.readVInt(); // version id = in.readUTF(); @@ -432,6 +471,10 @@ public interface Translog extends IndexShardComponent { return this.version; } + @Override public BytesHolder readSource(BytesStreamInput in) throws IOException { + throw new ElasticSearchIllegalStateException("trying to read doc source from delete operation"); + } + @Override public void readFrom(StreamInput in) throws IOException { int version = in.readVInt(); // version uid = new Term(in.readUTF(), in.readUTF()); @@ -486,6 +529,10 @@ public interface Translog extends IndexShardComponent { return this.types; } + @Override public BytesHolder readSource(BytesStreamInput in) throws IOException { + throw new ElasticSearchIllegalStateException("trying to read doc source from delete_by_query operation"); + } + @Override public void readFrom(StreamInput in) throws IOException { int version = in.readVInt(); // version source = new byte[in.readVInt()]; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogStreams.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogStreams.java index d230300b140..e5093593c4c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogStreams.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogStreams.java @@ -19,6 +19,8 @@ package org.elasticsearch.index.translog; +import org.elasticsearch.common.BytesHolder; +import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -52,6 +54,30 @@ public class TranslogStreams { return operation; } + public static BytesHolder readSource(byte[] data) throws IOException { + BytesStreamInput in = new BytesStreamInput(data); + in.readInt(); // the size header + Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte()); + Translog.Operation operation; + switch (type) { + case CREATE: + operation = new Translog.Create(); + break; + case DELETE: + operation = new Translog.Delete(); + break; + case DELETE_BY_QUERY: + operation = new Translog.DeleteByQuery(); + break; + case SAVE: + operation = new Translog.Index(); + break; + default: + throw new IOException("No type for [" + type + "]"); + } + return operation.readSource(in); + } + public static void writeTranslogOperation(StreamOutput out, Translog.Operation op) throws IOException { out.writeByte(op.opType().id()); op.writeTo(out); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 1878b995c46..820affef593 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -139,7 +139,26 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog this.trans = null; } - @Override public void add(Operation operation) throws TranslogException { + public byte[] read(Location location) { + FsTranslogFile trans = this.trans; + if (trans != null && trans.id() == location.translogId) { + try { + return trans.read(location); + } catch (Exception e) { + // ignore + } + } + if (current.id() == location.translogId) { + try { + return current.read(location); + } catch (IOException e) { + // ignore + } + } + return null; + } + + @Override public Location add(Operation operation) throws TranslogException { CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); try { BytesStreamOutput out = cachedEntry.cachedBytes(); @@ -151,18 +170,19 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog out.seek(0); out.writeInt(size - 4); - current.add(out.unsafeByteArray(), 0, size); + Location location = current.add(out.unsafeByteArray(), 0, size); if (syncOnEachOperation) { current.sync(); } FsTranslogFile trans = this.trans; if (trans != null) { try { - trans.add(out.unsafeByteArray(), 0, size); + location = trans.add(out.unsafeByteArray(), 0, size); } catch (ClosedChannelException e) { // ignore } } + return location; } catch (Exception e) { throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); } finally { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java index 7b35d61dda5..bfb47ffa8ca 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.translog.fs; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogException; import java.io.IOException; @@ -59,11 +60,18 @@ public class FsTranslogFile { return lastWrittenPosition.get(); } - public void add(byte[] data, int from, int size) throws IOException { + public Translog.Location add(byte[] data, int from, int size) throws IOException { long position = lastPosition.getAndAdd(size); raf.channel().write(ByteBuffer.wrap(data, from, size), position); lastWrittenPosition.getAndAdd(size); operationCounter.incrementAndGet(); + return new Translog.Location(id, position, size); + } + + public byte[] read(Translog.Location location) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(location.size); + raf.channel().read(buffer, location.translogLocation); + return buffer.array(); } public void close(boolean delete) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/get/RestGetAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/get/RestGetAction.java index cd6d94c5563..575c93106e6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/get/RestGetAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/get/RestGetAction.java @@ -26,7 +26,12 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.rest.*; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.XContentRestResponse; +import org.elasticsearch.rest.XContentThrowableRestResponse; import java.io.IOException; import java.util.regex.Pattern; @@ -60,6 +65,7 @@ public class RestGetAction extends BaseRestHandler { getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh())); getRequest.routing(request.param("routing")); getRequest.preference(request.param("preference")); + getRequest.realtime(request.paramAsBoolean("realtime", null)); String sField = request.param("fields"); if (sField != null) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java index 6b28242e278..21c2763a988 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java @@ -67,8 +67,12 @@ public class RestXContentBuilder { } public static void restDocumentSource(byte[] source, XContentBuilder builder, ToXContent.Params params) throws IOException { - if (LZF.isCompressed(source)) { - BytesStreamInput siBytes = new BytesStreamInput(source); + + } + + public static void restDocumentSource(byte[] source, int offset, int length, XContentBuilder builder, ToXContent.Params params) throws IOException { + if (LZF.isCompressed(source, offset, length)) { + BytesStreamInput siBytes = new BytesStreamInput(source, offset, length); LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); XContentType contentType = XContentFactory.xContentType(siLzf); siLzf.resetToBufferStart(); @@ -85,7 +89,7 @@ public class RestXContentBuilder { } } } else { - XContentType contentType = XContentFactory.xContentType(source); + XContentType contentType = XContentFactory.xContentType(source, offset, length); if (contentType == builder.contentType()) { builder.rawField("_source", source); } else { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/script/AbstractSearchScript.java b/modules/elasticsearch/src/main/java/org/elasticsearch/script/AbstractSearchScript.java index 7d6451d1bce..709cc9b8e1a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/script/AbstractSearchScript.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/script/AbstractSearchScript.java @@ -26,6 +26,8 @@ import org.elasticsearch.search.lookup.FieldsLookup; import org.elasticsearch.search.lookup.SearchLookup; import org.elasticsearch.search.lookup.SourceLookup; +import java.util.Map; + /** * A base class for any script type that is used during the search process (custom score, facets, and so on). * @@ -87,6 +89,10 @@ public abstract class AbstractSearchScript extends AbstractExecutableScript impl lookup.setNextDocId(doc); } + @Override public void setNextSource(Map source) { + lookup.source().setNextSource(source); + } + @Override public void setNextScore(float score) { this.score = score; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/script/SearchScript.java b/modules/elasticsearch/src/main/java/org/elasticsearch/script/SearchScript.java index bc35ddbfc05..b06c4a92b19 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/script/SearchScript.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/script/SearchScript.java @@ -22,6 +22,8 @@ package org.elasticsearch.script; import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.Scorer; +import java.util.Map; + /** * A search script. */ @@ -33,6 +35,8 @@ public interface SearchScript extends ExecutableScript { void setNextDocId(int doc); + void setNextSource(Map source); + void setNextScore(float score); float runAsFloat(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/script/mvel/MvelScriptEngineService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/script/mvel/MvelScriptEngineService.java index dc6cbadff42..3e66c24bfab 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/script/mvel/MvelScriptEngineService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/script/mvel/MvelScriptEngineService.java @@ -165,6 +165,10 @@ public class MvelScriptEngineService extends AbstractComponent implements Script resolver.createVariable(name, value); } + @Override public void setNextSource(Map source) { + lookup.source().setNextSource(source); + } + @Override public Object run() { return script.getValue(null, resolver); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java index b0d66d58cc4..03b269e17ce 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java @@ -65,17 +65,7 @@ public class SourceLookup implements Map { Document doc = reader.document(docId, SourceFieldSelector.INSTANCE); Fieldable sourceField = doc.getFieldable(SourceFieldMapper.NAME); byte[] source = sourceField.getBinaryValue(); - if (LZF.isCompressed(source)) { - BytesStreamInput siBytes = new BytesStreamInput(source); - LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); - XContentType contentType = XContentFactory.xContentType(siLzf); - siLzf.resetToBufferStart(); - parser = XContentFactory.xContent(contentType).createParser(siLzf); - this.source = parser.map(); - } else { - parser = XContentFactory.xContent(source).createParser(source); - this.source = parser.map(); - } + this.source = sourceAsMap(source, 0, source.length); } catch (Exception e) { throw new ElasticSearchParseException("failed to parse / load source", e); } finally { @@ -86,6 +76,29 @@ public class SourceLookup implements Map { return this.source; } + public static Map sourceAsMap(byte[] bytes, int offset, int length) { + XContentParser parser = null; + try { + if (LZF.isCompressed(bytes, offset, length)) { + BytesStreamInput siBytes = new BytesStreamInput(bytes, offset, length); + LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); + XContentType contentType = XContentFactory.xContentType(siLzf); + siLzf.resetToBufferStart(); + parser = XContentFactory.xContent(contentType).createParser(siLzf); + return parser.map(); + } else { + parser = XContentFactory.xContent(bytes, offset, length).createParser(bytes, offset, length); + return parser.map(); + } + } catch (Exception e) { + throw new ElasticSearchParseException("Failed to parse source to map", e); + } finally { + if (parser != null) { + parser.close(); + } + } + } + public void setNextReader(IndexReader reader) { if (this.reader == reader) { // if we are called with the same reader, don't invalidate source return; @@ -103,6 +116,10 @@ public class SourceLookup implements Map { this.source = null; } + public void setNextSource(Map source) { + this.source = source; + } + private final static Pattern dotPattern = Pattern.compile("\\."); /** diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java index 140d7d1d38f..d3b7944dde5 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java @@ -19,9 +19,11 @@ package org.elasticsearch.index.engine; +import org.apache.lucene.document.Field; import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.TermQuery; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.index.Index; import org.elasticsearch.index.VersionType; @@ -29,6 +31,7 @@ import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider; import org.elasticsearch.index.merge.policy.MergePolicyProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; @@ -136,7 +139,7 @@ public abstract class AbstractSimpleEngineTests { searchResult.release(); // create a document - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false); engine.create(new Engine.Create(null, newUid("1"), doc)); // its not there... @@ -145,6 +148,16 @@ public abstract class AbstractSimpleEngineTests { assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)); searchResult.release(); + // but, we can still get it (in realtime) + Engine.GetResult getResult = engine.get(new Engine.Get(true, newUid("1"))); + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.source(), equalTo(new BytesHolder(B_1))); + assertThat(getResult.docIdAndVersion(), nullValue()); + + // but, not there non realtime + getResult = engine.get(new Engine.Get(false, newUid("1"))); + assertThat(getResult.exists(), equalTo(false)); + // refresh and it should be there engine.refresh(new Engine.Refresh(true)); @@ -154,8 +167,13 @@ public abstract class AbstractSimpleEngineTests { assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1)); searchResult.release(); + // also in non realtime + getResult = engine.get(new Engine.Get(false, newUid("1"))); + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.docIdAndVersion(), notNullValue()); + // now do an update - doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test1")).add(field(SourceFieldMapper.NAME, B_2, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_2, false); engine.index(new Engine.Index(null, newUid("1"), doc)); // its not updated yet... @@ -165,6 +183,12 @@ public abstract class AbstractSimpleEngineTests { assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0)); searchResult.release(); + // but, we can still get it (in realtime) + getResult = engine.get(new Engine.Get(true, newUid("1"))); + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.source(), equalTo(new BytesHolder(B_2))); + assertThat(getResult.docIdAndVersion(), nullValue()); + // refresh and it should be updated engine.refresh(new Engine.Refresh(true)); @@ -184,6 +208,10 @@ public abstract class AbstractSimpleEngineTests { assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1)); searchResult.release(); + // but, get should not see it (in realtime) + getResult = engine.get(new Engine.Get(true, newUid("1"))); + assertThat(getResult.exists(), equalTo(false)); + // refresh and it should be deleted engine.refresh(new Engine.Refresh(true)); @@ -194,7 +222,7 @@ public abstract class AbstractSimpleEngineTests { searchResult.release(); // add it back - doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false); engine.create(new Engine.Create(null, newUid("1"), doc)); // its not there... @@ -217,6 +245,13 @@ public abstract class AbstractSimpleEngineTests { // now flush engine.flush(new Engine.Flush()); + // and, verify get (in real time) + getResult = engine.get(new Engine.Get(true, newUid("1"))); + assertThat(getResult.exists(), equalTo(true)); + assertThat(getResult.source(), nullValue()); + assertThat(getResult.docIdAndVersion(), notNullValue()); + + // make sure we can still work with the engine // now do an update doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/document/GetActionTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/document/GetActionTests.java new file mode 100644 index 00000000000..1f3915746ee --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/document/GetActionTests.java @@ -0,0 +1,108 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.document; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.elasticsearch.client.Requests.*; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +public class GetActionTests extends AbstractNodesTests { + + protected Client client; + + @BeforeClass public void startNodes() { + startNode("node1"); + startNode("node2"); + client = client("node1"); + } + + @AfterClass public void closeNodes() { + client.close(); + closeAllNodes(); + } + + @Test public void simpleGetTests() { + try { + client.admin().indices().prepareDelete("test").execute().actionGet(); + } catch (Exception e) { + // fine + } + client.admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.refresh_interval", -1)).execute().actionGet(); + + ClusterHealthResponse clusterHealth = client.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + + GetResponse response = client.prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(response.exists(), equalTo(false)); + + logger.info("--> index doc 1"); + client.prepareIndex("test", "type1", "1").setSource("field1", "value1", "field2", "value2").execute().actionGet(); + + logger.info("--> realtime get 1"); + response = client.prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(response.exists(), equalTo(true)); + assertThat(response.sourceAsMap().get("field1").toString(), equalTo("value1")); + assertThat(response.sourceAsMap().get("field2").toString(), equalTo("value2")); + + logger.info("--> non realtime get 1"); + response = client.prepareGet("test", "type1", "1").setRealtime(false).execute().actionGet(); + assertThat(response.exists(), equalTo(false)); + + logger.info("--> realtime fetch of field (requires fetching parsing source)"); + response = client.prepareGet("test", "type1", "1").setFields("field1").execute().actionGet(); + assertThat(response.exists(), equalTo(true)); + assertThat(response.source(), nullValue()); + assertThat(response.field("field1").values().get(0).toString(), equalTo("value1")); + assertThat(response.field("field2"), nullValue()); + + logger.info("--> flush the index, so we load it from it"); + client.admin().indices().prepareFlush().execute().actionGet(); + + logger.info("--> realtime get 1 (loaded from index)"); + response = client.prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(response.exists(), equalTo(true)); + assertThat(response.sourceAsMap().get("field1").toString(), equalTo("value1")); + assertThat(response.sourceAsMap().get("field2").toString(), equalTo("value2")); + + logger.info("--> non realtime get 1 (loaded from index)"); + response = client.prepareGet("test", "type1", "1").setRealtime(false).execute().actionGet(); + assertThat(response.exists(), equalTo(true)); + assertThat(response.sourceAsMap().get("field1").toString(), equalTo("value1")); + assertThat(response.sourceAsMap().get("field2").toString(), equalTo("value2")); + + logger.info("--> realtime fetch of field (loaded from index)"); + response = client.prepareGet("test", "type1", "1").setFields("field1").execute().actionGet(); + assertThat(response.exists(), equalTo(true)); + assertThat(response.source(), nullValue()); + assertThat(response.field("field1").values().get(0).toString(), equalTo("value1")); + assertThat(response.field("field2"), nullValue()); + } +} \ No newline at end of file diff --git a/plugins/lang/groovy/src/main/java/org/elasticsearch/script/groovy/GroovyScriptEngineService.java b/plugins/lang/groovy/src/main/java/org/elasticsearch/script/groovy/GroovyScriptEngineService.java index b04d35e6d57..f34af7acc58 100644 --- a/plugins/lang/groovy/src/main/java/org/elasticsearch/script/groovy/GroovyScriptEngineService.java +++ b/plugins/lang/groovy/src/main/java/org/elasticsearch/script/groovy/GroovyScriptEngineService.java @@ -171,6 +171,10 @@ public class GroovyScriptEngineService extends AbstractComponent implements Scri script.getBinding().getVariables().put(name, value); } + @Override public void setNextSource(Map source) { + lookup.source().setNextSource(source); + } + @Override public Object run() { return script.run(); } diff --git a/plugins/lang/javascript/src/main/java/org/elasticsearch/script/javascript/JavaScriptScriptEngineService.java b/plugins/lang/javascript/src/main/java/org/elasticsearch/script/javascript/JavaScriptScriptEngineService.java index d9a6d71a01f..a85a21e1cba 100644 --- a/plugins/lang/javascript/src/main/java/org/elasticsearch/script/javascript/JavaScriptScriptEngineService.java +++ b/plugins/lang/javascript/src/main/java/org/elasticsearch/script/javascript/JavaScriptScriptEngineService.java @@ -32,7 +32,11 @@ import org.elasticsearch.script.javascript.support.NativeList; import org.elasticsearch.script.javascript.support.NativeMap; import org.elasticsearch.script.javascript.support.ScriptValueConverter; import org.elasticsearch.search.lookup.SearchLookup; -import org.mozilla.javascript.*; +import org.mozilla.javascript.Context; +import org.mozilla.javascript.Script; +import org.mozilla.javascript.Scriptable; +import org.mozilla.javascript.ScriptableObject; +import org.mozilla.javascript.WrapFactory; import java.util.List; import java.util.Map; @@ -222,6 +226,10 @@ public class JavaScriptScriptEngineService extends AbstractComponent implements ScriptableObject.putProperty(scope, name, value); } + @Override public void setNextSource(Map source) { + lookup.source().setNextSource(source); + } + @Override public Object run() { Context ctx = Context.enter(); try { diff --git a/plugins/lang/python/src/main/java/org/elasticsearch/script/python/PythonScriptEngineService.java b/plugins/lang/python/src/main/java/org/elasticsearch/script/python/PythonScriptEngineService.java index 2400c756228..1f265e6dbdf 100644 --- a/plugins/lang/python/src/main/java/org/elasticsearch/script/python/PythonScriptEngineService.java +++ b/plugins/lang/python/src/main/java/org/elasticsearch/script/python/PythonScriptEngineService.java @@ -155,6 +155,10 @@ public class PythonScriptEngineService extends AbstractComponent implements Scri lookup.setNextDocId(doc); } + @Override public void setNextSource(Map source) { + lookup.source().setNextSource(source); + } + @Override public void setNextScore(float score) { pyVars.__setitem__("_score", Py.java2py(score)); }