Realtime GET, closes #1060.

This commit is contained in:
kimchy 2011-06-24 09:39:37 +03:00
parent 4547bc3ef4
commit 72ee0aaee7
33 changed files with 882 additions and 152 deletions

View File

@ -122,6 +122,7 @@
<w>queryparser</w>
<w>rabbitmq</w>
<w>rackspace</w>
<w>realtime</w>
<w>rebalance</w>
<w>rebalancing</w>
<w>recycler</w>

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.get;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
// simple test for embedded / single remote lookup
public class SimpleGetActionBenchmark {
public static void main(String[] args) {
long OPERATIONS = SizeValue.parseSizeValue("300k").singles();
Node node = NodeBuilder.nodeBuilder().node();
Client client;
if (false) {
client = NodeBuilder.nodeBuilder().client(true).node().client();
} else {
client = node.client();
}
client.prepareIndex("test", "type1", "1").setSource("field1", "value1").execute().actionGet();
StopWatch stopWatch = new StopWatch().start();
for (long i = 0; i < OPERATIONS; i++) {
client.prepareGet("test", "type1", "1").execute().actionGet();
}
stopWatch.stop();
System.out.println("Ran in " + stopWatch.totalTime() + ", per second: " + (((double) OPERATIONS) / stopWatch.totalTime().secondsFrac()));
node.close();
}
}

View File

