Versioning, closes #594.

This commit is contained in:
kimchy 2011-01-04 04:04:30 +02:00
parent 9335b3a9e6
commit 45c1ab06b3
47 changed files with 1528 additions and 202 deletions

View File

@ -159,6 +159,7 @@
<w>userid</w>
<w>uuid</w>
<w>versioned</w>
<w>versioning</w>
<w>warmup</w>
<w>wikipedia</w>
<w>wildcards</w>

View File

@ -19,24 +19,20 @@
package org.elasticsearch.benchmark.common.lucene.uidscan;
import org.apache.lucene.analysis.TokenFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
import org.apache.lucene.document.AbstractField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermPositions;
import org.apache.lucene.store.FSDirectory;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom;
import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.util.concurrent.CountDownLatch;
/**
@ -59,7 +55,7 @@ public class LuceneUidScanBenchmark {
System.out.println("Indexing " + INDEX_COUNT + " docs...");
for (long i = startUid; i < LIMIT; i++) {
Document doc = new Document();
doc.add(new UidField(Long.toString(i), i));
doc.add(new UidField("_uid", Long.toString(i), i));
writer.addDocument(doc);
}
System.out.println("Done indexing, took " + watch.stop().lastTaskTime());
@ -104,55 +100,4 @@ public class LuceneUidScanBenchmark {
watch.stop();
System.out.println("Scanned in " + watch.totalTime() + " TP Seconds " + ((SCAN_COUNT * NUMBER_OF_THREADS) / watch.totalTime().secondsFrac()));
}
// Benchmark-local Lucene field that analyzes the uid string and carries the
// version inside a token payload (via UidPayloadTokenStream) rather than as
// a separate stored value.
public static class UidField extends AbstractField {
// the uid string; also serves as the token text
private final String uid;
// version number to be encoded into the token payload
private final long version;
public UidField(String uid, long version) {
// field name fixed to "_uid"; stored, analyzed, no term vectors
super("_uid", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.NO);
this.uid = uid;
this.version = version;
}
@Override public String stringValue() {
return uid;
}
@Override public Reader readerValue() {
// value is supplied through tokenStreamValue(), not as a reader
return null;
}
@Override public TokenStream tokenStreamValue() {
try {
// wrap the keyword analyzer's stream so the version payload gets attached to the uid token
return new UidPayloadTokenStream(Lucene.KEYWORD_ANALYZER.reusableTokenStream("_uid", new FastStringReader(uid)), version);
} catch (IOException e) {
throw new RuntimeException("failed to create token stream", e);
}
}
}
// TokenFilter that attaches the version, encoded as bytes, as the payload of
// every token produced by the wrapped stream.
public static class UidPayloadTokenStream extends TokenFilter {
// attribute used to set the payload on the current token
private final PayloadAttribute payloadAttribute;
// version value encoded into each token's payload
private final long version;
public UidPayloadTokenStream(TokenStream input, long version) {
super(input);
this.version = version;
payloadAttribute = addAttribute(PayloadAttribute.class);
}
@Override public boolean incrementToken() throws IOException {
// propagate end-of-stream from the wrapped input
if (!input.incrementToken()) {
return false;
}
// encode the version bytes as the payload of the current token
payloadAttribute.setPayload(new Payload(Numbers.longToBytes(version)));
return true;
}
}
}

View File

@ -166,7 +166,7 @@ public class SimpleEngineBenchmark {
.add(field("content", contentItem)).build();
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
if (create) {
engine.create(new Engine.Create(pDoc));
engine.create(new Engine.Create(new Term("_id", sId), pDoc));
} else {
engine.index(new Engine.Index(new Term("_id", sId), pDoc));
}
@ -280,7 +280,7 @@ public class SimpleEngineBenchmark {
.add(field("content", content(id))).build();
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
if (create) {
engine.create(new Engine.Create(pDoc));
engine.create(new Engine.Create(new Term("_id", sId), pDoc));
} else {
engine.index(new Engine.Index(new Term("_id", sId), pDoc));
}

View File

@ -213,6 +213,21 @@ public class BulkItemResponse implements Streamable {
return id();
}
/**
* The version of the action.
*/
public long version() {
// failed items carry no version
if (failure != null) {
return -1;
}
// delegate to the concrete response type held by this item
if (response instanceof IndexResponse) {
return ((IndexResponse) response).version();
} else if (response instanceof DeleteResponse) {
return ((DeleteResponse) response).version();
}
// no response, or an unrecognized response type
return -1;
}
/**
* The actual response ({@link IndexResponse} or {@link DeleteResponse}). <tt>null</tt> in
* case of failure.

View File

@ -112,6 +112,7 @@ public class BulkRequest implements ActionRequest {
String routing = null;
String parent = null;
String opType = null;
long version = 0;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@ -130,6 +131,8 @@ public class BulkRequest implements ActionRequest {
parent = parser.text();
} else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) {
opType = parser.text();
} else if ("_version".equals(currentFieldName)) {
version = parser.longValue();
}
}
}
@ -144,15 +147,15 @@ public class BulkRequest implements ActionRequest {
// order is important, we set parent after routing, so routing will be set to parent if not set explicitly
if ("index".equals(action)) {
if (opType == null) {
add(new IndexRequest(index, type, id).routing(routing).parent(parent)
add(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
.source(data, from, nextMarker - from, contentUnsafe));
} else {
add(new IndexRequest(index, type, id).routing(routing).parent(parent)
add(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
.create("create".equals(opType))
.source(data, from, nextMarker - from, contentUnsafe));
}
} else if ("create".equals(action)) {
add(new IndexRequest(index, type, id).routing(routing).parent(parent)
add(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
.create(true)
.source(data, from, nextMarker - from, contentUnsafe));
}

View File

@ -120,9 +120,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent());
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
ops[i] = indexShard.prepareIndex(sourceToParse);
ops[i] = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.PRIMARY);
} else {
ops[i] = indexShard.prepareCreate(sourceToParse);
ops[i] = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.PRIMARY);
}
} catch (Exception e) {
if (logger.isDebugEnabled()) {
@ -134,7 +134,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
try {
ops[i] = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id());
ops[i] = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).origin(Engine.Operation.Origin.PRIMARY);
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("[" + shardRequest.request.index() + "][" + shardRequest.shardId + "]" + ": Failed to execute bulk item (delete) [" + deleteRequest + "]", e);
@ -157,8 +157,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
BulkItemRequest item = request.items()[i];
if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request();
long version;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index engineIndex = (Engine.Index) ops[i];
version = engineIndex.version();
if (!processedTypes.contains(engineIndex.type())) {
processedTypes.add(engineIndex.type());
ParsedDocument doc = engineIndex.parsedDoc();
@ -168,6 +170,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
} else {
Engine.Create engineCreate = (Engine.Create) ops[i];
version = engineCreate.version();
if (!processedTypes.contains(engineCreate.type())) {
processedTypes.add(engineCreate.type());
ParsedDocument doc = engineCreate.parsedDoc();
@ -176,21 +179,26 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
}
}
// update the version on request so it will happen on the replicas
indexRequest.version(version);
if (failures != null && failures[i] != null) {
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(failures[i])));
} else {
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id()));
new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version));
}
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
Engine.Delete engineDelete = (Engine.Delete) ops[i];
// update the version on request so it will happen on the replicas
deleteRequest.version(engineDelete.version());
if (failures != null && failures[i] != null) {
responses[i] = new BulkItemResponse(item.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(failures[i])));
} else {
responses[i] = new BulkItemResponse(item.id(), "delete",
new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()));
new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), engineDelete.version(), engineDelete.notFound()));
}
}
}
@ -209,9 +217,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent());
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
ops[i] = indexShard.prepareIndex(sourceToParse);
ops[i] = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
} else {
ops[i] = indexShard.prepareCreate(sourceToParse);
ops[i] = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
}
} catch (Exception e) {
// ignore, we are on backup
@ -219,7 +227,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
try {
ops[i] = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id());
ops[i] = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).origin(Engine.Operation.Origin.REPLICA);
} catch (Exception e) {
// ignore, we are on backup
}

View File

@ -51,6 +51,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
private String id;
@Nullable private String routing;
private boolean refresh;
private long version;
/**
* Constructs a new delete request against the specified index. The {@link #type(String)} and {@link #id(String)}
@ -197,6 +198,19 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
return this.refresh;
}
/**
* Sets the version, which will cause the delete operation to only be performed if a matching
* version exists and no changes happened on the doc since then.
*/
public DeleteRequest version(long version) {
this.version = version;
return this;
}
public long version() {
return this.version;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readUTF();
@ -205,6 +219,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
routing = in.readUTF();
}
refresh = in.readBoolean();
version = in.readLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -218,6 +233,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
out.writeUTF(routing);
}
out.writeBoolean(refresh);
out.writeLong(version);
}
@Override public String toString() {

View File

@ -41,14 +41,20 @@ public class DeleteResponse implements ActionResponse, Streamable {
private String type;
private long version;
private boolean notFound;
public DeleteResponse() {
}
public DeleteResponse(String index, String type, String id) {
public DeleteResponse(String index, String type, String id, long version, boolean notFound) {
this.index = index;
this.id = id;
this.type = type;
this.version = version;
this.notFound = notFound;
}
/**
@ -93,15 +99,47 @@ public class DeleteResponse implements ActionResponse, Streamable {
return id;
}
/**
* The version of the delete operation.
*/
public long version() {
return this.version;
}
/**
* The version of the delete operation.
*/
public long getVersion() {
return this.version;
}
/**
* Returns <tt>true</tt> if there was no doc found to delete.
*/
public boolean notFound() {
return notFound;
}
/**
* Returns <tt>true</tt> if there was no doc found to delete.
*/
public boolean isNotFound() {
return notFound;
}
@Override public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
id = in.readUTF();
type = in.readUTF();
version = in.readLong();
notFound = in.readBoolean();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
out.writeUTF(id);
out.writeUTF(type);
out.writeLong(version);
out.writeBoolean(notFound);
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.delete.index.IndexDeleteRequest;
import org.elasticsearch.action.delete.index.IndexDeleteResponse;
import org.elasticsearch.action.delete.index.ShardDeleteResponse;
import org.elasticsearch.action.delete.index.TransportIndexDeleteAction;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
@ -97,8 +98,17 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
if (request.routing() == null) {
indexDeleteAction.execute(new IndexDeleteRequest(request), new ActionListener<IndexDeleteResponse>() {
@Override public void onResponse(IndexDeleteResponse indexDeleteResponse) {
// TODO what do we do with specific failed shards?
listener.onResponse(new DeleteResponse(request.index(), request.type(), request.id()));
// go over the shard responses, see if the doc was found on any shard, and take its version if so
long version = 0;
boolean found = false;
for (ShardDeleteResponse deleteResponse : indexDeleteResponse.responses()) {
if (!deleteResponse.notFound()) {
found = true;
version = deleteResponse.version();
break;
}
}
listener.onResponse(new DeleteResponse(request.index(), request.type(), request.id(), version, !found));
}
@Override public void onFailure(Throwable e) {
@ -135,16 +145,20 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
@Override protected DeleteResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indexShard(shardRequest);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id());
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.PRIMARY);
delete.refresh(request.refresh());
indexShard.delete(delete);
return new DeleteResponse(request.index(), request.type(), request.id());
// update the request with the version so it will go to the replicas
request.version(delete.version());
return new DeleteResponse(request.index(), request.type(), request.id(), delete.version(), delete.notFound());
}
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indexShard(shardRequest);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id());
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.REPLICA);
delete.refresh(request.refresh());
indexShard.delete(delete);
}

View File

@ -37,6 +37,8 @@ public class IndexDeleteRequest extends IndexReplicationOperationRequest {
private boolean refresh = false;
private long version;
IndexDeleteRequest() {
}
@ -48,6 +50,7 @@ public class IndexDeleteRequest extends IndexReplicationOperationRequest {
this.type = request.type();
this.id = request.id();
this.refresh = request.refresh();
this.version = request.version();
}
public String type() {
@ -62,11 +65,16 @@ public class IndexDeleteRequest extends IndexReplicationOperationRequest {
return this.refresh;
}
public long version() {
return this.version;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readUTF();
id = in.readUTF();
refresh = in.readBoolean();
version = in.readLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -74,5 +82,6 @@ public class IndexDeleteRequest extends IndexReplicationOperationRequest {
out.writeUTF(type);
out.writeUTF(id);
out.writeBoolean(refresh);
out.writeLong(version);
}
}

View File

@ -39,10 +39,13 @@ public class IndexDeleteResponse implements ActionResponse, Streamable {
private int failedShards;
IndexDeleteResponse(String index, int successfulShards, int failedShards) {
private ShardDeleteResponse[] deleteResponses;
IndexDeleteResponse(String index, int successfulShards, int failedShards, ShardDeleteResponse[] deleteResponses) {
this.index = index;
this.successfulShards = successfulShards;
this.failedShards = failedShards;
this.deleteResponses = deleteResponses;
}
IndexDeleteResponse() {
@ -105,15 +108,28 @@ public class IndexDeleteResponse implements ActionResponse, Streamable {
return failedShards;
}
public ShardDeleteResponse[] responses() {
return this.deleteResponses;
}
@Override public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
successfulShards = in.readVInt();
failedShards = in.readVInt();
deleteResponses = new ShardDeleteResponse[in.readVInt()];
for (int i = 0; i < deleteResponses.length; i++) {
deleteResponses[i] = new ShardDeleteResponse();
deleteResponses[i].readFrom(in);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
out.writeVInt(successfulShards);
out.writeVInt(failedShards);
out.writeVInt(deleteResponses.length);
for (ShardDeleteResponse deleteResponse : deleteResponses) {
deleteResponse.writeTo(out);
}
}
}

View File

@ -39,6 +39,7 @@ public class ShardDeleteRequest extends ShardReplicationOperationRequest {
private String type;
private String id;
private boolean refresh = false;
private long version;
ShardDeleteRequest(IndexDeleteRequest request, int shardId) {
this.index = request.index();
@ -49,6 +50,7 @@ public class ShardDeleteRequest extends ShardReplicationOperationRequest {
consistencyLevel(request.consistencyLevel());
timeout = request.timeout();
this.refresh = request.refresh();
this.version = request.version();
}
ShardDeleteRequest() {
@ -81,12 +83,21 @@ public class ShardDeleteRequest extends ShardReplicationOperationRequest {
return this.refresh;
}
public void version(long version) {
this.version = version;
}
public long version() {
return this.version;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = in.readVInt();
type = in.readUTF();
id = in.readUTF();
refresh = in.readBoolean();
version = in.readLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -95,5 +106,6 @@ public class ShardDeleteRequest extends ShardReplicationOperationRequest {
out.writeUTF(type);
out.writeUTF(id);
out.writeBoolean(refresh);
out.writeLong(version);
}
}

View File

@ -33,9 +33,33 @@ import java.io.IOException;
*/
/**
 * Per-shard response of a delete operation, carrying the resulting version
 * and whether a document was found to delete on that shard.
 */
public class ShardDeleteResponse implements ActionResponse, Streamable {
// version of the delete operation on this shard
private long version;
// true when no doc was found to delete on this shard
private boolean notFound;
public ShardDeleteResponse() {
}
public ShardDeleteResponse(long version, boolean notFound) {
this.version = version;
this.notFound = notFound;
}
public long version() {
return version;
}
public boolean notFound() {
return notFound;
}
@Override public void readFrom(StreamInput in) throws IOException {
version = in.readLong();
notFound = in.readBoolean();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeBoolean(notFound);
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
@ -48,14 +49,16 @@ public class TransportIndexDeleteAction extends TransportIndexReplicationOperati
@Override protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, AtomicReferenceArray shardsResponses) {
int successfulShards = 0;
int failedShards = 0;
ArrayList<ShardDeleteResponse> responses = new ArrayList<ShardDeleteResponse>();
for (int i = 0; i < shardsResponses.length(); i++) {
if (shardsResponses.get(i) == null) {
failedShards++;
} else {
responses.add((ShardDeleteResponse) shardsResponses.get(i));
successfulShards++;
}
}
return new IndexDeleteResponse(request.index(), successfulShards, failedShards);
return new IndexDeleteResponse(request.index(), successfulShards, failedShards, responses.toArray(new ShardDeleteResponse[responses.size()]));
}
@Override protected boolean accumulateExceptions() {

View File

@ -69,16 +69,20 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
@Override protected ShardDeleteResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request;
IndexShard indexShard = indexShard(shardRequest);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id());
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.PRIMARY);
delete.refresh(request.refresh());
indexShard.delete(delete);
return new ShardDeleteResponse();
// update the request version so the same version is applied on the replicas
request.version(delete.version());
return new ShardDeleteResponse(delete.version(), delete.notFound());
}
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request;
IndexShard indexShard = indexShard(shardRequest);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id());
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.REPLICA);
delete.refresh(request.refresh());
indexShard.delete(delete);
}

View File

@ -53,6 +53,8 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
private String id;
private long version;
private boolean exists;
private Map<String, GetField> fields;
@ -64,10 +66,11 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
GetResponse() {
}
GetResponse(String index, String type, String id, boolean exists, byte[] source, Map<String, GetField> fields) {
GetResponse(String index, String type, String id, long version, boolean exists, byte[] source, Map<String, GetField> fields) {
this.index = index;
this.type = type;
this.id = id;
this.version = version;
this.exists = exists;
this.source = source;
this.fields = fields;
@ -132,6 +135,20 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
return id;
}
/**
* The version of the doc.
*/
public long version() {
return this.version;
}
/**
* The version of the doc.
*/
public long getVersion() {
return this.version;
}
/**
* The source of the document if exists.
*/
@ -220,6 +237,7 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
static final XContentBuilderString FIELDS = new XContentBuilderString("fields");
}
@ -235,6 +253,9 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
builder.field(Fields._INDEX, index);
builder.field(Fields._TYPE, type);
builder.field(Fields._ID, id);
if (version != -1) {
builder.field(Fields._VERSION, version);
}
if (source != null) {
RestXContentBuilder.restDocumentSource(source, builder, params);
}
@ -268,6 +289,7 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
index = in.readUTF();
type = in.readUTF();
id = in.readUTF();
version = in.readLong();
exists = in.readBoolean();
if (exists) {
int size = in.readVInt();
@ -292,6 +314,7 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
out.writeUTF(index);
out.writeUTF(type);
out.writeUTF(id);
out.writeLong(version);
out.writeBoolean(exists);
if (exists) {
if (source == null) {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.*;
@ -89,13 +90,17 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
boolean exists = false;
byte[] source = null;
Map<String, GetField> fields = null;
long version = -1;
try {
int docId = Lucene.docId(searcher.reader(), docMapper.uidMapper().term(request.type(), request.id()));
if (docId != Lucene.NO_DOC) {
UidField.DocIdAndVersion docIdAndVersion = UidField.loadDocIdAndVersion(searcher.reader(), docMapper.uidMapper().term(request.type(), request.id()));
if (docIdAndVersion.docId != Lucene.NO_DOC) {
if (docIdAndVersion.version > 0) {
version = docIdAndVersion.version;
}
exists = true;
FieldSelector fieldSelector = buildFieldSelectors(docMapper, request.fields());
if (fieldSelector != null) {
Document doc = searcher.reader().document(docId, fieldSelector);
Document doc = searcher.reader().document(docIdAndVersion.docId, fieldSelector);
source = extractSource(doc, docMapper);
for (Object oField : doc.getFields()) {
@ -136,7 +141,7 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
} finally {
searcher.release();
}
return new GetResponse(request.index(), request.type(), request.id(), exists, source, fields);
return new GetResponse(request.index(), request.type(), request.id(), version, exists, source, fields);
}
private FieldSelector buildFieldSelectors(DocumentMapper docMapper, String... fields) {

View File

@ -124,6 +124,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
private OpType opType = OpType.INDEX;
private boolean refresh = false;
private long version = 0;
private XContentType contentType = Requests.INDEX_CONTENT_TYPE;
@ -519,6 +520,19 @@ public class IndexRequest extends ShardReplicationOperationRequest {
return this.refresh;
}
/**
* Sets the version, which will cause the index operation to only be performed if a matching
* version exists and no changes happened on the doc since then.
*/
public IndexRequest version(long version) {
this.version = version;
return this;
}
public long version() {
return this.version;
}
public void processRouting(MappingMetaData mappingMd) throws ElasticSearchException {
if (routing == null && mappingMd.routing().hasPath()) {
XContentParser parser = null;
@ -561,6 +575,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
opType = OpType.fromId(in.readByte());
refresh = in.readBoolean();
version = in.readLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -588,6 +603,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
out.writeBytes(source, sourceOffset, sourceLength);
out.writeByte(opType.id());
out.writeBoolean(refresh);
out.writeLong(version);
}
@Override public String toString() {

View File

@ -41,14 +41,17 @@ public class IndexResponse implements ActionResponse, Streamable {
private String type;
private long version;
public IndexResponse() {
}
public IndexResponse(String index, String type, String id) {
public IndexResponse(String index, String type, String id, long version) {
this.index = index;
this.id = id;
this.type = type;
this.version = version;
}
/**
@ -93,15 +96,31 @@ public class IndexResponse implements ActionResponse, Streamable {
return id;
}
/**
* Returns the version of the doc indexed.
*/
public long version() {
return this.version;
}
/**
* Returns the version of the doc indexed.
*/
public long getVersion() {
return version();
}
@Override public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
id = in.readUTF();
type = in.readUTF();
version = in.readLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
out.writeUTF(id);
out.writeUTF(type);
out.writeLong(version);
}
}

View File

@ -163,19 +163,29 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent());
ParsedDocument doc;
long version;
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse);
Engine.Index index = indexShard.prepareIndex(sourceToParse)
.version(request.version())
.origin(Engine.Operation.Origin.PRIMARY);
index.refresh(request.refresh());
doc = indexShard.index(index);
version = index.version();
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse);
Engine.Create create = indexShard.prepareCreate(sourceToParse)
.version(request.version())
.origin(Engine.Operation.Origin.PRIMARY);
create.refresh(request.refresh());
doc = indexShard.create(create);
version = create.version();
}
if (doc.mappersAdded()) {
updateMappingOnMaster(request);
}
return new IndexResponse(request.index(), request.type(), request.id());
// update the version on the request, so it will be used for the replicas
request.version(version);
return new IndexResponse(request.index(), request.type(), request.id(), version);
}
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
@ -184,11 +194,15 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent());
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse);
Engine.Index index = indexShard.prepareIndex(sourceToParse)
.version(request.version())
.origin(Engine.Operation.Origin.REPLICA);
index.refresh(request.refresh());
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse);
Engine.Create create = indexShard.prepareCreate(sourceToParse)
.version(request.version())
.origin(Engine.Operation.Origin.REPLICA);
create.refresh(request.refresh());
indexShard.create(create);
}

View File

@ -42,6 +42,8 @@ import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.DocumentAlreadyExistsEngineException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
@ -639,6 +641,15 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (cause instanceof ConnectTransportException) {
return true;
}
// on version conflict or document missing, it means
// that a new change has crept into the replica, and it's fine
if (cause instanceof VersionConflictEngineException) {
return true;
}
// same here
if (cause instanceof DocumentAlreadyExistsEngineException) {
return true;
}
return false;
}
}

View File

@ -83,6 +83,15 @@ public class DeleteRequestBuilder extends BaseRequestBuilder<DeleteRequest, Dele
return this;
}
/**
 * Sets the version, which will cause the delete operation to only be performed if a matching
 * version exists and no changes happened on the doc since then.
 *
 * @param version the expected current version of the document to delete
 * @return this builder, for method chaining
 */
public DeleteRequestBuilder setVersion(long version) {
    request.version(version);
    return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/

View File

@ -275,6 +275,15 @@ public class IndexRequestBuilder extends BaseRequestBuilder<IndexRequest, IndexR
return this;
}
/**
 * Sets the version, which will cause the index operation to only be performed if a matching
 * version exists and no changes happened on the doc since then.
 *
 * @param version the expected current version of the document to index over
 * @return this builder, for method chaining
 */
public IndexRequestBuilder setVersion(long version) {
    request.version(version);
    return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/

View File

@ -22,6 +22,7 @@ package org.elasticsearch.common.lucene;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.elasticsearch.common.lucene.uid.UidField;
/**
* @author kimchy (Shay Banon)
@ -34,6 +35,14 @@ public class DocumentBuilder {
return new DocumentBuilder();
}
/**
 * Creates a {@code _uid} field for the given uid value with version 0.
 */
public static Fieldable uidField(String value) {
    return uidField(value, 0);
}
/**
 * Creates a {@code _uid} field for the given uid value, carrying the given version
 * (stored as a term payload by {@code UidField}).
 */
public static Fieldable uidField(String value, long version) {
    return new UidField("_uid", value, version);
}
/**
 * Starts building a stored, analyzed field with the given name and value.
 */
public static FieldBuilder field(String name, String value) {
    return field(name, value, Field.Store.YES, Field.Index.ANALYZED);
}

View File

@ -0,0 +1,158 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene.uid;
import org.apache.lucene.analysis.TokenFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
import org.apache.lucene.document.AbstractField;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Payload;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermPositions;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.common.lucene.Lucene;
import java.io.IOException;
import java.io.Reader;
/**
* @author kimchy (shay.banon)
*/
/**
 * A {@code _uid} field that carries the document version as a payload on the uid term,
 * so the version can be read back from the index without loading the stored document.
 *
 * @author kimchy (shay.banon)
 */
public class UidField extends AbstractField {

    /** Result holder for {@link #loadDocIdAndVersion(IndexReader, Term)}. */
    public static class DocIdAndVersion {
        public final int docId;
        public final long version;

        public DocIdAndVersion(int docId, long version) {
            this.docId = docId;
            this.version = version;
        }
    }

    /**
     * Loads the doc id and version of the given uid term from the reader. The doc id is
     * {@link Lucene#NO_DOC} with version -1 when the uid does not exist; the version is -2
     * when no version payload is available (for backward comp.) or on any failure.
     */
    public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) {
        int docId = Lucene.NO_DOC;
        TermPositions uid = null;
        try {
            uid = reader.termPositions(term);
            if (!uid.next()) {
                return new DocIdAndVersion(Lucene.NO_DOC, -1); // uid not found
            }
            docId = uid.doc();
            uid.nextPosition();
            if (!uid.isPayloadAvailable()) {
                return new DocIdAndVersion(docId, -2); // no payload (backward comp.)
            }
            if (uid.getPayloadLength() < 8) {
                return new DocIdAndVersion(docId, -2); // payload too short to hold a long
            }
            byte[] payload = uid.getPayload(new byte[8], 0);
            return new DocIdAndVersion(docId, Numbers.bytesToLong(payload));
        } catch (Exception e) {
            return new DocIdAndVersion(docId, -2);
        } finally {
            // the TermPositions enumeration was previously leaked; always close it
            if (uid != null) {
                try {
                    uid.close();
                } catch (Exception e) {
                    // ignore, best effort close
                }
            }
        }
    }

    /**
     * Load the version for the uid from the reader, returning -1 if no doc exists, or -2 if
     * no version is available (for backward comp.)
     */
    public static long loadVersion(IndexReader reader, Term term) {
        TermPositions uid = null;
        try {
            uid = reader.termPositions(term);
            if (!uid.next()) {
                return -1;
            }
            uid.nextPosition();
            if (!uid.isPayloadAvailable()) {
                return -2;
            }
            if (uid.getPayloadLength() < 8) {
                return -2;
            }
            byte[] payload = uid.getPayload(new byte[8], 0);
            return Numbers.bytesToLong(payload);
        } catch (Exception e) {
            return -2;
        } finally {
            // the TermPositions enumeration was previously leaked; always close it
            if (uid != null) {
                try {
                    uid.close();
                } catch (Exception e) {
                    // ignore, best effort close
                }
            }
        }
    }

    private final String uid;

    private long version;

    public UidField(String name, String uid, long version) {
        super(name, Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.NO);
        this.uid = uid;
        this.version = version;
        // positions must be kept, since the version payload rides on the term position
        this.omitTermFreqAndPositions = false;
    }

    @Override public void setOmitTermFreqAndPositions(boolean omitTermFreqAndPositions) {
        // never allow to set this, since we want payload!
    }

    @Override public String stringValue() {
        return uid;
    }

    @Override public Reader readerValue() {
        return null;
    }

    /** The version currently carried by this field. */
    public long version() {
        return this.version;
    }

    /** Updates the version carried by this field (the engine sets this before writing). */
    public void version(long version) {
        this.version = version;
    }

    @Override public TokenStream tokenStreamValue() {
        try {
            return new UidPayloadTokenStream(Lucene.KEYWORD_ANALYZER.reusableTokenStream("_uid", new FastStringReader(uid)), this);
        } catch (IOException e) {
            throw new RuntimeException("failed to create token stream", e);
        }
    }

    /** Token filter that attaches the field's version as a payload to each uid token. */
    public static class UidPayloadTokenStream extends TokenFilter {

        private final PayloadAttribute payloadAttribute;

        private final UidField field;

        public UidPayloadTokenStream(TokenStream input, UidField field) {
            super(input);
            this.field = field;
            payloadAttribute = addAttribute(PayloadAttribute.class);
        }

        @Override public boolean incrementToken() throws IOException {
            if (!input.incrementToken()) {
                return false;
            }
            // read the version lazily at tokenization time, so updates made by the
            // engine before addDocument are reflected in the indexed payload
            payloadAttribute.setPayload(new Payload(Numbers.longToBytes(field.version())));
            return true;
        }
    }
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.index.shard.ShardId;
/**
* @author kimchy (shay.banon)
*/
/**
 * Thrown when a create operation targets a uid for which a live (non deleted)
 * document already exists in the shard.
 *
 * @author kimchy (shay.banon)
 */
public class DocumentAlreadyExistsEngineException extends EngineException {

    public DocumentAlreadyExistsEngineException(ShardId shardId, String type, String id) {
        super(shardId, "[" + type + "][" + id + "]: document already exists");
    }
}

View File

@ -28,10 +28,12 @@ import org.apache.lucene.search.Query;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.translog.Translog;
@ -247,7 +249,15 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
DELETE
}
static enum Origin {
    PRIMARY,  // operation executed on the primary shard
    REPLICA,  // operation replicated from the primary to a replica shard
    RECOVERY  // operation replayed during recovery (carries the exact version to apply)
}
Type opType();
Origin origin();
}
static class Bulk {
@ -273,10 +283,14 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
}
static class Create implements Operation {
private final Term uid;
private final ParsedDocument doc;
private boolean refresh;
private long version;
private Origin origin = Origin.PRIMARY;
public Create(ParsedDocument doc) {
public Create(Term uid, ParsedDocument doc) {
this.uid = uid;
this.doc = doc;
}
@ -284,10 +298,23 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return Type.CREATE;
}
public Create origin(Origin origin) {
this.origin = origin;
return this;
}
@Override public Origin origin() {
return this.origin;
}
public ParsedDocument parsedDoc() {
return this.doc;
}
public Term uid() {
return this.uid;
}
public String type() {
return this.doc.type();
}
@ -300,6 +327,15 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.doc.routing();
}
public long version() {
return this.version;
}
public Create version(long version) {
this.version = version;
return this;
}
public String parent() {
return this.doc.parent();
}
@ -323,12 +359,18 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
public void refresh(boolean refresh) {
this.refresh = refresh;
}
public UidField uidField() {
return (UidField) doc().getFieldable(UidFieldMapper.NAME);
}
}
static class Index implements Operation {
private final Term uid;
private final ParsedDocument doc;
private boolean refresh;
private long version;
private Origin origin = Origin.PRIMARY;
public Index(Term uid, ParsedDocument doc) {
this.uid = uid;
@ -339,6 +381,15 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return Type.INDEX;
}
public Index origin(Origin origin) {
this.origin = origin;
return this;
}
@Override public Origin origin() {
return this.origin;
}
public Term uid() {
return this.uid;
}
@ -347,6 +398,15 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.doc;
}
public Index version(long version) {
this.version = version;
return this;
}
public long version() {
return this.version;
}
public Document doc() {
return this.doc.doc();
}
@ -382,13 +442,24 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
public void refresh(boolean refresh) {
this.refresh = refresh;
}
public UidField uidField() {
return (UidField) doc().getFieldable(UidFieldMapper.NAME);
}
}
static class Delete implements Operation {
private final String type;
private final String id;
private final Term uid;
private boolean refresh;
private long version;
private Origin origin = Origin.PRIMARY;
private boolean notFound;
public Delete(Term uid) {
public Delete(String type, String id, Term uid) {
this.type = type;
this.id = id;
this.uid = uid;
}
@ -396,6 +467,23 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return Type.DELETE;
}
public Delete origin(Origin origin) {
this.origin = origin;
return this;
}
@Override public Origin origin() {
return this.origin;
}
public String type() {
return this.type;
}
public String id() {
return this.id;
}
public Term uid() {
return this.uid;
}
@ -407,6 +495,24 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
public void refresh(boolean refresh) {
this.refresh = refresh;
}
public Delete version(long version) {
this.version = version;
return this;
}
public long version() {
return this.version;
}
public boolean notFound() {
return this.notFound;
}
public Delete notFound(boolean notFound) {
this.notFound = notFound;
return this;
}
}
static class DeleteByQuery {

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.index.shard.ShardId;
/**
* @author kimchy (shay.banon)
*/
/**
 * Thrown when an operation carries an explicit version that conflicts with the
 * current version of the document in the shard.
 *
 * @author kimchy (shay.banon)
 */
public class VersionConflictEngineException extends EngineException {

