Support fetching _routing, _parent, _timestamp using realtime get when stored, closes #1289.

This commit is contained in:
Shay Banon 2011-08-30 22:35:16 +03:00
parent 6560a9ec7b
commit b3ef0a3d7f
8 changed files with 140 additions and 55 deletions

View File

@ -44,6 +44,17 @@ public class BytesStreamInput extends StreamInput {
this.count = Math.min(offset + length, buf.length);
}
@Override public long skip(long n) throws IOException {
if (pos + n > count) {
n = count - pos;
}
if (n < 0) {
return 0;
}
pos += n;
return n;
}
public int position() {
return this.pos;
}

View File

@ -27,7 +27,6 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Query;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.lease.Releasable;
@ -743,13 +742,13 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
static class GetResult {
private final boolean exists;
private final long version;
private final BytesHolder source;
private final Translog.Source source;
private final UidField.DocIdAndVersion docIdAndVersion;
private final Searcher searcher;
public static final GetResult NOT_EXISTS = new GetResult(false, -1, null);
public GetResult(boolean exists, long version, @Nullable BytesHolder source) {
public GetResult(boolean exists, long version, @Nullable Translog.Source source) {
this.source = source;
this.exists = exists;
this.version = version;
@ -773,7 +772,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.version;
}
@Nullable public BytesHolder source() {
@Nullable public Translog.Source source() {
return source;
}

View File

@ -33,7 +33,6 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.bloom.BloomFilter;
@ -304,7 +303,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
byte[] data = translog.read(versionValue.translogLocation());
if (data != null) {
try {
BytesHolder source = TranslogStreams.readSource(data);
Translog.Source source = TranslogStreams.readSource(data);
return new GetResult(true, versionValue.version(), source);
} catch (IOException e) {
// switched on us, read it from the reader

View File

@ -35,12 +35,16 @@ import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMappers;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.selector.FieldMappersFieldSelector;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.SearchScript;
import org.elasticsearch.search.lookup.SearchLookup;
@ -224,7 +228,7 @@ public class ShardGetService extends AbstractIndexShardComponent {
return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), source == null ? null : new BytesHolder(source), fields);
} else {
BytesHolder source = get.source();
Translog.Source source = get.source();
Map<String, GetField> fields = null;
boolean sourceRequested = false;
@ -236,56 +240,70 @@ public class ShardGetService extends AbstractIndexShardComponent {
// no fields, and no source
sourceRequested = false;
} else {
Map<String, Object> sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length());
Map<String, Object> sourceAsMap = null;
SearchLookup searchLookup = null;
for (String field : gFields) {
if (field.equals("_source")) {
sourceRequested = true;
continue;
}
String script = null;
if (field.contains("_source.")) {
script = field;
Object value = null;
if (field.equals(RoutingFieldMapper.NAME) && docMapper.routingFieldMapper().stored()) {
value = source.routing;
} else if (field.equals(ParentFieldMapper.NAME) && docMapper.parentFieldMapper().stored()) {
value = source.parent;
} else if (field.equals(TimestampFieldMapper.NAME) && docMapper.timestampFieldMapper().stored()) {
value = source.timestamp;
} else {
FieldMappers x = docMapper.mappers().smartName(field);
if (x != null) {
script = "_source." + x.mapper().names().fullName();
String script = null;
if (field.contains("_source.")) {
script = field;
} else {
FieldMappers x = docMapper.mappers().smartName(field);
if (x != null) {
script = "_source." + x.mapper().names().fullName();
}
}
if (script != null) {
if (searchLookup == null) {
searchLookup = new SearchLookup(mapperService, indexCache.fieldData());
}
if (sourceAsMap == null) {
sourceAsMap = SourceLookup.sourceAsMap(source.source.bytes(), source.source.offset(), source.source.length());
}
SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null);
// we can't do this, only allow to run scripts against the source
//searchScript.setNextReader(docIdAndVersion.reader);
//searchScript.setNextDocId(docIdAndVersion.docId);
// but, we need to inject the parsed source into the script, so it will be used...
searchScript.setNextSource(sourceAsMap);
try {
value = searchScript.run();
} catch (RuntimeException e) {
if (logger.isTraceEnabled()) {
logger.trace("failed to execute get request script field [{}]", e, script);
}
// ignore
}
}
}
if (script != null) {
if (searchLookup == null) {
searchLookup = new SearchLookup(mapperService, indexCache.fieldData());
if (value != null) {
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
}
SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null);
// we can't do this, only allow to run scripts against the source
//searchScript.setNextReader(docIdAndVersion.reader);
//searchScript.setNextDocId(docIdAndVersion.docId);
// but, we need to inject the parsed source into the script, so it will be used...
searchScript.setNextSource(sourceAsMap);
try {
Object value = searchScript.run();
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
}
GetField getField = fields.get(field);
if (getField == null) {
getField = new GetField(field, new ArrayList<Object>(2));
fields.put(field, getField);
}
getField.values().add(value);
} catch (RuntimeException e) {
if (logger.isTraceEnabled()) {
logger.trace("failed to execute get request script field [{}]", e, script);
}
// ignore
GetField getField = fields.get(field);
if (getField == null) {
getField = new GetField(field, new ArrayList<Object>(2));
fields.put(field, getField);
}
getField.values().add(value);
}
}
}
return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), sourceRequested ? source : null, fields);
return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), sourceRequested ? source.source : null, fields);
}
} finally {
get.release();

View File

@ -223,7 +223,21 @@ public interface Translog extends IndexShardComponent {
long estimateSize();
BytesHolder readSource(BytesStreamInput in) throws IOException;
Source readSource(BytesStreamInput in) throws IOException;
}
static class Source {
public final BytesHolder source;
public final String routing;
public final String parent;
public final long timestamp;
public Source(BytesHolder source, String routing, String parent, long timestamp) {
this.source = source;
this.routing = routing;
this.parent = parent;
this.timestamp = timestamp;
}
}
static class Create implements Operation {
@ -288,14 +302,32 @@ public interface Translog extends IndexShardComponent {
return this.version;
}
@Override public BytesHolder readSource(BytesStreamInput in) throws IOException {
@Override public Source readSource(BytesStreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();
int length = in.readVInt();
int offset = in.position();
return new BytesHolder(in.underlyingBuffer(), offset, length);
BytesHolder source = new BytesHolder(in.underlyingBuffer(), offset, length);
in.skip(length);
if (version >= 1) {
if (in.readBoolean()) {
routing = in.readUTF();
}
}
if (version >= 2) {
if (in.readBoolean()) {
parent = in.readUTF();
}
}
if (version >= 3) {
this.version = in.readLong();
}
if (version >= 4) {
this.timestamp = in.readLong();
}
return new Source(source, routing, parent, timestamp);
}
@Override public void readFrom(StreamInput in) throws IOException {
@ -407,14 +439,32 @@ public interface Translog extends IndexShardComponent {
return this.version;
}
@Override public BytesHolder readSource(BytesStreamInput in) throws IOException {
@Override public Source readSource(BytesStreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();
int length = in.readVInt();
int offset = in.position();
return new BytesHolder(in.underlyingBuffer(), offset, length);
BytesHolder source = new BytesHolder(in.underlyingBuffer(), offset, length);
in.skip(length);
if (version >= 1) {
if (in.readBoolean()) {
routing = in.readUTF();
}
}
if (version >= 2) {
if (in.readBoolean()) {
parent = in.readUTF();
}
}
if (version >= 3) {
this.version = in.readLong();
}
if (version >= 4) {
this.timestamp = in.readLong();
}
return new Source(source, routing, parent, timestamp);
}
@Override public void readFrom(StreamInput in) throws IOException {
@ -496,7 +546,7 @@ public interface Translog extends IndexShardComponent {
return this.version;
}
@Override public BytesHolder readSource(BytesStreamInput in) throws IOException {
@Override public Source readSource(BytesStreamInput in) throws IOException {
throw new ElasticSearchIllegalStateException("trying to read doc source from delete operation");
}
@ -554,7 +604,7 @@ public interface Translog extends IndexShardComponent {
return this.types;
}
@Override public BytesHolder readSource(BytesStreamInput in) throws IOException {
@Override public Source readSource(BytesStreamInput in) throws IOException {
throw new ElasticSearchIllegalStateException("trying to read doc source from delete_by_query operation");
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.index.translog;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -54,7 +53,7 @@ public class TranslogStreams {
return operation;
}
public static BytesHolder readSource(byte[] data) throws IOException {
public static Translog.Source readSource(byte[] data) throws IOException {
BytesStreamInput in = new BytesStreamInput(data);
in.readInt(); // the size header
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());

View File

@ -214,7 +214,7 @@ public abstract class AbstractSimpleEngineTests {
// but, we can still get it (in realtime)
Engine.GetResult getResult = engine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.source(), equalTo(new BytesHolder(B_1)));
assertThat(getResult.source().source, equalTo(new BytesHolder(B_1)));
assertThat(getResult.docIdAndVersion(), nullValue());
// but, not there non realtime
@ -249,7 +249,7 @@ public abstract class AbstractSimpleEngineTests {
// but, we can still get it (in realtime)
getResult = engine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.source(), equalTo(new BytesHolder(B_2)));
assertThat(getResult.source().source, equalTo(new BytesHolder(B_2)));
assertThat(getResult.docIdAndVersion(), nullValue());
// refresh and it should be updated

View File

@ -64,12 +64,21 @@ public class SimpleTimestampTests extends AbstractNodesTests {
client.prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
long now2 = System.currentTimeMillis();
// we need to add support for fetching _timestamp from the translog in realtime case...
GetResponse getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet();
// we check both realtime get and non realtime get
GetResponse getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(true).execute().actionGet();
long timestamp = ((Number) getResponse.field("_timestamp").value()).longValue();
assertThat(timestamp, greaterThanOrEqualTo(now1));
assertThat(timestamp, lessThanOrEqualTo(now2));
// verify its the same timestamp when going the replica
getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(true).execute().actionGet();
assertThat(((Number) getResponse.field("_timestamp").value()).longValue(), equalTo(timestamp));
// non realtime get (stored)
getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet();
timestamp = ((Number) getResponse.field("_timestamp").value()).longValue();
assertThat(timestamp, greaterThanOrEqualTo(now1));
assertThat(timestamp, lessThanOrEqualTo(now2));
// verify its the same timestamp when going the replica
getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet();
assertThat(((Number) getResponse.field("_timestamp").value()).longValue(), equalTo(timestamp));