@ -44,6 +44,8 @@ public class GetRequest extends SingleShardOperationRequest {
private boolean refresh = false;
Boolean realtime;
GetRequest() {
}
@ -140,6 +142,15 @@ public class GetRequest extends SingleShardOperationRequest {
return this.refresh;
}
public boolean realtime() {
return this.realtime == null ? true : this.realtime;
}
public GetRequest realtime(Boolean realtime) {
this.realtime = realtime;
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/
@ -166,6 +177,12 @@ public class GetRequest extends SingleShardOperationRequest {
fields[i] = in.readUTF();
}
}
byte realtime = in.readByte();
if (realtime == 0) {
this.realtime = false;
} else if (realtime == 1) {
this.realtime = true;
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -179,6 +196,13 @@ public class GetRequest extends SingleShardOperationRequest {
out.writeUTF(field);
}
}
if (realtime == null) {
out.writeByte((byte) -1);
} else if (realtime == false) {
out.writeByte((byte) 0);
} else {
out.writeByte((byte) 1);
}
}
@Override public String toString() {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.get;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.compress.lzf.LZF;
@ -28,8 +29,11 @@ import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException;
import java.util.Iterator;
@ -62,12 +66,14 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
private Map<String, Object> sourceAsMap;
private byte[] source;
private BytesHolder source;
private byte[] sourceAsBytes;
GetResponse() {
}
GetResponse(String index, String type, String id, long version, boolean exists, byte[] source, Map<String, GetField> fields) {
GetResponse(String index, String type, String id, long version, boolean exists, BytesHolder source, Map<String, GetField> fields) {
this.index = index;
this.type = type;
this.id = id;
@ -157,9 +163,21 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
if (source == null) {
return null;
}
if (LZF.isCompressed(source)) {
if (sourceAsBytes != null) {
return sourceAsBytes;
}
this.sourceAsBytes = sourceRef().copyBytes();
return this.sourceAsBytes;
}
/**
* Returns bytes reference, also un compress the source if needed.
*/
public BytesHolder sourceRef() {
if (LZF.isCompressed(source.bytes(), source.offset(), source.length())) {
try {
this.source = LZFDecoder.decode(source);
// TODO decompress without doing an extra copy!
this.source = new BytesHolder(LZFDecoder.decode(source.copyBytes()));
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
@ -181,7 +199,8 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
if (source == null) {
return null;
}
return Unicode.fromBytes(source());
BytesHolder source = sourceRef();
return Unicode.fromBytes(source.bytes(), source.offset(), source.length());
}
/**
@ -195,20 +214,9 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
if (sourceAsMap != null) {
return sourceAsMap;
}
byte[] source = source();
XContentParser parser = null;
try {
parser = XContentFactory.xContent(source).createParser(source);
sourceAsMap = parser.map();
parser.close();
return sourceAsMap;
} catch (Exception e) {
throw new ElasticSearchParseException("Failed to parse source to map", e);
} finally {
if (parser != null) {
parser.close();
}
}
sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length());
return sourceAsMap;
}
public Map<String, Object> getSource() {
@ -258,7 +266,7 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
builder.field(Fields._VERSION, version);
}
if (source != null) {
RestXContentBuilder.restDocumentSource(source, builder, params);
RestXContentBuilder.restDocumentSource(source.bytes(), source.offset(), source.length(), builder, params);
}
if (fields != null && !fields.isEmpty()) {
@ -294,12 +302,10 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
version = in.readLong();
exists = in.readBoolean();
if (exists) {
int size = in.readVInt();
if (size > 0) {
source = new byte[size];
in.readFully(source);
if (in.readBoolean()) {
source = BytesHolder.readBytesHolder(in);
}
size = in.readVInt();
int size = in.readVInt();
if (size == 0) {
fields = ImmutableMap.of();
} else {
@ -320,10 +326,10 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
out.writeBoolean(exists);
if (exists) {
if (source == null) {
out.writeVInt(0);
out.writeBoolean(false);
} else {
out.writeVInt(source.length);
out.writeBytes(source);
out.writeBoolean(true);
source.writeTo(out);
}
if (fields == null) {
out.writeVInt(0);

View File

@ -26,18 +26,18 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.bloom.BloomFilter;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.bloom.BloomCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
@ -48,10 +48,10 @@ import org.elasticsearch.index.mapper.selector.FieldMappersFieldSelector;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.TypeMissingException;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.SearchScript;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.lookup.SourceLookup;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -72,11 +72,15 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
private final ScriptService scriptService;
private final boolean realtime;
@Inject public TransportGetAction(Settings settings, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ScriptService scriptService, ThreadPool threadPool) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
this.scriptService = scriptService;
this.realtime = settings.getAsBoolean("action.get.realtime", true);
}
@Override protected String executor() {
@ -95,35 +99,45 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, request.index());
}
@Override protected void doExecute(GetRequest request, ActionListener<GetResponse> listener) {
if (request.realtime == null) {
request.realtime = this.realtime;
}
super.doExecute(request, listener);
}
@Override protected GetResponse shardOperation(GetRequest request, int shardId) throws ElasticSearchException {
IndexService indexService = indicesService.indexServiceSafe(request.index());
BloomCache bloomCache = indexService.cache().bloomCache();
IndexShard indexShard = indexService.shardSafe(shardId);
DocumentMapper docMapper = indexService.mapperService().documentMapper(request.type());
if (docMapper == null) {
throw new TypeMissingException(new Index(request.index()), request.type());
return new GetResponse(request.index(), request.type(), request.id(), -1, false, null, null);
}
if (request.refresh()) {
if (request.refresh() && !request.realtime()) {
indexShard.refresh(new Engine.Refresh(false));
}
Engine.Searcher searcher = indexShard.searcher();
boolean exists = false;
byte[] source = null;
Map<String, GetField> fields = null;
long version = -1;
Engine.GetResult get = indexShard.get(new Engine.Get(request.realtime(), docMapper.uidMapper().term(request.type(), request.id())));
try {
UidField.DocIdAndVersion docIdAndVersion = loadCurrentVersionFromIndex(bloomCache, searcher, docMapper.uidMapper().term(request.type(), request.id()));
if (docIdAndVersion != null && docIdAndVersion.docId != Lucene.NO_DOC) {
if (docIdAndVersion.version > 0) {
version = docIdAndVersion.version;
}
exists = true;
if (!get.exists()) {
return new GetResponse(request.index(), request.type(), request.id(), -1, false, null, null);
}
// break between having loaded it from translog (so we only have _source), and having a document to load
if (get.docIdAndVersion() != null) {
Map<String, GetField> fields = null;
byte[] source = null;
UidField.DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
FieldSelector fieldSelector = buildFieldSelectors(docMapper, request.fields());
if (fieldSelector != null) {
Document doc = docIdAndVersion.reader.document(docIdAndVersion.docId, fieldSelector);
Document doc;
try {
doc = docIdAndVersion.reader.document(docIdAndVersion.docId, fieldSelector);
} catch (IOException e) {
throw new ElasticSearchException("Failed to get type [" + request.type() + "] and id [" + request.id() + "]", e);
}
source = extractSource(doc, docMapper);
for (Object oField : doc.getFields()) {
@ -200,13 +214,75 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
}
}
}
return new GetResponse(request.index(), request.type(), request.id(), get.version(), get.exists(), source == null ? null : new BytesHolder(source), fields);
} else {
BytesHolder source = get.source();
assert source != null;
Map<String, GetField> fields = null;
boolean sourceRequested = false;
// we can only load scripts that can run against the source
if (request.fields() != null && request.fields().length > 0) {
Map<String, Object> sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length());
SearchLookup searchLookup = null;
for (String field : request.fields()) {
if (field.equals("_source")) {
sourceRequested = true;
continue;
}
String script = null;
if (field.contains("_source.")) {
script = field;
} else {
FieldMappers x = docMapper.mappers().smartName(field);
if (x != null) {
script = "_source." + x.mapper().names().fullName();
}
}
if (script != null) {
if (searchLookup == null) {
searchLookup = new SearchLookup(indexService.mapperService(), indexService.cache().fieldData());
}
SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null);
// we can't do this, only allow to run scripts against the source
//searchScript.setNextReader(docIdAndVersion.reader);
//searchScript.setNextDocId(docIdAndVersion.docId);
// but, we need to inject the parsed source into the script, so it will be used...
searchScript.setNextSource(sourceAsMap);
try {
Object value = searchScript.run();
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
}
GetField getField = fields.get(field);
if (getField == null) {
getField = new GetField(field, new ArrayList<Object>(2));
fields.put(field, getField);
}
getField.values().add(value);
} catch (RuntimeException e) {
if (logger.isTraceEnabled()) {
logger.trace("failed to execute get request script field [{}]", e, script);
}
// ignore
}
}
}
} else {
sourceRequested = true;
}
return new GetResponse(request.index(), request.type(), request.id(), get.version(), get.exists(), sourceRequested ? source : null, fields);
}
} catch (IOException e) {
throw new ElasticSearchException("Failed to get type [" + request.type() + "] and id [" + request.id() + "]", e);
} finally {
searcher.release();
if (get.searcher() != null) {
get.searcher().release();
}
}
return new GetResponse(request.index(), request.type(), request.id(), version, exists, source, fields);
}
private FieldSelector buildFieldSelectors(DocumentMapper docMapper, String... fields) {

View File

@ -99,6 +99,11 @@ public class GetRequestBuilder extends BaseRequestBuilder<GetRequest, GetRespons
return this;
}
public GetRequestBuilder setRealtime(Boolean realtime) {
request.realtime(realtime);
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/

View File

@ -0,0 +1,114 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
import java.util.Arrays;
public class BytesHolder implements Streamable {
private byte[] bytes;
private int offset;
private int length;
BytesHolder() {
}
public BytesHolder(byte[] bytes) {
this.bytes = bytes;
this.offset = 0;
this.length = bytes.length;
}
public BytesHolder(byte[] bytes, int offset, int length) {
this.bytes = bytes;
this.offset = offset;
this.length = length;
}
public byte[] copyBytes() {
return Arrays.copyOfRange(bytes, offset, offset + length);
}
public byte[] bytes() {
return bytes;
}
public int offset() {
return offset;
}
public int length() {
return length;
}
public static BytesHolder readBytesHolder(StreamInput in) throws IOException {
BytesHolder holder = new BytesHolder();
holder.readFrom(in);
return holder;
}
@Override public void readFrom(StreamInput in) throws IOException {
offset = 0;
length = in.readVInt();
bytes = new byte[length];
in.readFully(bytes);
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(length);
out.writeBytes(bytes, offset, length);
}
@Override public boolean equals(Object obj) {
return bytesEquals((BytesHolder) obj);
}
public boolean bytesEquals(BytesHolder other) {
if (length == other.length) {
int otherUpto = other.offset;
final byte[] otherBytes = other.bytes;
final int end = offset + length;
for (int upto = offset; upto < end; upto++, otherUpto++) {
if (bytes[upto] != otherBytes[otherUpto]) {
return false;
}
}
return true;
} else {
return false;
}
}
@Override
public int hashCode() {
int result = 0;
final int end = offset + length;
for (int i = offset; i < end; i++) {
result = 31 * result + bytes[i];
}
return result;
}
}

View File

@ -29,6 +29,10 @@ public class LZF {
return buffer.length >= 2 && buffer[0] == LZFChunk.BYTE_Z && buffer[1] == LZFChunk.BYTE_V;
}
public static boolean isCompressed(final byte[] buffer, int offset, int length) {
return length >= 2 && buffer[offset] == LZFChunk.BYTE_Z && buffer[offset + 1] == LZFChunk.BYTE_V;
}
public final static String SUFFIX = ".lzf";
void process(String[] args) throws IOException {

View File

@ -72,6 +72,10 @@ public class BytesStreamInput extends StreamInput {
return len;
}
public byte[] underlyingBuffer() {
return buf;
}
@Override public byte readByte() throws IOException {
if (pos >= count) {
throw new EOFException();

View File

@ -800,6 +800,11 @@ public final class XContentBuilder {
return this;
}
public XContentBuilder rawField(String fieldName, byte[] content, int offset, int length) throws IOException {
generator.writeRawField(fieldName, content, offset, length, bos);
return this;
}
public XContentBuilder rawField(String fieldName, InputStream content) throws IOException {
generator.writeRawField(fieldName, content, bos);
return this;

View File

@ -231,7 +231,8 @@ public class XContentFactory {
if (length > 2 && data[offset] == SmileConstants.HEADER_BYTE_1 && data[offset + 1] == SmileConstants.HEADER_BYTE_2 && data[offset + 2] == SmileConstants.HEADER_BYTE_3) {
return XContentType.SMILE;
}
for (int i = offset; i < length; i++) {
int size = offset + length;
for (int i = offset; i < size; i++) {
if (data[i] == '{') {
return XContentType.JSON;
}

View File

@ -107,6 +107,8 @@ public interface XContentGenerator {
void writeRawField(String fieldName, byte[] content, OutputStream bos) throws IOException;
void writeRawField(String fieldName, byte[] content, int offset, int length, OutputStream bos) throws IOException;
void writeRawField(String fieldName, InputStream content, OutputStream bos) throws IOException;
void copyCurrentStructure(XContentParser parser) throws IOException;

View File

@ -21,7 +21,11 @@ package org.elasticsearch.common.xcontent.json;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.jackson.JsonGenerator;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.XContentGenerator;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentString;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.io.InputStream;
@ -208,6 +212,14 @@ public class JsonXContentGenerator implements XContentGenerator {
bos.write(content);
}
@Override public void writeRawField(String fieldName, byte[] content, int offset, int length, OutputStream bos) throws IOException {
generator.writeRaw(", \"");
generator.writeRaw(fieldName);
generator.writeRaw("\" : ");
flush();
bos.write(content, offset, length);
}
@Override public void writeRawField(String fieldName, InputStream content, OutputStream bos) throws IOException {
generator.writeRaw(", \"");
generator.writeRaw(fieldName);

View File

@ -62,4 +62,15 @@ public class SmileXContentGenerator extends JsonXContentGenerator {
parser.close();
}
}
@Override public void writeRawField(String fieldName, byte[] content, int offset, int length, OutputStream bos) throws IOException {
writeFieldName(fieldName);
SmileParser parser = SmileXContent.smileFactory.createJsonParser(content, offset, length);
try {
parser.nextToken();
generator.copyCurrentStructure(parser);
} finally {
parser.close();
}
}
}

View File

@ -27,6 +27,7 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Query;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.lease.Releasable;
@ -76,6 +77,8 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
void delete(DeleteByQuery delete) throws EngineException;
GetResult get(Get get) throws EngineException;
Searcher searcher() throws EngineException;
/**
@ -585,4 +588,70 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return aliasFilter;
}
}
static class Get {
private final boolean realtime;
private final Term uid;
public Get(boolean realtime, Term uid) {
this.realtime = realtime;
this.uid = uid;
}
public boolean realtime() {
return this.realtime;
}
public Term uid() {
return uid;
}
}
static class GetResult {
private final boolean exists;
private final long version;
private final BytesHolder source;
private final UidField.DocIdAndVersion docIdAndVersion;
private final Searcher searcher;
public static final GetResult NOT_EXISTS = new GetResult(false, -1, null);
public GetResult(boolean exists, long version, BytesHolder source) {
this.source = source;
this.exists = exists;
this.version = version;
this.docIdAndVersion = null;
this.searcher = null;
}
public GetResult(Searcher searcher, UidField.DocIdAndVersion docIdAndVersion) {
this.exists = true;
this.source = null;
this.version = docIdAndVersion.version;
this.docIdAndVersion = docIdAndVersion;
this.searcher = searcher;
}
public boolean exists() {
return exists;
}
public long version() {
return this.version;
}
public BytesHolder source() {
return source;
}
public Searcher searcher() {
return this.searcher;
}
public UidField.DocIdAndVersion docIdAndVersion() {
return docIdAndVersion;
}
}
}

View File

@ -30,6 +30,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.bloom.BloomFilter;
@ -60,6 +61,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@ -282,6 +284,54 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
return new TimeValue(1, TimeUnit.SECONDS);
}
public GetResult get(Get get) throws EngineException {
rwl.readLock().lock();
try {
if (get.realtime()) {
VersionValue versionValue = versionMap.get(get.uid().text());
if (versionValue != null) {
if (versionValue.delete()) {
return GetResult.NOT_EXISTS;
}
byte[] data = translog.read(versionValue.translogLocation());
if (data != null) {
try {
BytesHolder source = TranslogStreams.readSource(data);
return new GetResult(true, versionValue.version(), source);
} catch (IOException e) {
// switched on us, read it from the reader
}
}
}
}
// no version, get the version from the index, we know that we refresh on flush
Searcher searcher = searcher();
try {
UnicodeUtil.UTF8Result utf8 = Unicode.fromStringAsUtf8(get.uid().text());
for (IndexReader reader : searcher.searcher().subReaders()) {
BloomFilter filter = bloomCache.filter(reader, UidFieldMapper.NAME, asyncLoadBloomFilter);
// we know that its not there...
if (!filter.isPresent(utf8.result, 0, utf8.length)) {
continue;
}
UidField.DocIdAndVersion docIdAndVersion = UidField.loadDocIdAndVersion(reader, get.uid());
if (docIdAndVersion != null && docIdAndVersion.docId != Lucene.NO_DOC) {
return new GetResult(searcher, docIdAndVersion);
}
}
} catch (Exception e) {
searcher.release();
//TODO: A better exception goes here
throw new EngineException(shardId(), "failed to load document", e);
}
return GetResult.NOT_EXISTS;
} finally {
rwl.readLock().unlock();
}
}
@Override public void create(Create create) throws EngineException {
rwl.readLock().lock();
try {
@ -307,14 +357,14 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
synchronized (dirtyLock(create.uid())) {
UidField uidField = create.uidField();
if (create.origin() == Operation.Origin.RECOVERY) {
// on recovery, we get the actual version we want to use
if (create.version() != 0) {
versionMap.put(create.uid().text(), new VersionValue(create.version(), false, threadPool.estimatedTimeInMillis()));
}
uidField.version(create.version());
// we use update doc and not addDoc since we might get duplicates when using transient translog
writer.updateDocument(create.uid(), create.doc(), create.analyzer());
translog.add(new Translog.Create(create));
Translog.Location translogLocation = translog.add(new Translog.Create(create));
// on recovery, we get the actual version we want to use
if (create.version() != 0) {
versionMap.put(create.uid().text(), new VersionValue(create.version(), false, threadPool.estimatedTimeInMillis(), translogLocation));
}
} else {
long currentVersion;
VersionValue versionValue = versionMap.get(create.uid().text());
@ -381,12 +431,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
throw new DocumentAlreadyExistsEngineException(shardId, create.type(), create.id());
}
versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis()));
uidField.version(updatedVersion);
create.version(updatedVersion);
writer.addDocument(create.doc(), create.analyzer());
translog.add(new Translog.Create(create));
Translog.Location translogLocation = translog.add(new Translog.Create(create));
versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));
}
}
}
@ -417,13 +468,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
synchronized (dirtyLock(index.uid())) {
UidField uidField = index.uidField();
if (index.origin() == Operation.Origin.RECOVERY) {
// on recovery, we get the actual version we want to use
if (index.version() != 0) {
versionMap.put(index.uid().text(), new VersionValue(index.version(), false, threadPool.estimatedTimeInMillis()));
}
uidField.version(index.version());
writer.updateDocument(index.uid(), index.doc(), index.analyzer());
translog.add(new Translog.Index(index));
Translog.Location translogLocation = translog.add(new Translog.Index(index));
// on recovery, we get the actual version we want to use
if (index.version() != 0) {
versionMap.put(index.uid().text(), new VersionValue(index.version(), false, threadPool.estimatedTimeInMillis(), translogLocation));
}
} else {
long currentVersion;
VersionValue versionValue = versionMap.get(index.uid().text());
@ -479,7 +530,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
updatedVersion = index.version();
}
versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis()));
uidField.version(updatedVersion);
index.version(updatedVersion);
@ -489,7 +539,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} else {
writer.updateDocument(index.uid(), index.doc(), index.analyzer());
}
translog.add(new Translog.Index(index));
Translog.Location translogLocation = translog.add(new Translog.Index(index));
versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));
}
}
}
@ -518,13 +570,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
synchronized (dirtyLock(delete.uid())) {
if (delete.origin() == Operation.Origin.RECOVERY) {
writer.deleteDocuments(delete.uid());
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
// update the version with the exact version from recovery, assuming we have it
if (delete.version() != 0) {
versionMap.put(delete.uid().text(), new VersionValue(delete.version(), true, threadPool.estimatedTimeInMillis()));
versionMap.put(delete.uid().text(), new VersionValue(delete.version(), true, threadPool.estimatedTimeInMillis(), translogLocation));
}
writer.deleteDocuments(delete.uid());
translog.add(new Translog.Delete(delete));
} else {
long currentVersion;
VersionValue versionValue = versionMap.get(delete.uid().text());
@ -582,10 +633,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// if its a delete on delete and we have the current delete version, return it
delete.version(versionValue.version()).notFound(true);
} else {
versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis()));
delete.version(updatedVersion);
writer.deleteDocuments(delete.uid());
translog.add(new Translog.Delete(delete));
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
}
}
}
@ -614,6 +665,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} finally {
rwl.readLock().unlock();
}
//TODO: This is heavy, since we refresh, but we really have to...
refreshVersioningTable(System.currentTimeMillis());
}
@Override public Searcher searcher() throws EngineException {
@ -765,42 +818,43 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
rwl.readLock().unlock();
}
}
// we need to refresh in order to clear older version values
long time = threadPool.estimatedTimeInMillis(); // mark time here, before we refresh, and then delete all older values
refresh(new Refresh(true).force(true));
Searcher searcher = indexingSearcher.get();
if (searcher != null) {
indexingSearcher.set(null);
}
for (Map.Entry<String, VersionValue> entry : versionMap.entrySet()) {
String id = entry.getKey();
synchronized (dirtyLock(id)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
VersionValue versionValue = versionMap.get(id);
if (versionValue == null) {
continue;
}
if (time - versionValue.time() <= 0) {
continue; // its a newer value, from after/during we refreshed, don't clear it
}
if (versionValue.delete()) {
if ((time - versionValue.time()) > gcDeletesInMillis) {
versionMap.remove(id);
}
} else {
versionMap.remove(id);
}
}
}
if (searcher != null) {
searcher.release();
}
refreshVersioningTable(threadPool.estimatedTimeInMillis());
} finally {
flushing.set(false);
}
}
private void refreshVersioningTable(long time) {
// we need to refresh in order to clear older version values
refresh(new Refresh(true).force(true));
Searcher searcher = indexingSearcher.get();
if (searcher != null) {
indexingSearcher.set(null);
}
for (Map.Entry<String, VersionValue> entry : versionMap.entrySet()) {
String id = entry.getKey();
synchronized (dirtyLock(id)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
VersionValue versionValue = versionMap.get(id);
if (versionValue == null) {
continue;
}
if (time - versionValue.time() <= 0) {
continue; // its a newer value, from after/during we refreshed, don't clear it
}
if (versionValue.delete()) {
if ((time - versionValue.time()) > gcDeletesInMillis) {
versionMap.remove(id);
}
} else {
versionMap.remove(id);
}
}
}
if (searcher != null) {
searcher.release();
}
}
@Override public void maybeMerge() throws EngineException {
if (!possibleMergeNeeded) {
return;
@ -1160,11 +1214,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private final long version;
private final boolean delete;
private final long time;
private final Translog.Location translogLocation;
VersionValue(long version, boolean delete, long time) {
VersionValue(long version, boolean delete, long time, Translog.Location translogLocation) {
this.version = version;
this.delete = delete;
this.time = time;
this.translogLocation = translogLocation;
}
public long time() {
@ -1178,5 +1234,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
public boolean delete() {
return delete;
}
public Translog.Location translogLocation() {
return this.translogLocation;
}
}
}

View File

@ -61,7 +61,7 @@ public interface IndexShard extends IndexShardComponent {
void deleteByQuery(byte[] querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException;
byte[] get(String type, String id) throws ElasticSearchException;
Engine.GetResult get(Engine.Get get) throws ElasticSearchException;
long count(float minScore, byte[] querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException;

View File

@ -19,7 +19,6 @@
package org.elasticsearch.index.shard.service;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Filter;
@ -344,28 +343,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
engine.delete(new Engine.DeleteByQuery(query, querySource, filteringAliases, aliasFilter, types));
}
@Override public byte[] get(String type, String id) throws ElasticSearchException {
@Override public Engine.GetResult get(Engine.Get get) throws ElasticSearchException {
readAllowed();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type);
Engine.Searcher searcher = engine.searcher();
try {
int docId = Lucene.docId(searcher.reader(), docMapper.uidMapper().term(type, id));
if (docId == Lucene.NO_DOC) {
if (logger.isTraceEnabled()) {
logger.trace("get for [{}#{}] returned no result", type, id);
}
return null;
}
Document doc = searcher.reader().document(docId, docMapper.sourceMapper().fieldSelector());
if (logger.isTraceEnabled()) {
logger.trace("get for [{}#{}] returned [{}]", type, id, doc);
}
return docMapper.sourceMapper().value(doc);
} catch (IOException e) {
throw new ElasticSearchException("Failed to get type [" + type + "] and id [" + id + "]", e);
} finally {
searcher.release();
}
return engine.get(get);
}
@Override public long count(float minScore, byte[] querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException {

View File

@ -20,8 +20,11 @@
package org.elasticsearch.index.translog;
import org.apache.lucene.index.Term;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -87,7 +90,9 @@ public interface Translog extends IndexShardComponent {
/**
* Adds a create operation to the transaction log.
*/
void add(Operation operation) throws TranslogException;
Location add(Operation operation) throws TranslogException;
byte[] read(Location location);
/**
* Snapshots the current transaction log allowing to safely iterate over the snapshot.
@ -120,6 +125,18 @@ public interface Translog extends IndexShardComponent {
*/
void close(boolean delete);
static class Location {
public final long translogId;
public final long translogLocation;
public final int size;
public Location(long translogId, long translogLocation, int size) {
this.translogId = translogId;
this.translogLocation = translogLocation;
this.size = size;
}
}
/**
* A snapshot of the transaction log, allows to iterate over all the transaction log operations.
*/
@ -200,6 +217,8 @@ public interface Translog extends IndexShardComponent {
Type opType();
long estimateSize();
BytesHolder readSource(BytesStreamInput in) throws IOException;
}
static class Create implements Operation {
@ -258,6 +277,16 @@ public interface Translog extends IndexShardComponent {
return this.version;
}
@Override public BytesHolder readSource(BytesStreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();
int length = in.readVInt();
int offset = in.position();
return new BytesHolder(in.underlyingBuffer(), offset, length);
}
@Override public void readFrom(StreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readUTF();
@ -357,6 +386,16 @@ public interface Translog extends IndexShardComponent {
return this.version;
}
@Override public BytesHolder readSource(BytesStreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();
int length = in.readVInt();
int offset = in.position();
return new BytesHolder(in.underlyingBuffer(), offset, length);
}
@Override public void readFrom(StreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readUTF();
@ -432,6 +471,10 @@ public interface Translog extends IndexShardComponent {
return this.version;
}
@Override public BytesHolder readSource(BytesStreamInput in) throws IOException {
throw new ElasticSearchIllegalStateException("trying to read doc source from delete operation");
}
@Override public void readFrom(StreamInput in) throws IOException {
int version = in.readVInt(); // version
uid = new Term(in.readUTF(), in.readUTF());
@ -486,6 +529,10 @@ public interface Translog extends IndexShardComponent {
return this.types;
}
@Override public BytesHolder readSource(BytesStreamInput in) throws IOException {
throw new ElasticSearchIllegalStateException("trying to read doc source from delete_by_query operation");
}
@Override public void readFrom(StreamInput in) throws IOException {
int version = in.readVInt(); // version
source = new byte[in.readVInt()];

View File

@ -19,6 +19,8 @@
package org.elasticsearch.index.translog;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -52,6 +54,30 @@ public class TranslogStreams {
return operation;
}
public static BytesHolder readSource(byte[] data) throws IOException {
BytesStreamInput in = new BytesStreamInput(data);
in.readInt(); // the size header
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
Translog.Operation operation;
switch (type) {
case CREATE:
operation = new Translog.Create();
break;
case DELETE:
operation = new Translog.Delete();
break;
case DELETE_BY_QUERY:
operation = new Translog.DeleteByQuery();
break;
case SAVE:
operation = new Translog.Index();
break;
default:
throw new IOException("No type for [" + type + "]");
}
return operation.readSource(in);
}
public static void writeTranslogOperation(StreamOutput out, Translog.Operation op) throws IOException {
out.writeByte(op.opType().id());
op.writeTo(out);

View File

@ -139,7 +139,26 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
this.trans = null;
}
@Override public void add(Operation operation) throws TranslogException {
public byte[] read(Location location) {
FsTranslogFile trans = this.trans;
if (trans != null && trans.id() == location.translogId) {
try {
return trans.read(location);
} catch (Exception e) {
// ignore
}
}
if (current.id() == location.translogId) {
try {
return current.read(location);
} catch (IOException e) {
// ignore
}
}
return null;
}
@Override public Location add(Operation operation) throws TranslogException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
BytesStreamOutput out = cachedEntry.cachedBytes();
@ -151,18 +170,19 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
out.seek(0);
out.writeInt(size - 4);
current.add(out.unsafeByteArray(), 0, size);
Location location = current.add(out.unsafeByteArray(), 0, size);
if (syncOnEachOperation) {
current.sync();
}
FsTranslogFile trans = this.trans;
if (trans != null) {
try {
trans.add(out.unsafeByteArray(), 0, size);
location = trans.add(out.unsafeByteArray(), 0, size);
} catch (ClosedChannelException e) {
// ignore
}
}
return location;
} catch (Exception e) {
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import java.io.IOException;
@ -59,11 +60,18 @@ public class FsTranslogFile {
return lastWrittenPosition.get();
}
public void add(byte[] data, int from, int size) throws IOException {
public Translog.Location add(byte[] data, int from, int size) throws IOException {
long position = lastPosition.getAndAdd(size);
raf.channel().write(ByteBuffer.wrap(data, from, size), position);
lastWrittenPosition.getAndAdd(size);
operationCounter.incrementAndGet();
return new Translog.Location(id, position, size);
}
public byte[] read(Translog.Location location) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(location.size);
raf.channel().read(buffer, location.translogLocation);
return buffer.array();
}
public void close(boolean delete) {

View File

@ -26,7 +26,12 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.XContentRestResponse;
import org.elasticsearch.rest.XContentThrowableRestResponse;
import java.io.IOException;
import java.util.regex.Pattern;
@ -60,6 +65,7 @@ public class RestGetAction extends BaseRestHandler {
getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
getRequest.routing(request.param("routing"));
getRequest.preference(request.param("preference"));
getRequest.realtime(request.paramAsBoolean("realtime", null));
String sField = request.param("fields");
if (sField != null) {

View File

@ -67,8 +67,12 @@ public class RestXContentBuilder {
}
public static void restDocumentSource(byte[] source, XContentBuilder builder, ToXContent.Params params) throws IOException {
if (LZF.isCompressed(source)) {
BytesStreamInput siBytes = new BytesStreamInput(source);
}
public static void restDocumentSource(byte[] source, int offset, int length, XContentBuilder builder, ToXContent.Params params) throws IOException {
if (LZF.isCompressed(source, offset, length)) {
BytesStreamInput siBytes = new BytesStreamInput(source, offset, length);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
XContentType contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();
@ -85,7 +89,7 @@ public class RestXContentBuilder {
}
}
} else {
XContentType contentType = XContentFactory.xContentType(source);
XContentType contentType = XContentFactory.xContentType(source, offset, length);
if (contentType == builder.contentType()) {
builder.rawField("_source", source);
} else {

View File

@ -26,6 +26,8 @@ import org.elasticsearch.search.lookup.FieldsLookup;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.lookup.SourceLookup;
import java.util.Map;
/**
* A base class for any script type that is used during the search process (custom score, facets, and so on).
*
@ -87,6 +89,10 @@ public abstract class AbstractSearchScript extends AbstractExecutableScript impl
lookup.setNextDocId(doc);
}
@Override public void setNextSource(Map<String, Object> source) {
lookup.source().setNextSource(source);
}
@Override public void setNextScore(float score) {
this.score = score;
}

View File

@ -22,6 +22,8 @@ package org.elasticsearch.script;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import java.util.Map;
/**
* A search script.
*/
@ -33,6 +35,8 @@ public interface SearchScript extends ExecutableScript {
void setNextDocId(int doc);
void setNextSource(Map<String, Object> source);
void setNextScore(float score);
float runAsFloat();

View File

@ -165,6 +165,10 @@ public class MvelScriptEngineService extends AbstractComponent implements Script
resolver.createVariable(name, value);
}
@Override public void setNextSource(Map<String, Object> source) {
lookup.source().setNextSource(source);
}
@Override public Object run() {
return script.getValue(null, resolver);
}

View File

@ -65,17 +65,7 @@ public class SourceLookup implements Map {
Document doc = reader.document(docId, SourceFieldSelector.INSTANCE);
Fieldable sourceField = doc.getFieldable(SourceFieldMapper.NAME);
byte[] source = sourceField.getBinaryValue();
if (LZF.isCompressed(source)) {
BytesStreamInput siBytes = new BytesStreamInput(source);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
XContentType contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();
parser = XContentFactory.xContent(contentType).createParser(siLzf);
this.source = parser.map();
} else {
parser = XContentFactory.xContent(source).createParser(source);
this.source = parser.map();
}
this.source = sourceAsMap(source, 0, source.length);
} catch (Exception e) {
throw new ElasticSearchParseException("failed to parse / load source", e);
} finally {
@ -86,6 +76,29 @@ public class SourceLookup implements Map {
return this.source;
}
public static Map<String, Object> sourceAsMap(byte[] bytes, int offset, int length) {
XContentParser parser = null;
try {
if (LZF.isCompressed(bytes, offset, length)) {
BytesStreamInput siBytes = new BytesStreamInput(bytes, offset, length);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
XContentType contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();
parser = XContentFactory.xContent(contentType).createParser(siLzf);
return parser.map();
} else {
parser = XContentFactory.xContent(bytes, offset, length).createParser(bytes, offset, length);
return parser.map();
}
} catch (Exception e) {
throw new ElasticSearchParseException("Failed to parse source to map", e);
} finally {
if (parser != null) {
parser.close();
}
}
}
public void setNextReader(IndexReader reader) {
if (this.reader == reader) { // if we are called with the same reader, don't invalidate source
return;
@ -103,6 +116,10 @@ public class SourceLookup implements Map {
this.source = null;
}
public void setNextSource(Map<String, Object> source) {
this.source = source;
}
private final static Pattern dotPattern = Pattern.compile("\\.");
/**

View File

@ -19,9 +19,11 @@
package org.elasticsearch.index.engine;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
@ -29,6 +31,7 @@ import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
@ -136,7 +139,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// create a document
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
// its not there...
@ -145,6 +148,16 @@ public abstract class AbstractSimpleEngineTests {
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
searchResult.release();
// but, we can still get it (in realtime)
Engine.GetResult getResult = engine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.source(), equalTo(new BytesHolder(B_1)));
assertThat(getResult.docIdAndVersion(), nullValue());
// but, not there non realtime
getResult = engine.get(new Engine.Get(false, newUid("1")));
assertThat(getResult.exists(), equalTo(false));
// refresh and it should be there
engine.refresh(new Engine.Refresh(true));
@ -154,8 +167,13 @@ public abstract class AbstractSimpleEngineTests {
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
searchResult.release();
// also in non realtime
getResult = engine.get(new Engine.Get(false, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
// now do an update
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test1")).add(field(SourceFieldMapper.NAME, B_2, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.index(new Engine.Index(null, newUid("1"), doc));
// its not updated yet...
@ -165,6 +183,12 @@ public abstract class AbstractSimpleEngineTests {
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0));
searchResult.release();
// but, we can still get it (in realtime)
getResult = engine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.source(), equalTo(new BytesHolder(B_2)));
assertThat(getResult.docIdAndVersion(), nullValue());
// refresh and it should be updated
engine.refresh(new Engine.Refresh(true));
@ -184,6 +208,10 @@ public abstract class AbstractSimpleEngineTests {
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1));
searchResult.release();
// but, get should not see it (in realtime)
getResult = engine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(false));
// refresh and it should be deleted
engine.refresh(new Engine.Refresh(true));
@ -194,7 +222,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// add it back
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
// its not there...
@ -217,6 +245,13 @@ public abstract class AbstractSimpleEngineTests {
// now flush
engine.flush(new Engine.Flush());
// and, verify get (in real time)
getResult = engine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.source(), nullValue());
assertThat(getResult.docIdAndVersion(), notNullValue());
// make sure we can still work with the engine
// now do an update
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);

View File

@ -0,0 +1,108 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.document;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.elasticsearch.client.Requests.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
public class GetActionTests extends AbstractNodesTests {
protected Client client;
@BeforeClass public void startNodes() {
startNode("node1");
startNode("node2");
client = client("node1");
}
@AfterClass public void closeNodes() {
client.close();
closeAllNodes();
}
@Test public void simpleGetTests() {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// fine
}
client.admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.refresh_interval", -1)).execute().actionGet();
ClusterHealthResponse clusterHealth = client.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
GetResponse response = client.prepareGet("test", "type1", "1").execute().actionGet();
assertThat(response.exists(), equalTo(false));
logger.info("--> index doc 1");
client.prepareIndex("test", "type1", "1").setSource("field1", "value1", "field2", "value2").execute().actionGet();
logger.info("--> realtime get 1");
response = client.prepareGet("test", "type1", "1").execute().actionGet();
assertThat(response.exists(), equalTo(true));
assertThat(response.sourceAsMap().get("field1").toString(), equalTo("value1"));
assertThat(response.sourceAsMap().get("field2").toString(), equalTo("value2"));
logger.info("--> non realtime get 1");
response = client.prepareGet("test", "type1", "1").setRealtime(false).execute().actionGet();
assertThat(response.exists(), equalTo(false));
logger.info("--> realtime fetch of field (requires fetching parsing source)");
response = client.prepareGet("test", "type1", "1").setFields("field1").execute().actionGet();
assertThat(response.exists(), equalTo(true));
assertThat(response.source(), nullValue());
assertThat(response.field("field1").values().get(0).toString(), equalTo("value1"));
assertThat(response.field("field2"), nullValue());
logger.info("--> flush the index, so we load it from it");
client.admin().indices().prepareFlush().execute().actionGet();
logger.info("--> realtime get 1 (loaded from index)");
response = client.prepareGet("test", "type1", "1").execute().actionGet();
assertThat(response.exists(), equalTo(true));
assertThat(response.sourceAsMap().get("field1").toString(), equalTo("value1"));
assertThat(response.sourceAsMap().get("field2").toString(), equalTo("value2"));
logger.info("--> non realtime get 1 (loaded from index)");
response = client.prepareGet("test", "type1", "1").setRealtime(false).execute().actionGet();
assertThat(response.exists(), equalTo(true));
assertThat(response.sourceAsMap().get("field1").toString(), equalTo("value1"));
assertThat(response.sourceAsMap().get("field2").toString(), equalTo("value2"));
logger.info("--> realtime fetch of field (loaded from index)");
response = client.prepareGet("test", "type1", "1").setFields("field1").execute().actionGet();
assertThat(response.exists(), equalTo(true));
assertThat(response.source(), nullValue());
assertThat(response.field("field1").values().get(0).toString(), equalTo("value1"));
assertThat(response.field("field2"), nullValue());
}
}

View File

@ -171,6 +171,10 @@ public class GroovyScriptEngineService extends AbstractComponent implements Scri
script.getBinding().getVariables().put(name, value);
}
@Override public void setNextSource(Map<String, Object> source) {
lookup.source().setNextSource(source);
}
@Override public Object run() {
return script.run();
}

View File

@ -32,7 +32,11 @@ import org.elasticsearch.script.javascript.support.NativeList;
import org.elasticsearch.script.javascript.support.NativeMap;
import org.elasticsearch.script.javascript.support.ScriptValueConverter;
import org.elasticsearch.search.lookup.SearchLookup;
import org.mozilla.javascript.*;
import org.mozilla.javascript.Context;
import org.mozilla.javascript.Script;
import org.mozilla.javascript.Scriptable;
import org.mozilla.javascript.ScriptableObject;
import org.mozilla.javascript.WrapFactory;
import java.util.List;
import java.util.Map;
@ -222,6 +226,10 @@ public class JavaScriptScriptEngineService extends AbstractComponent implements
ScriptableObject.putProperty(scope, name, value);
}
@Override public void setNextSource(Map<String, Object> source) {
lookup.source().setNextSource(source);
}
@Override public Object run() {
Context ctx = Context.enter();
try {

View File

@ -155,6 +155,10 @@ public class PythonScriptEngineService extends AbstractComponent implements Scri
lookup.setNextDocId(doc);
}
@Override public void setNextSource(Map<String, Object> source) {
lookup.source().setNextSource(source);
}
@Override public void setNextScore(float score) {
pyVars.__setitem__("_score", Py.java2py(score));
}