    public VersionConflictEngineException(ShardId shardId, String type, String id, long current, long required) {
        super(shardId, "[" + type + "][" + id + "]: version conflict, current [" + current + "], required [" + required + "]");
    }
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index.engine.robin;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticSearchException;
@ -29,6 +30,7 @@ import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.IndexWriters;
import org.elasticsearch.common.lucene.ReaderSearcherHolder;
import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -48,6 +50,8 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -103,6 +107,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
private volatile int disableFlushCounter = 0;
private final ConcurrentMap<String, VersionValue> versionMap;
private final Object[] dirtyLocks;
@Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler,
AnalysisService analysisService, SimilarityService similarityService) throws EngineException {
@ -124,6 +132,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
this.mergeScheduler = mergeScheduler;
this.analysisService = analysisService;
this.similarityService = similarityService;
this.versionMap = new ConcurrentHashMap<String, VersionValue>(1000);
this.dirtyLocks = new Object[componentSettings.getAsInt("concurrency", 10000)];
for (int i = 0; i < dirtyLocks.length; i++) {
dirtyLocks[i] = new Object();
}
}
@Override public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) {
@ -202,18 +216,15 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
switch (op.opType()) {
case CREATE:
Create create = (Create) op;
writer.addDocument(create.doc(), create.analyzer());
translog.add(new Translog.Create(create));
innerCreate(create, writer);
break;
case INDEX:
Index index = (Index) op;
writer.updateDocument(index.uid(), index.doc(), index.analyzer());
translog.add(new Translog.Index(index));
innerIndex(index, writer);
break;
case DELETE:
Delete delete = (Delete) op;
writer.deleteDocuments(delete.uid());
translog.add(new Translog.Delete(delete));
innerDelete(delete, writer);
break;
}
} catch (Exception e) {
@ -222,13 +233,25 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
switch (op.opType()) {
case CREATE:
failures[i] = new CreateFailedEngineException(shardId, (Create) op, e);
if (e instanceof EngineException) {
failures[i] = (EngineException) e;
} else {
failures[i] = new CreateFailedEngineException(shardId, (Create) op, e);
}
break;
case INDEX:
failures[i] = new IndexFailedEngineException(shardId, (Index) op, e);
if (e instanceof EngineException) {
failures[i] = (EngineException) e;
} else {
failures[i] = new IndexFailedEngineException(shardId, (Index) op, e);
}
break;
case DELETE:
failures[i] = new DeleteFailedEngineException(shardId, (Delete) op, e);
if (e instanceof EngineException) {
failures[i] = (EngineException) e;
} else {
failures[i] = new DeleteFailedEngineException(shardId, (Delete) op, e);
}
break;
}
}
@ -254,8 +277,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
if (writer == null) {
throw new EngineClosedException(shardId);
}
writer.addDocument(create.doc(), create.analyzer());
translog.add(new Translog.Create(create));
innerCreate(create, writer);
dirty = true;
if (create.refresh()) {
refresh(new Refresh(false));
@ -267,6 +289,76 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
}
/**
 * Executes a create under the per-uid dirty lock: resolves the current version of the uid
 * (from the transient {@code versionMap}, falling back to the index payload), enforces the
 * version and "already exists" checks, stamps the resulting version on the uid field and the
 * operation, then writes the document and a translog entry.
 *
 * Version sentinel values: -1 means the doc was not found, -2 means no version information
 * is available (backward compatibility with pre-versioning indices).
 *
 * @throws VersionConflictEngineException       if an explicit version check fails
 * @throws DocumentAlreadyExistsEngineException if a live (non deleted) doc already exists
 */
private void innerCreate(Create create, IndexWriter writer) throws IOException {
    synchronized (dirtyLock(create.uid())) {
        UidField uidField = create.uidField();
        if (create.origin() == Operation.Origin.RECOVERY) {
            // on recovery, we get the actual version we want to use
            if (create.version() != 0) {
                versionMap.put(create.uid().text(), new VersionValue(create.version(), false));
            }
            uidField.version(create.version());
            writer.addDocument(create.doc(), create.analyzer());
            translog.add(new Translog.Create(create));
        } else {
            long expectedVersion = create.version();
            long currentVersion;
            VersionValue versionValue = versionMap.get(create.uid().text());
            if (versionValue == null) {
                // no transient entry for the uid, read the version payload from the index
                currentVersion = loadCurrentVersionFromIndex(create.uid());
            } else {
                currentVersion = versionValue.version();
            }
            // same logic as index
            long updatedVersion;
            if (create.origin() == Operation.Origin.PRIMARY) {
                if (expectedVersion != 0 && currentVersion != -2) { // -2 means we don't have a version, so ignore...
                    // an explicit version is provided, see if there is a conflict
                    // if the current version is -1, means we did not find anything, and
                    // a version is provided, so we do expect to find a doc under that version
                    if (currentVersion == -1) {
                        throw new VersionConflictEngineException(shardId, create.type(), create.id(), -1, expectedVersion);
                    } else if (expectedVersion != currentVersion) {
                        throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
                    }
                }
                // primary assigns the next version (1 for a brand new / unversioned doc)
                updatedVersion = currentVersion < 0 ? 1 : currentVersion + 1;
            } else { // if (index.origin() == Operation.Origin.REPLICA) {
                if (currentVersion != -2) { // -2 means we don't have a version, so ignore...
                    // if it does not exist, and it is considered the first index operation
                    // (replicas are 1 of) then nothing to do
                    if (!(currentVersion == -1 && create.version() == 1)) {
                        // with replicas, we only check for previous version, we allow to set a future version
                        if (expectedVersion <= currentVersion) {
                            throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
                        }
                    }
                }
                // replicas already hold the "future" version
                updatedVersion = create.version();
            }
            // create must fail unless the doc does not exist, or exists but is deleted
            if (versionValue != null) {
                if (!versionValue.delete()) {
                    throw new DocumentAlreadyExistsEngineException(shardId, create.type(), create.id());
                }
            } else if (currentVersion != -1) {
                // it is not deleted, it is already there
                throw new DocumentAlreadyExistsEngineException(shardId, create.type(), create.id());
            }
            versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false));
            uidField.version(updatedVersion);
            create.version(updatedVersion);
            writer.addDocument(create.doc(), create.analyzer());
            translog.add(new Translog.Create(create));
        }
    }
}
@Override public void index(Index index) throws EngineException {
rwl.readLock().lock();
try {
@ -274,8 +366,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
if (writer == null) {
throw new EngineClosedException(shardId);
}
writer.updateDocument(index.uid(), index.doc(), index.analyzer());
translog.add(new Translog.Index(index));
innerIndex(index, writer);
dirty = true;
if (index.refresh()) {
refresh(new Refresh(false));
@ -287,6 +379,71 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
}
/**
 * Executes an index (add or replace) under the per-uid dirty lock: resolves the current
 * version of the uid (from the transient {@code versionMap}, falling back to the index
 * payload), enforces version checks, stamps the resulting version on the uid field and the
 * operation, then adds or updates the document and records a translog entry.
 *
 * Version sentinel values: -1 means the doc was not found, -2 means no version information
 * is available (backward compatibility with pre-versioning indices).
 *
 * @throws VersionConflictEngineException if an explicit version check fails
 */
private void innerIndex(Index index, IndexWriter writer) throws IOException {
    synchronized (dirtyLock(index.uid())) {
        UidField uidField = index.uidField();
        if (index.origin() == Operation.Origin.RECOVERY) {
            // on recovery, we get the actual version we want to use
            if (index.version() != 0) {
                versionMap.put(index.uid().text(), new VersionValue(index.version(), false));
            }
            uidField.version(index.version());
            writer.updateDocument(index.uid(), index.doc(), index.analyzer());
            translog.add(new Translog.Index(index));
        } else {
            long expectedVersion = index.version();
            long currentVersion;
            VersionValue versionValue = versionMap.get(index.uid().text());
            if (versionValue == null) {
                // no transient entry for the uid, read the version payload from the index
                currentVersion = loadCurrentVersionFromIndex(index.uid());
            } else {
                currentVersion = versionValue.version();
            }
            long updatedVersion;
            if (index.origin() == Operation.Origin.PRIMARY) {
                if (expectedVersion != 0 && currentVersion != -2) { // -2 means we don't have a version, so ignore...
                    // an explicit version is provided, see if there is a conflict
                    // if the current version is -1, means we did not find anything, and
                    // a version is provided, so we do expect to find a doc under that version
                    // this is important, since we don't allow to preset a version in order to handle deletes
                    if (currentVersion == -1) {
                        throw new VersionConflictEngineException(shardId, index.type(), index.id(), -1, expectedVersion);
                    } else if (expectedVersion != currentVersion) {
                        throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
                    }
                }
                // primary assigns the next version (1 for a brand new / unversioned doc)
                updatedVersion = currentVersion < 0 ? 1 : currentVersion + 1;
            } else { // if (index.origin() == Operation.Origin.REPLICA) {
                if (currentVersion != -2) { // -2 means we don't have a version, so ignore...
                    // if it does not exist, and it is considered the first index operation
                    // (replicas are 1 of) then nothing to do
                    if (!(currentVersion == -1 && index.version() == 1)) {
                        // with replicas, we only check for previous version, we allow to set a future version
                        if (expectedVersion <= currentVersion) {
                            throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
                        }
                    }
                }
                // replicas already hold the "future" version
                updatedVersion = index.version();
            }
            versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false));
            uidField.version(updatedVersion);
            index.version(updatedVersion);
            if (currentVersion == -1) {
                // document does not exist, we can optimize for create (no delete-then-add)
                writer.addDocument(index.doc(), index.analyzer());
            } else {
                writer.updateDocument(index.uid(), index.doc(), index.analyzer());
            }
            translog.add(new Translog.Index(index));
        }
    }
}
@Override public void delete(Delete delete) throws EngineException {
rwl.readLock().lock();
try {
@ -294,8 +451,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
if (writer == null) {
throw new EngineClosedException(shardId);
}
writer.deleteDocuments(delete.uid());
translog.add(new Translog.Delete(delete));
innerDelete(delete, writer);
dirty = true;
if (delete.refresh()) {
refresh(new Refresh(false));
@ -307,6 +463,69 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
}
/**
 * Executes a delete under the per-uid dirty lock: resolves the current version of the uid
 * (from the transient {@code versionMap}, falling back to the index payload), enforces
 * version checks, and either marks the operation notFound or deletes the docs and records
 * a translog entry.
 *
 * Version sentinel values: -1 means the doc was not found, -2 means no version information
 * is available (backward compatibility with pre-versioning indices).
 *
 * @throws VersionConflictEngineException if an explicit version check fails
 */
private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
    synchronized (dirtyLock(delete.uid())) {
        if (delete.origin() == Operation.Origin.RECOVERY) {
            // update the version with the exact version from recovery, assuming we have it
            if (delete.version() != 0) {
                versionMap.put(delete.uid().text(), new VersionValue(delete.version(), true));
            }
            writer.deleteDocuments(delete.uid());
            translog.add(new Translog.Delete(delete));
        } else {
            long currentVersion;
            VersionValue versionValue = versionMap.get(delete.uid().text());
            if (versionValue == null) {
                // no transient entry for the uid, read the version payload from the index
                currentVersion = loadCurrentVersionFromIndex(delete.uid());
            } else {
                currentVersion = versionValue.version();
            }
            long updatedVersion;
            if (delete.origin() == Operation.Origin.PRIMARY) {
                if (delete.version() != 0 && currentVersion != -2) { // -2 means we don't have a version, so ignore...
                    // an explicit version is provided, see if there is a conflict
                    // if the current version is -1, means we did not find anything, and
                    // a version is provided, so we do expect to find a doc under that version
                    if (currentVersion == -1) {
                        throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), -1, delete.version());
                    } else if (delete.version() != currentVersion) {
                        throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, delete.version());
                    }
                }
                // primary assigns the next version (1 for a brand new / unversioned doc)
                updatedVersion = currentVersion < 0 ? 1 : currentVersion + 1;
            } else { // if (delete.origin() == Operation.Origin.REPLICA) {
                // on replica, the version is the future value expected (returned from the operation on the primary)
                if (currentVersion != -2) { // -2 means we don't have a version in the index, ignore
                    // only check if we have a version for it, otherwise, ignore (see later)
                    if (currentVersion != -1) {
                        // with replicas, we only check for previous version, we allow to set a future version
                        // NOTE(review): the "current" reported below is currentVersion - 1, which looks
                        // like an off-by-one in the exception message -- confirm intended value
                        if (delete.version() <= currentVersion) {
                            throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion - 1, delete.version());
                        }
                    }
                }
                // replicas already hold the "future" version
                updatedVersion = delete.version();
            }
            if (currentVersion == -1) {
                // if the doc does not exist, just update with doc 0
                delete.version(0).notFound(true);
            } else if (versionValue != null && versionValue.delete()) {
                // if its a delete on delete and we have the current delete version, return it
                delete.version(versionValue.version()).notFound(true);
            } else {
                versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true));
                delete.version(updatedVersion);
                writer.deleteDocuments(delete.uid());
                translog.add(new Translog.Delete(delete));
            }
        }
    }
}
@Override public void delete(DeleteByQuery delete) throws EngineException {
rwl.readLock().lock();
try {
@ -437,6 +656,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
throw new FlushFailedEngineException(shardId, e);
}
}
versionMap.clear();
dirty = true; // force a refresh
refresh(new Refresh(true));
} finally {
rwl.writeLock().unlock();
}
@ -582,6 +804,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
closed = true;
rwl.writeLock().lock();
this.versionMap.clear();
try {
if (nrtResource != null) {
this.nrtResource.forceClose();
@ -602,6 +825,20 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
}
/**
 * Returns the lock object used to serialize versioned operations on the given uid.
 * Uids are striped over a fixed pool of locks by hash.
 */
private Object dirtyLock(Term uid) {
    // Math.abs(Integer.MIN_VALUE) is still negative, which would yield a negative array
    // index and an ArrayIndexOutOfBoundsException; mask the sign bit instead.
    return dirtyLocks[(uid.hashCode() & 0x7fffffff) % dirtyLocks.length];
}
/**
 * Loads the current version of the uid directly from the index payload; used when the
 * transient {@code versionMap} has no entry for the uid. Returns -1 if the doc does not
 * exist, or -2 if no version is available (per {@code UidField.loadVersion}).
 */
private long loadCurrentVersionFromIndex(Term uid) {
    // no version, get the version from the index
    Searcher searcher = searcher();
    try {
        return UidField.loadVersion(searcher.reader(), uid);
    } finally {
        // always release the acquired searcher back to the engine
        searcher.release();
    }
}
private IndexWriter createWriter() throws IOException {
IndexWriter indexWriter = null;
try {
@ -663,4 +900,22 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
return true;
}
}
/**
 * Transient per-uid entry kept in {@code versionMap}: the last version applied to the uid
 * and whether that last operation was a delete. Instances are immutable, so they can be
 * published safely through the concurrent map.
 */
static class VersionValue {
    // made final: the value is only ever set in the constructor within the visible code,
    // and immutability gives safe publication across threads
    private final long version;

    private final boolean delete;

    VersionValue(long version, boolean delete) {
        this.version = version;
        this.delete = delete;
    }

    /** The last version applied for the uid. */
    public long version() {
        return version;
    }

    /** True if the last applied operation for the uid was a delete. */
    public boolean delete() {
        return delete;
    }
}
}

View File

@ -34,6 +34,8 @@ public class SourceToParse {
private String id;
private long version;
private String routing;
private String parentId;
@ -64,6 +66,15 @@ public class SourceToParse {
return this;
}
public long version() {
return this.version;
}
public SourceToParse version(long version) {
this.version = version;
return this;
}
public String parent() {
return this.parentId;
}

View File

@ -23,6 +23,7 @@ import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.index.Term;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MergeMappingException;
@ -41,7 +42,7 @@ public class UidFieldMapper extends AbstractFieldMapper<Uid> implements org.elas
public static final String NAME = org.elasticsearch.index.mapper.UidFieldMapper.NAME;
public static final Field.Index INDEX = Field.Index.NOT_ANALYZED;
public static final boolean OMIT_NORMS = true;
public static final boolean OMIT_TERM_FREQ_AND_POSITIONS = true;
public static final boolean OMIT_TERM_FREQ_AND_POSITIONS = false; // we store payload
}
public static class Builder extends XContentMapper.Builder<Builder, UidFieldMapper> {
@ -71,12 +72,12 @@ public class UidFieldMapper extends AbstractFieldMapper<Uid> implements org.elas
Defaults.OMIT_NORMS, Defaults.OMIT_TERM_FREQ_AND_POSITIONS, Lucene.KEYWORD_ANALYZER, Lucene.KEYWORD_ANALYZER);
}
@Override protected Field parseCreateField(ParseContext context) throws IOException {
@Override protected Fieldable parseCreateField(ParseContext context) throws IOException {
if (context.id() == null) {
throw new MapperParsingException("No id found while parsing the content source");
}
context.uid(Uid.createUid(context.stringBuilder(), context.type(), context.id()));
return new Field(names.indexName(), context.uid(), store, index);
return new UidField(names().indexName(), context.uid(), 0); // version get updated by the engine
}
@Override public Uid value(Fieldable field) {

View File

@ -33,7 +33,7 @@ public class IndexShardException extends IndexException {
}
public IndexShardException(ShardId shardId, String msg, Throwable cause) {
super(shardId == null ? null : shardId.index(), false, "[" + shardId == null ? "_na" : shardId.id() + "] " + msg, cause);
super(shardId == null ? null : shardId.index(), false, "[" + (shardId == null ? "_na" : shardId.id()) + "] " + msg, cause);
this.shardId = shardId;
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.index.shard.service;
import org.apache.lucene.index.Term;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -56,12 +55,10 @@ public interface IndexShard extends IndexShardComponent {
ParsedDocument index(Engine.Index index) throws ElasticSearchException;
Engine.Delete prepareDelete(String type, String id) throws ElasticSearchException;
Engine.Delete prepareDelete(String type, String id, long version) throws ElasticSearchException;
void delete(Engine.Delete delete) throws ElasticSearchException;
void delete(Term uid) throws ElasticSearchException;
EngineException[] bulk(Engine.Bulk bulk) throws ElasticSearchException;
void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException;

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.shard.service;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.FilteredQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.ThreadInterruptedException;
@ -40,10 +39,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.query.IndexQueryParser;
import org.elasticsearch.index.query.IndexQueryParserMissingException;
import org.elasticsearch.index.query.IndexQueryParserService;
@ -213,7 +209,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override public Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException {
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
ParsedDocument doc = docMapper.parse(source);
return new Engine.Create(doc);
return new Engine.Create(docMapper.uidMapper().term(doc.uid()), doc).version(source.version());
}
@Override public ParsedDocument create(Engine.Create create) throws ElasticSearchException {
@ -228,7 +224,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override public Engine.Index prepareIndex(SourceToParse source) throws ElasticSearchException {
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
ParsedDocument doc = docMapper.parse(source);
return new Engine.Index(docMapper.uidMapper().term(doc.uid()), doc);
return new Engine.Index(docMapper.uidMapper().term(doc.uid()), doc).version(source.version());
}
@Override public ParsedDocument index(Engine.Index index) throws ElasticSearchException {
@ -240,13 +236,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return index.parsedDoc();
}
@Override public Engine.Delete prepareDelete(String type, String id) throws ElasticSearchException {
@Override public Engine.Delete prepareDelete(String type, String id, long version) throws ElasticSearchException {
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type);
return new Engine.Delete(docMapper.uidMapper().term(type, id));
}
@Override public void delete(Term uid) {
delete(new Engine.Delete(uid));
return new Engine.Delete(type, id, docMapper.uidMapper().term(type, id)).version(version);
}
@Override public void delete(Engine.Delete delete) throws ElasticSearchException {
@ -453,16 +445,20 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
case CREATE:
Translog.Create create = (Translog.Create) operation;
engine.create(prepareCreate(source(create.source()).type(create.type()).id(create.id())
.routing(create.routing()).parent(create.parent())));
.routing(create.routing()).parent(create.parent())).version(create.version())
.origin(Engine.Operation.Origin.RECOVERY));
break;
case SAVE:
Translog.Index index = (Translog.Index) operation;
engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent())));
.routing(index.routing()).parent(index.parent())).version(index.version())
.origin(Engine.Operation.Origin.RECOVERY));
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;
engine.delete(new Engine.Delete(delete.uid()));
Uid uid = Uid.createUid(delete.uid().text());
engine.delete(new Engine.Delete(uid.type(), uid.id(), delete.uid()).version(delete.version())
.origin(Engine.Operation.Origin.RECOVERY));
break;
case DELETE_BY_QUERY:
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation;

View File

@ -195,6 +195,7 @@ public interface Translog extends IndexShardComponent {
private byte[] source;
private String routing;
private String parent;
private long version;
public Create() {
}
@ -203,6 +204,7 @@ public interface Translog extends IndexShardComponent {
this(create.type(), create.id(), create.source());
this.routing = create.routing();
this.parent = create.parent();
this.version = create.version();
}
public Create(String type, String id, byte[] source) {
@ -239,6 +241,10 @@ public interface Translog extends IndexShardComponent {
return this.parent;
}
public long version() {
return this.version;
}
@Override public void readFrom(StreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readUTF();
@ -255,10 +261,13 @@ public interface Translog extends IndexShardComponent {
parent = in.readUTF();
}
}
if (version >= 3) {
this.version = in.readLong();
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(2); // version
out.writeVInt(3); // version
out.writeUTF(id);
out.writeUTF(type);
out.writeVInt(source.length);
@ -275,12 +284,14 @@ public interface Translog extends IndexShardComponent {
out.writeBoolean(true);
out.writeUTF(parent);
}
out.writeLong(version);
}
}
static class Index implements Operation {
private String id;
private String type;
private long version;
private byte[] source;
private String routing;
private String parent;
@ -292,6 +303,7 @@ public interface Translog extends IndexShardComponent {
this(index.type(), index.id(), index.source());
this.routing = index.routing();
this.parent = index.parent();
this.version = index.version();
}
public Index(String type, String id, byte[] source) {
@ -328,6 +340,10 @@ public interface Translog extends IndexShardComponent {
return this.source;
}
public long version() {
return this.version;
}
@Override public void readFrom(StreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readUTF();
@ -344,10 +360,13 @@ public interface Translog extends IndexShardComponent {
parent = in.readUTF();
}
}
if (version >= 3) {
this.version = in.readLong();
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(2); // version
out.writeVInt(3); // version
out.writeUTF(id);
out.writeUTF(type);
out.writeVInt(source.length);
@ -364,17 +383,20 @@ public interface Translog extends IndexShardComponent {
out.writeBoolean(true);
out.writeUTF(parent);
}
out.writeLong(version);
}
}
static class Delete implements Operation {
private Term uid;
private long version;
public Delete() {
}
public Delete(Engine.Delete delete) {
this(delete.uid());
this.version = delete.version();
}
public Delete(Term uid) {
@ -393,15 +415,23 @@ public interface Translog extends IndexShardComponent {
return this.uid;
}
public long version() {
return this.version;
}
@Override public void readFrom(StreamInput in) throws IOException {
in.readVInt(); // version
int version = in.readVInt(); // version
uid = new Term(in.readUTF(), in.readUTF());
if (version >= 1) {
this.version = in.readLong();
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(0); // version
out.writeVInt(1); // version
out.writeUTF(uid.field());
out.writeUTF(uid.text());
out.writeLong(version);
}
}

View File

@ -78,6 +78,8 @@ public interface RestRequest extends ToXContent.Params {
int paramAsInt(String key, int defaultValue);
long paramAsLong(String key, long defaultValue);
boolean paramAsBoolean(String key, boolean defaultValue);
Boolean paramAsBoolean(String key, Boolean defaultValue);

View File

@ -95,7 +95,10 @@ public class RestBulkAction extends BaseRestHandler {
builder.startObject(itemResponse.opType());
builder.field(Fields._INDEX, itemResponse.index());
builder.field(Fields._TYPE, itemResponse.type());
builder.field(Fields._ID, itemResponse.id());
long version = itemResponse.version();
if (version != -1) {
builder.field(Fields._VERSION, itemResponse.version());
}
if (itemResponse.failed()) {
builder.field(Fields.ERROR, itemResponse.failure().message());
} else {
@ -130,6 +133,7 @@ public class RestBulkAction extends BaseRestHandler {
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString ERROR = new XContentBuilderString("error");
static final XContentBuilderString OK = new XContentBuilderString("ok");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
}
}

View File

@ -28,11 +28,15 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException;
import static org.elasticsearch.ExceptionsHelper.*;
import static org.elasticsearch.rest.RestRequest.Method.*;
import static org.elasticsearch.rest.RestResponse.Status.*;
@ -51,6 +55,7 @@ public class RestDeleteAction extends BaseRestHandler {
deleteRequest.routing(request.param("routing"));
deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT));
deleteRequest.refresh(request.paramAsBoolean("refresh", deleteRequest.refresh()));
deleteRequest.version(RestActions.parseVersion(request));
// we just send a response, no need to fork
deleteRequest.listenerThreaded(false);
// we don't spawn, then fork if local
@ -70,24 +75,44 @@ public class RestDeleteAction extends BaseRestHandler {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject()
.field("ok", true)
.field("_index", result.index())
.field("_type", result.type())
.field("_id", result.id())
.field(Fields.OK, true)
.field(Fields.FOUND, !result.notFound())
.field(Fields._INDEX, result.index())
.field(Fields._TYPE, result.type())
.field(Fields._ID, result.id())
.field(Fields._VERSION, result.version())
.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
RestResponse.Status status = OK;
if (result.notFound()) {
status = NOT_FOUND;
}
channel.sendResponse(new XContentRestResponse(request, status, builder));
} catch (Exception e) {
onFailure(e);
}
}
@Override public void onFailure(Throwable e) {
Throwable t = unwrapCause(e);
RestResponse.Status status = RestResponse.Status.INTERNAL_SERVER_ERROR;
if (t instanceof VersionConflictEngineException) {
status = RestResponse.Status.CONFLICT;
}
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
channel.sendResponse(new XContentThrowableRestResponse(request, status, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
// Response-body field names for the delete action, held as pre-built
// XContentBuilderString constants (project convention for XContent field
// names -- presumably to avoid re-encoding the name on every response).
static final class Fields {
    static final XContentBuilderString OK = new XContentBuilderString("ok");
    // "found" is false when the delete targeted a document that did not exist
    static final XContentBuilderString FOUND = new XContentBuilderString("found");
    static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
    static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
    static final XContentBuilderString _ID = new XContentBuilderString("_id");
    static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
}
}

View File

@ -29,11 +29,15 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.engine.DocumentAlreadyExistsEngineException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException;
import static org.elasticsearch.ExceptionsHelper.*;
import static org.elasticsearch.rest.RestRequest.Method.*;
import static org.elasticsearch.rest.RestResponse.Status.*;
@ -65,6 +69,7 @@ public class RestIndexAction extends BaseRestHandler {
indexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));
indexRequest.version(RestActions.parseVersion(request));
String sOpType = request.param("op_type");
if (sOpType != null) {
if ("index".equals(sOpType)) {
@ -102,6 +107,7 @@ public class RestIndexAction extends BaseRestHandler {
.field(Fields._INDEX, result.index())
.field(Fields._TYPE, result.type())
.field(Fields._ID, result.id())
.field(Fields._VERSION, result.version())
.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (Exception e) {
@ -110,8 +116,15 @@ public class RestIndexAction extends BaseRestHandler {
}
@Override public void onFailure(Throwable e) {
Throwable t = unwrapCause(e);
RestResponse.Status status = RestResponse.Status.INTERNAL_SERVER_ERROR;
if (t instanceof VersionConflictEngineException) {
status = RestResponse.Status.CONFLICT;
} else if (t instanceof DocumentAlreadyExistsEngineException) {
status = RestResponse.Status.CONFLICT;
}
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
channel.sendResponse(new XContentThrowableRestResponse(request, status, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
@ -124,6 +137,7 @@ public class RestIndexAction extends BaseRestHandler {
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
}
}

View File

@ -41,6 +41,17 @@ public class RestActions {
public final static Pattern nodesIdsPattern = Pattern.compile(",");
public final static Pattern genericPattern = Pattern.compile(",");
/**
 * Extracts the document version to use from a REST request: the {@code version}
 * request parameter takes precedence; otherwise the HTTP {@code If-Match} header
 * is parsed as a number.
 *
 * @param request the incoming REST request
 * @return the requested version, or {@code 0} when neither source supplied one
 *         (presumably 0 means "no version check" downstream -- TODO confirm)
 */
public static long parseVersion(RestRequest request) {
    if (request.hasParam("version")) {
        return request.paramAsLong("version", 0);
    }
    // NOTE(review): HTTP ETags in If-Match are conventionally quoted ("3");
    // a quoted value would make Long.parseLong throw NumberFormatException here.
    // Confirm clients send bare numbers, or strip surrounding quotes first.
    String ifMatch = request.header("If-Match");
    if (ifMatch != null) {
        return Long.parseLong(ifMatch);
    }
    return 0;
}
public static void buildBroadcastShardsHeader(XContentBuilder builder, BroadcastOperationResponse response) throws IOException {
builder.startObject("_shards");
builder.field("total", response.totalShards());

View File

@ -61,6 +61,18 @@ public abstract class AbstractRestRequest implements RestRequest {
}
}
/**
 * Parses the request parameter {@code key} as a {@code long}.
 *
 * @param key          the request parameter name
 * @param defaultValue the value returned when the parameter is absent
 * @return the parsed value, or {@code defaultValue} if the parameter is not present
 * @throws ElasticSearchIllegalArgumentException if the value is present but is not a valid long
 */
@Override public long paramAsLong(String key, long defaultValue) {
    String sValue = param(key);
    if (sValue == null) {
        return defaultValue;
    }
    try {
        return Long.parseLong(sValue);
    } catch (NumberFormatException e) {
        // fix: report the actual expected type -- this method parses a long, not an int
        throw new ElasticSearchIllegalArgumentException("Failed to parse long parameter [" + key + "] with value [" + sValue + "]", e);
    }
}
@Override public boolean paramAsBoolean(String key, boolean defaultValue) {
return Booleans.parseBoolean(param(key), defaultValue);
}

View File

@ -75,6 +75,16 @@ public interface SearchHit extends Streamable, ToXContent, Iterable<SearchHitFie
*/
String getType();
/**
* The version of the hit.
*/
long version();
/**
* The version of the hit.
*/
long getVersion();
/**
* The source of the document (can be <tt>null</tt>).
*/

View File

@ -23,8 +23,10 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.SearchParseElement;
@ -80,7 +82,13 @@ public class FetchPhase implements SearchPhase {
byte[] source = extractSource(doc, documentMapper);
InternalSearchHit searchHit = new InternalSearchHit(docId, uid.id(), uid.type(), source, null);
// get the version
long version = UidField.loadVersion(context.searcher().getIndexReader(), new Term(UidFieldMapper.NAME, doc.get(UidFieldMapper.NAME)));
if (version < 0) {
version = -1;
}
InternalSearchHit searchHit = new InternalSearchHit(docId, uid.id(), uid.type(), version, source, null);
hits[index] = searchHit;
for (Object oField : doc.getFields()) {

View File

@ -62,6 +62,8 @@ public class InternalSearchHit implements SearchHit {
private String type;
private long version = -1;
private byte[] source;
private Map<String, SearchHitField> fields = ImmutableMap.of();
@ -82,10 +84,11 @@ public class InternalSearchHit implements SearchHit {
}
public InternalSearchHit(int docId, String id, String type, byte[] source, Map<String, SearchHitField> fields) {
public InternalSearchHit(int docId, String id, String type, long version, byte[] source, Map<String, SearchHitField> fields) {
this.docId = docId;
this.id = id;
this.type = type;
this.version = version;
this.source = source;
this.fields = fields;
}
@ -110,6 +113,14 @@ public class InternalSearchHit implements SearchHit {
return score();
}
// Version of the hit's document; -1 means "not available" (the field's
// default, and the value toXContent uses to suppress the _version field).
@Override public long version() {
    return this.version;
}
// JavaBean-style alias of version().
@Override public long getVersion() {
    return this.version;
}
@Override public String index() {
return shard.index();
}
@ -279,6 +290,7 @@ public class InternalSearchHit implements SearchHit {
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
static final XContentBuilderString _SCORE = new XContentBuilderString("_score");
static final XContentBuilderString FIELDS = new XContentBuilderString("fields");
static final XContentBuilderString HIGHLIGHT = new XContentBuilderString("highlight");
@ -295,8 +307,11 @@ public class InternalSearchHit implements SearchHit {
builder.field(Fields._INDEX, shard.index());
// builder.field("_shard", shard.shardId());
// builder.field("_node", shard.nodeId());
builder.field(Fields._TYPE, type());
builder.field(Fields._ID, id());
builder.field(Fields._TYPE, type);
builder.field(Fields._ID, id);
if (version != -1) {
builder.field(Fields._VERSION, version);
}
if (Float.isNaN(score)) {
builder.nullField(Fields._SCORE);
} else {
@ -390,6 +405,7 @@ public class InternalSearchHit implements SearchHit {
score = in.readFloat();
id = in.readUTF();
type = in.readUTF();
version = in.readLong();
int size = in.readVInt();
if (size > 0) {
source = new byte[size];
@ -518,6 +534,7 @@ public class InternalSearchHit implements SearchHit {
out.writeFloat(score);
out.writeUTF(id);
out.writeUTF(type);
out.writeLong(version);
if (source == null) {
out.writeVInt(0);
} else {

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene.uid;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.RAMDirectory;
import org.elasticsearch.common.lucene.Lucene;
import org.testng.annotations.Test;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class UidFieldTests {

    /**
     * Verifies how {@link UidField} stores and reloads a document version:
     * -1 when no document matches the uid term, -2 when a document exists but
     * was indexed with a plain field (no version payload) -- TODO confirm the
     * sentinel semantics against UidField.loadVersion -- and the actual version
     * once a real UidField is indexed or updated.
     *
     * Fix over the original: the IndexWriter and every near-real-time reader
     * obtained via writer.getReader() were leaked; they are now closed.
     */
    @Test public void testUidField() throws Exception {
        IndexWriter writer = new IndexWriter(new RAMDirectory(), Lucene.STANDARD_ANALYZER, IndexWriter.MaxFieldLength.UNLIMITED);
        IndexReader reader = writer.getReader();
        try {
            // no document with that uid yet -> -1
            assertThat(UidField.loadVersion(reader, new Term("_uid", "1")), equalTo(-1l));

            // plain field carries no version payload -> -2
            Document doc = new Document();
            doc.add(new Field("_uid", "1", Field.Store.YES, Field.Index.NOT_ANALYZED));
            writer.addDocument(doc);
            reader = refresh(writer, reader);
            assertThat(UidField.loadVersion(reader, new Term("_uid", "1")), equalTo(-2l));
            assertThat(UidField.loadDocIdAndVersion(reader, new Term("_uid", "1")).version, equalTo(-2l));

            // a real UidField round-trips its version
            doc = new Document();
            doc.add(new UidField("_uid", "1", 1));
            writer.updateDocument(new Term("_uid", "1"), doc);
            reader = refresh(writer, reader);
            assertThat(UidField.loadVersion(reader, new Term("_uid", "1")), equalTo(1l));
            assertThat(UidField.loadDocIdAndVersion(reader, new Term("_uid", "1")).version, equalTo(1l));

            // updating the document replaces the stored version
            doc = new Document();
            doc.add(new UidField("_uid", "1", 2));
            writer.updateDocument(new Term("_uid", "1"), doc);
            reader = refresh(writer, reader);
            assertThat(UidField.loadVersion(reader, new Term("_uid", "1")), equalTo(2l));
            assertThat(UidField.loadDocIdAndVersion(reader, new Term("_uid", "1")).version, equalTo(2l));
        } finally {
            reader.close();
            writer.close();
        }
    }

    // Obtains a fresh near-real-time reader from the writer and closes the
    // previous one, so no reader instance is leaked between assertions.
    private static IndexReader refresh(IndexWriter writer, IndexReader old) throws Exception {
        IndexReader reader = writer.getReader();
        old.close();
        return reader;
    }
}

View File

@ -53,6 +53,7 @@ import java.util.concurrent.Future;
import static org.elasticsearch.common.lucene.DocumentBuilder.*;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.index.deletionpolicy.SnapshotIndexCommitExistsMatcher.*;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.*;
import static org.elasticsearch.index.engine.EngineSearcherTotalHitsMatcher.*;
import static org.elasticsearch.index.translog.TranslogSizeMatcher.*;
import static org.hamcrest.MatcherAssert.*;
@ -66,17 +67,26 @@ public abstract class AbstractSimpleEngineTests {
protected final ShardId shardId = new ShardId(new Index("index"), 1);
private Store store;
private Store storeReplica;
private Engine engine;
protected Engine engine;
protected Engine replicaEngine;
@BeforeMethod public void setUp() throws Exception {
store = createStore();
store.deleteContent();
engine = createEngine(store);
storeReplica = createStoreReplica();
storeReplica.deleteContent();
engine = createEngine(store, createTranslog());
engine.start();
replicaEngine = createEngine(storeReplica, createTranslogReplica());
replicaEngine.start();
}
@AfterMethod public void tearDown() throws Exception {
replicaEngine.close();
storeReplica.close();
engine.close();
store.close();
}
@ -85,8 +95,16 @@ public abstract class AbstractSimpleEngineTests {
return new RamStore(shardId, EMPTY_SETTINGS, null);
}
protected Store createStoreReplica() throws IOException {
return new RamStore(shardId, EMPTY_SETTINGS, null);
}
protected Translog createTranslog() {
return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false);
return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog/primary"), false);
}
protected Translog createTranslogReplica() {
return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog/replica"), false);
}
protected IndexDeletionPolicy createIndexDeletionPolicy() {
@ -105,11 +123,11 @@ public abstract class AbstractSimpleEngineTests {
return new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS);
}
protected abstract Engine createEngine(Store store);
protected abstract Engine createEngine(Store store, Translog translog);
private static final byte[] B_1 = new byte[]{1};
private static final byte[] B_2 = new byte[]{2};
private static final byte[] B_3 = new byte[]{3};
protected static final byte[] B_1 = new byte[]{1};
protected static final byte[] B_2 = new byte[]{2};
protected static final byte[] B_3 = new byte[]{3};
@Test public void testSimpleOperations() throws Exception {
Engine.Searcher searchResult = engine.searcher();
@ -118,8 +136,8 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// create a document
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc));
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(newUid("1"), doc));
// its not there...
searchResult = engine.searcher();
@ -137,7 +155,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// now do an update
doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.index(new Engine.Index(newUid("1"), doc));
// its not updated yet...
@ -157,7 +175,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// now delete
engine.delete(new Engine.Delete(newUid("1")));
engine.delete(new Engine.Delete("test", "1", newUid("1")));
// its not deleted yet
searchResult = engine.searcher();
@ -176,8 +194,8 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// add it back
doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc));
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(newUid("1"), doc));
// its not there...
searchResult = engine.searcher();
@ -201,7 +219,7 @@ public abstract class AbstractSimpleEngineTests {
// make sure we can still work with the engine
// now do an update
doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.index(new Engine.Index(newUid("1"), doc));
// its not updated yet...
@ -229,15 +247,15 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
List<Engine.Operation> ops = Lists.newArrayList();
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "1_test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ops.add(new Engine.Create(doc));
doc = new ParsedDocument("2", "2", "test", null, doc().add(field("_uid", "2")).add(field("value", "2_test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
ops.add(new Engine.Create(doc));
doc = new ParsedDocument("3", "3", "test", null, doc().add(field("_uid", "3")).add(field("value", "3_test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
ops.add(new Engine.Create(doc));
doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "1_test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "1_test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ops.add(new Engine.Create(newUid("1"), doc));
doc = new ParsedDocument("2", "2", "test", null, doc().add(uidField("2")).add(field("value", "2_test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
ops.add(new Engine.Create(newUid("2"), doc));
doc = new ParsedDocument("3", "3", "test", null, doc().add(uidField("3")).add(field("value", "3_test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
ops.add(new Engine.Create(newUid("3"), doc));
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "1_test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ops.add(new Engine.Index(newUid("1"), doc));
ops.add(new Engine.Delete(newUid("2")));
ops.add(new Engine.Delete("test", "2", newUid("2")));
EngineException[] failures = engine.bulk(new Engine.Bulk(ops.toArray(new Engine.Operation[ops.size()])));
assertThat(failures, nullValue());
@ -246,9 +264,9 @@ public abstract class AbstractSimpleEngineTests {
searchResult = engine.searcher();
assertThat(searchResult, engineSearcherTotalHits(2));
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("_uid", "1")), 1));
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("_uid", "2")), 0));
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("_uid", "3")), 1));
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(newUid("1")), 1));
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(newUid("2")), 0));
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(newUid("3")), 1));
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "1_test")), 0));
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "1_test1")), 1));
@ -261,8 +279,8 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// create a document
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc));
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(newUid("1"), doc));
// its not there...
searchResult = engine.searcher();
@ -280,7 +298,7 @@ public abstract class AbstractSimpleEngineTests {
// don't release the search result yet...
// delete, refresh and do a new search, it should not be there
engine.delete(new Engine.Delete(newUid("1")));
engine.delete(new Engine.Delete("test", "1", newUid("1")));
engine.refresh(new Engine.Refresh(true));
Engine.Searcher updateSearchResult = engine.searcher();
assertThat(updateSearchResult, engineSearcherTotalHits(0));
@ -294,8 +312,8 @@ public abstract class AbstractSimpleEngineTests {
@Test public void testSimpleSnapshot() throws Exception {
// create a document
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc1));
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(newUid("1"), doc1));
final ExecutorService executorService = Executors.newCachedThreadPool();
@ -310,11 +328,11 @@ public abstract class AbstractSimpleEngineTests {
Future<Object> future = executorService.submit(new Callable<Object>() {
@Override public Object call() throws Exception {
engine.flush(new Engine.Flush());
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(doc2));
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(newUid("2"), doc2));
engine.flush(new Engine.Flush());
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, doc().add(field("_uid", "3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(doc3));
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(newUid("3"), doc3));
return null;
}
});
@ -348,8 +366,8 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testSimpleRecover() throws Exception {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc));
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(newUid("1"), doc));
engine.flush(new Engine.Flush());
engine.recover(new Engine.RecoveryHandler() {
@ -389,11 +407,11 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testRecoverWithOperationsBetweenPhase1AndPhase2() throws Exception {
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc1));
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(newUid("1"), doc1));
engine.flush(new Engine.Flush());
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(doc2));
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(newUid("2"), doc2));
engine.recover(new Engine.RecoveryHandler() {
@Override public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
@ -416,11 +434,11 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testRecoverWithOperationsBetweenPhase1AndPhase2AndPhase3() throws Exception {
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc1));
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(newUid("1"), doc1));
engine.flush(new Engine.Flush());
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(doc2));
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(newUid("2"), doc2));
engine.recover(new Engine.RecoveryHandler() {
@Override public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
@ -433,8 +451,8 @@ public abstract class AbstractSimpleEngineTests {
assertThat(create.source(), equalTo(B_2));
// add for phase3
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, doc().add(field("_uid", "3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(doc3));
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(newUid("3"), doc3));
}
@Override public void phase3(Translog.Snapshot snapshot) throws EngineException {
@ -449,7 +467,304 @@ public abstract class AbstractSimpleEngineTests {
engine.close();
}
private Term newUid(String id) {
/**
 * A fresh create on the primary is assigned version 1; replaying that
 * same create on the replica (carrying the primary-assigned version)
 * must also end at version 1.
 */
@Test public void testVersioningNewCreate() {
    ParsedDocument parsedDoc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);

    // primary: first create of this uid -> version 1
    Engine.Create primaryCreate = new Engine.Create(newUid("1"), parsedDoc);
    engine.create(primaryCreate);
    assertThat(primaryCreate.version(), equalTo(1L));

    // replica: replay with the version the primary assigned -> still 1
    Engine.Create replicaCreate = new Engine.Create(newUid("1"), parsedDoc).version(primaryCreate.version()).origin(REPLICA);
    replicaEngine.create(replicaCreate);
    assertThat(replicaCreate.version(), equalTo(1L));
}
/**
 * A first index of a new document gets version 1 on the primary, and
 * replaying it on the replica with that version keeps version 1.
 */
@Test public void testVersioningNewIndex() {
    ParsedDocument parsedDoc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);

    // primary: first index of this uid -> version 1
    Engine.Index primaryIndex = new Engine.Index(newUid("1"), parsedDoc);
    engine.index(primaryIndex);
    assertThat(primaryIndex.version(), equalTo(1L));

    // replica: replay carrying the primary-assigned version -> still 1
    Engine.Index replicaIndex = new Engine.Index(newUid("1"), parsedDoc).version(primaryIndex.version()).origin(REPLICA);
    replicaEngine.index(replicaIndex);
    assertThat(replicaIndex.version(), equalTo(1L));
}
/**
 * After two indexing operations (versions 1 then 2), an index carrying
 * a stale version (1) or a future version (3) must be rejected with a
 * VersionConflictEngineException.
 */
@Test public void testVersioningIndexConflict() {
    ParsedDocument parsedDoc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);

    Engine.Index op = new Engine.Index(newUid("1"), parsedDoc);
    engine.index(op);
    assertThat(op.version(), equalTo(1L));

    op = new Engine.Index(newUid("1"), parsedDoc);
    engine.index(op);
    assertThat(op.version(), equalTo(2L));

    // both a stale version (1) and a future version (3) must conflict
    for (long conflictingVersion : new long[]{1L, 3L}) {
        Engine.Index stale = new Engine.Index(newUid("1"), parsedDoc).version(conflictingVersion);
        try {
            engine.index(stale);
            assert false;
        } catch (VersionConflictEngineException e) {
            // expected
        }
    }
}
/**
 * Same as testVersioningIndexConflict, but with a flush in between so
 * the version lookup goes through the Lucene index rather than any
 * in-memory state: stale (1) and future (3) versions still conflict.
 */
@Test public void testVersioningIndexConflictWithFlush() {
    ParsedDocument parsedDoc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);

    Engine.Index op = new Engine.Index(newUid("1"), parsedDoc);
    engine.index(op);
    assertThat(op.version(), equalTo(1L));

    op = new Engine.Index(newUid("1"), parsedDoc);
    engine.index(op);
    assertThat(op.version(), equalTo(2L));

    // force the versions to be resolved from the (flushed) index
    engine.flush(new Engine.Flush());

    // both a stale version (1) and a future version (3) must conflict
    for (long conflictingVersion : new long[]{1L, 3L}) {
        Engine.Index stale = new Engine.Index(newUid("1"), parsedDoc).version(conflictingVersion);
        try {
            engine.index(stale);
            assert false;
        } catch (VersionConflictEngineException e) {
            // expected
        }
    }
}
/**
 * Deletes must obey versioning: a stale (1) or future (3) version
 * conflicts, the matching version (2) succeeds and bumps the doc to
 * version 3, and afterwards neither index nor create carrying the
 * pre-delete version may resurrect the document.
 */
@Test public void testVersioningDeleteConflict() {
    ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
    Engine.Index index = new Engine.Index(newUid("1"), doc);
    engine.index(index);
    assertThat(index.version(), equalTo(1L));

    index = new Engine.Index(newUid("1"), doc);
    engine.index(index);
    assertThat(index.version(), equalTo(2L));

    // delete with a stale version must conflict
    Engine.Delete delete = new Engine.Delete("test", "1", newUid("1")).version(1L);
    try {
        engine.delete(delete);
        assert false;
    } catch (VersionConflictEngineException e) {
        // all is well
    }

    // a future version must conflict as well
    delete = new Engine.Delete("test", "1", newUid("1")).version(3L);
    try {
        engine.delete(delete);
        assert false;
    } catch (VersionConflictEngineException e) {
        // all is well
    }

    // matching version: the delete goes through and bumps the version
    delete = new Engine.Delete("test", "1", newUid("1")).version(2L);
    engine.delete(delete);
    assertThat(delete.version(), equalTo(3L));

    // indexing over the deleted doc with the pre-delete version must conflict
    index = new Engine.Index(newUid("1"), doc).version(2L);
    try {
        engine.index(index);
        assert false;
    } catch (VersionConflictEngineException e) {
        // all is well
    }

    // creating with the pre-delete version must conflict too
    Engine.Create create = new Engine.Create(newUid("1"), doc).version(2L);
    try {
        engine.create(create);
        // fix: this assert was missing, so an erroneously successful
        // create would have gone unnoticed and the test passed silently
        assert false;
    } catch (VersionConflictEngineException e) {
        // all is well
    }
}
/**
 * Same as testVersioningDeleteConflict, but with flushes interleaved
 * so every version check resolves against the Lucene index rather than
 * in-memory state: stale/future delete versions conflict, the matching
 * version deletes and bumps to 3, and the pre-delete version can
 * neither index nor create afterwards.
 */
@Test public void testVersioningDeleteConflictWithFlush() {
    ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
    Engine.Index index = new Engine.Index(newUid("1"), doc);
    engine.index(index);
    assertThat(index.version(), equalTo(1L));

    index = new Engine.Index(newUid("1"), doc);
    engine.index(index);
    assertThat(index.version(), equalTo(2L));

    // resolve subsequent version checks from the flushed index
    engine.flush(new Engine.Flush());

    // delete with a stale version must conflict
    Engine.Delete delete = new Engine.Delete("test", "1", newUid("1")).version(1L);
    try {
        engine.delete(delete);
        assert false;
    } catch (VersionConflictEngineException e) {
        // all is well
    }

    // a future version must conflict as well
    delete = new Engine.Delete("test", "1", newUid("1")).version(3L);
    try {
        engine.delete(delete);
        assert false;
    } catch (VersionConflictEngineException e) {
        // all is well
    }

    engine.flush(new Engine.Flush());

    // matching version: the delete goes through and bumps the version
    delete = new Engine.Delete("test", "1", newUid("1")).version(2L);
    engine.delete(delete);
    assertThat(delete.version(), equalTo(3L));

    engine.flush(new Engine.Flush());

    // indexing over the deleted doc with the pre-delete version must conflict
    index = new Engine.Index(newUid("1"), doc).version(2L);
    try {
        engine.index(index);
        assert false;
    } catch (VersionConflictEngineException e) {
        // all is well
    }

    // creating with the pre-delete version must conflict too
    Engine.Create create = new Engine.Create(newUid("1"), doc).version(2L);
    try {
        engine.create(create);
        // fix: this assert was missing, so an erroneously successful
        // create would have gone unnoticed and the test passed silently
        assert false;
    } catch (VersionConflictEngineException e) {
        // all is well
    }
}
/**
 * A second create of an already-existing uid must be rejected with
 * DocumentAlreadyExistsEngineException.
 */
@Test public void testVersioningCreateExistsException() {
    ParsedDocument parsedDoc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);

    Engine.Create firstCreate = new Engine.Create(newUid("1"), parsedDoc);
    engine.create(firstCreate);
    assertThat(firstCreate.version(), equalTo(1L));

    // creating the same uid again must fail
    Engine.Create duplicateCreate = new Engine.Create(newUid("1"), parsedDoc);
    try {
        engine.create(duplicateCreate);
        assert false;
    } catch (DocumentAlreadyExistsEngineException e) {
        // expected
    }
}
/**
 * Same as testVersioningCreateExistsException, but with a flush after
 * the first create so the duplicate check resolves against the Lucene
 * index: the second create must still fail.
 */
@Test public void testVersioningCreateExistsExceptionWithFlush() {
    ParsedDocument parsedDoc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);

    Engine.Create firstCreate = new Engine.Create(newUid("1"), parsedDoc);
    engine.create(firstCreate);
    assertThat(firstCreate.version(), equalTo(1L));

    // persist so the existence check reads from the flushed index
    engine.flush(new Engine.Flush());

    // creating the same uid again must fail
    Engine.Create duplicateCreate = new Engine.Create(newUid("1"), parsedDoc);
    try {
        engine.create(duplicateCreate);
        assert false;
    } catch (DocumentAlreadyExistsEngineException e) {
        // expected
    }
}
// Replica conflict scenario 1: the primary indexes twice (versions 1, 2);
// the replica receives the second operation first. The later-arriving
// first operation (version 1) must then be rejected on the replica.
@Test public void testVersioningReplicaConflict1() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
// primary: two indexing operations -> versions 1 then 2
Engine.Index index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1l));
index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(2l));
// apply the second index to the replica, should work fine
index = new Engine.Index(newUid("1"), doc).version(2l).origin(REPLICA);
replicaEngine.index(index);
assertThat(index.version(), equalTo(2l));
// now, the old one should not work
index = new Engine.Index(newUid("1"), doc).version(1l).origin(REPLICA);
try {
replicaEngine.index(index);
assert false;
} catch (VersionConflictEngineException e) {
// all is well
}
// second version on replica should fail as well
// NOTE(review): unlike the block above, there is no `assert false` after
// the index call here, so this passes whether the replay of version 2
// succeeds or conflicts. Presumably a same-version replay on a replica
// may be tolerated (idempotent redelivery) — confirm intent; if it must
// always conflict, an `assert false` is missing.
try {
index = new Engine.Index(newUid("1"), doc).version(2l).origin(REPLICA);
replicaEngine.index(index);
assertThat(index.version(), equalTo(2l));
} catch (VersionConflictEngineException e) {
// all is well
}
}
// Replica conflict scenario 2: the primary indexes (v1), indexes again (v2),
// then deletes (v3). The replica sees v1, then the delete (v3) — skipping v2.
// The out-of-order v2 index arriving after the delete must not resurrect
// the document on the replica.
@Test public void testVersioningReplicaConflict2() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1l));
// apply the first index to the replica, should work fine
index = new Engine.Index(newUid("1"), doc).version(1l).origin(REPLICA);
replicaEngine.index(index);
assertThat(index.version(), equalTo(1l));
// index it again
index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(2l));
// now delete it
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"));
engine.delete(delete);
assertThat(delete.version(), equalTo(3l));
// apply the delete on the replica (skipping the second index)
delete = new Engine.Delete("test", "1", newUid("1")).version(3l).origin(REPLICA);
replicaEngine.delete(delete);
assertThat(delete.version(), equalTo(3l));
// second time delete with same version should fail
// NOTE(review): no `assert false` after the delete call, so this block
// passes whether the same-version replay conflicts or is tolerated
// (idempotent redelivery) — confirm which is intended.
try {
delete = new Engine.Delete("test", "1", newUid("1")).version(3l).origin(REPLICA);
replicaEngine.delete(delete);
assertThat(delete.version(), equalTo(3l));
} catch (VersionConflictEngineException e) {
// all is well
}
// now do the second index on the replica, it should fail
// NOTE(review): same pattern — the comment says "should fail" but there is
// no `assert false`, so a successful resurrection of the deleted doc would
// not fail this test; confirm whether that assert is missing.
try {
index = new Engine.Index(newUid("1"), doc).version(2l).origin(REPLICA);
replicaEngine.index(index);
assertThat(index.version(), equalTo(2l));
} catch (VersionConflictEngineException e) {
// all is well
}
}
/** Builds the {@code _uid} term used to address a document by id in the engine. */
protected Term newUid(String id) {
    final String uidFieldName = "_uid";
    return new Term(uidFieldName, id);
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.index.engine.AbstractSimpleEngineTests;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
@ -32,8 +33,8 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
*/
public class SimpleRobinEngineTests extends AbstractSimpleEngineTests {
protected Engine createEngine(Store store) {
return new RobinEngine(shardId, EMPTY_SETTINGS, store, createSnapshotDeletionPolicy(), createTranslog(), createMergePolicy(), createMergeScheduler(),
protected Engine createEngine(Store store, Translog translog) {
return new RobinEngine(shardId, EMPTY_SETTINGS, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()));
}
}