Simplify shard-level bulk operation execution
This commit refactors execution of shard-level bulk operations to use the same failure handling for index, delete and update operations.
This commit is contained in:
parent
80ca78479f
commit
cc993de996
|
@ -21,7 +21,8 @@ package org.elasticsearch.action.bulk;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.DocWriteResponse;
|
||||||
|
import org.elasticsearch.action.DocumentRequest;
|
||||||
import org.elasticsearch.action.delete.DeleteRequest;
|
import org.elasticsearch.action.delete.DeleteRequest;
|
||||||
import org.elasticsearch.action.delete.DeleteResponse;
|
import org.elasticsearch.action.delete.DeleteResponse;
|
||||||
import org.elasticsearch.action.delete.TransportDeleteAction;
|
import org.elasticsearch.action.delete.TransportDeleteAction;
|
||||||
|
@ -29,7 +30,6 @@ import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.index.IndexResponse;
|
import org.elasticsearch.action.index.IndexResponse;
|
||||||
import org.elasticsearch.action.index.TransportIndexAction;
|
import org.elasticsearch.action.index.TransportIndexAction;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.replication.ReplicationRequest;
|
|
||||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||||
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
|
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
|
||||||
import org.elasticsearch.action.update.UpdateHelper;
|
import org.elasticsearch.action.update.UpdateHelper;
|
||||||
|
@ -66,9 +66,7 @@ import java.util.Map;
|
||||||
import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException;
|
import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException;
|
||||||
import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException;
|
import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException;
|
||||||
|
|
||||||
/**
|
/** Performs shard-level bulk (index, delete or update) operations */
|
||||||
* Performs the index operation.
|
|
||||||
*/
|
|
||||||
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardResponse> {
|
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardResponse> {
|
||||||
|
|
||||||
public static final String ACTION_NAME = BulkAction.NAME + "[s]";
|
public static final String ACTION_NAME = BulkAction.NAME + "[s]";
|
||||||
|
@ -114,8 +112,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||||
VersionType[] preVersionTypes = new VersionType[request.items().length];
|
VersionType[] preVersionTypes = new VersionType[request.items().length];
|
||||||
Translog.Location location = null;
|
Translog.Location location = null;
|
||||||
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
|
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
|
||||||
BulkItemRequest item = request.items()[requestIndex];
|
location = executeBulkItemRequest(metaData, indexShard, request, preVersions, preVersionTypes, location, requestIndex);
|
||||||
location = handleItem(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
|
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
|
||||||
|
@ -127,202 +124,84 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||||
return new WriteResult<>(response, location);
|
return new WriteResult<>(response, location);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Translog.Location handleItem(IndexMetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
|
/** Executes bulk item requests and handles request execution exceptions */
|
||||||
if (item.request() instanceof IndexRequest) {
|
private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexShard indexShard,
|
||||||
location = index(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
|
BulkShardRequest request,
|
||||||
} else if (item.request() instanceof DeleteRequest) {
|
long[] preVersions, VersionType[] preVersionTypes,
|
||||||
location = delete(request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
|
Translog.Location location, int requestIndex) {
|
||||||
} else if (item.request() instanceof UpdateRequest) {
|
preVersions[requestIndex] = request.items()[requestIndex].request().version();
|
||||||
Tuple<Translog.Location, BulkItemRequest> tuple = update(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
|
preVersionTypes[requestIndex] = request.items()[requestIndex].request().versionType();
|
||||||
location = tuple.v1();
|
DocumentRequest.OpType opType = request.items()[requestIndex].request().opType();
|
||||||
item = tuple.v2();
|
try {
|
||||||
} else {
|
WriteResult<? extends DocWriteResponse> writeResult = innerExecuteBulkItemRequest(metaData, indexShard,
|
||||||
throw new IllegalStateException("Unexpected index operation: " + item.request());
|
request, requestIndex);
|
||||||
|
if (writeResult.getResponse().getResult() != DocWriteResponse.Result.NOOP) {
|
||||||
|
location = locationToSync(location, writeResult.getLocation());
|
||||||
|
}
|
||||||
|
// update the bulk item request because update request execution can mutate the bulk item request
|
||||||
|
BulkItemRequest item = request.items()[requestIndex];
|
||||||
|
// add the response
|
||||||
|
setResponse(item, new BulkItemResponse(item.id(), opType, writeResult.getResponse()));
|
||||||
|
} catch (Exception e) {
|
||||||
|
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
||||||
|
if (retryPrimaryException(e)) {
|
||||||
|
// restore updated versions...
|
||||||
|
for (int j = 0; j < requestIndex; j++) {
|
||||||
|
DocumentRequest<?> documentRequest = request.items()[j].request();
|
||||||
|
documentRequest.version(preVersions[j]);
|
||||||
|
documentRequest.versionType(preVersionTypes[j]);
|
||||||
|
}
|
||||||
|
throw (ElasticsearchException) e;
|
||||||
|
}
|
||||||
|
BulkItemRequest item = request.items()[requestIndex];
|
||||||
|
DocumentRequest<?> documentRequest = item.request();
|
||||||
|
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
|
||||||
|
logger.trace("{} failed to execute bulk item ({}) {}", e, request.shardId(),
|
||||||
|
documentRequest.opType().getLowercase(), request);
|
||||||
|
} else {
|
||||||
|
logger.debug("{} failed to execute bulk item ({}) {}", e, request.shardId(),
|
||||||
|
documentRequest.opType().getLowercase(), request);
|
||||||
|
}
|
||||||
|
// if its a conflict failure, and we already executed the request on a primary (and we execute it
|
||||||
|
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
|
||||||
|
// then just use the response we got from the successful execution
|
||||||
|
if (item.getPrimaryResponse() != null && isConflictException(e)) {
|
||||||
|
setResponse(item, item.getPrimaryResponse());
|
||||||
|
} else {
|
||||||
|
setResponse(item, new BulkItemResponse(item.id(), documentRequest.opType(),
|
||||||
|
new BulkItemResponse.Failure(request.index(), documentRequest.type(), documentRequest.id(), e)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
assert request.items()[requestIndex].getPrimaryResponse() != null;
|
||||||
assert item.getPrimaryResponse() != null;
|
|
||||||
assert preVersionTypes[requestIndex] != null;
|
assert preVersionTypes[requestIndex] != null;
|
||||||
return location;
|
return location;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Translog.Location index(IndexMetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
|
private WriteResult<? extends DocWriteResponse> innerExecuteBulkItemRequest(IndexMetaData metaData, IndexShard indexShard,
|
||||||
IndexRequest indexRequest = (IndexRequest) item.request();
|
BulkShardRequest request, int requestIndex) throws Exception {
|
||||||
preVersions[requestIndex] = indexRequest.version();
|
DocumentRequest<?> itemRequest = request.items()[requestIndex].request();
|
||||||
preVersionTypes[requestIndex] = indexRequest.versionType();
|
switch (itemRequest.opType()) {
|
||||||
try {
|
case CREATE:
|
||||||
WriteResult<IndexResponse> result = shardIndexOperation(request, indexRequest, metaData, indexShard, true);
|
case INDEX:
|
||||||
location = locationToSync(location, result.getLocation());
|
return TransportIndexAction.executeIndexRequestOnPrimary(((IndexRequest) itemRequest), indexShard, mappingUpdatedAction);
|
||||||
// add the response
|
case UPDATE:
|
||||||
IndexResponse indexResponse = result.getResponse();
|
int maxAttempts = ((UpdateRequest) itemRequest).retryOnConflict();
|
||||||
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType(), indexResponse));
|
for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) {
|
||||||
} catch (Exception e) {
|
try {
|
||||||
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
return shardUpdateOperation(metaData, indexShard, request, requestIndex, ((UpdateRequest) itemRequest));
|
||||||
if (retryPrimaryException(e)) {
|
} catch (Exception e) {
|
||||||
// restore updated versions...
|
final Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||||
for (int j = 0; j < requestIndex; j++) {
|
if (attemptCount == maxAttempts // bubble up exception when we run out of attempts
|
||||||
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
|
|| (cause instanceof VersionConflictEngineException) == false) { // or when exception is not a version conflict
|
||||||
}
|
throw e;
|
||||||
throw (ElasticsearchException) e;
|
|
||||||
}
|
|
||||||
logFailure(e, "index", request.shardId(), indexRequest);
|
|
||||||
// if its a conflict failure, and we already executed the request on a primary (and we execute it
|
|
||||||
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
|
|
||||||
// then just use the response we got from the successful execution
|
|
||||||
if (item.getPrimaryResponse() != null && isConflictException(e)) {
|
|
||||||
setResponse(item, item.getPrimaryResponse());
|
|
||||||
} else {
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType(),
|
|
||||||
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return location;
|
|
||||||
}
|
|
||||||
|
|
||||||
private <ReplicationRequestT extends ReplicationRequest<ReplicationRequestT>> void logFailure(Throwable t, String operation, ShardId shardId, ReplicationRequest<ReplicationRequestT> request) {
|
|
||||||
if (ExceptionsHelper.status(t) == RestStatus.CONFLICT) {
|
|
||||||
logger.trace("{} failed to execute bulk item ({}) {}", t, shardId, operation, request);
|
|
||||||
} else {
|
|
||||||
logger.debug("{} failed to execute bulk item ({}) {}", t, shardId, operation, request);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Translog.Location delete(BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
|
|
||||||
DeleteRequest deleteRequest = (DeleteRequest) item.request();
|
|
||||||
preVersions[requestIndex] = deleteRequest.version();
|
|
||||||
preVersionTypes[requestIndex] = deleteRequest.versionType();
|
|
||||||
|
|
||||||
try {
|
|
||||||
// add the response
|
|
||||||
final WriteResult<DeleteResponse> writeResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
|
|
||||||
DeleteResponse deleteResponse = writeResult.getResponse();
|
|
||||||
location = locationToSync(location, writeResult.getLocation());
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), deleteRequest.opType(), deleteResponse));
|
|
||||||
} catch (Exception e) {
|
|
||||||
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
|
||||||
if (retryPrimaryException(e)) {
|
|
||||||
// restore updated versions...
|
|
||||||
for (int j = 0; j < requestIndex; j++) {
|
|
||||||
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
|
|
||||||
}
|
|
||||||
throw (ElasticsearchException) e;
|
|
||||||
}
|
|
||||||
logFailure(e, "delete", request.shardId(), deleteRequest);
|
|
||||||
// if its a conflict failure, and we already executed the request on a primary (and we execute it
|
|
||||||
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
|
|
||||||
// then just use the response we got from the successful execution
|
|
||||||
if (item.getPrimaryResponse() != null && isConflictException(e)) {
|
|
||||||
setResponse(item, item.getPrimaryResponse());
|
|
||||||
} else {
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), deleteRequest.opType(),
|
|
||||||
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return location;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Tuple<Translog.Location, BulkItemRequest> update(IndexMetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
|
|
||||||
UpdateRequest updateRequest = (UpdateRequest) item.request();
|
|
||||||
preVersions[requestIndex] = updateRequest.version();
|
|
||||||
preVersionTypes[requestIndex] = updateRequest.versionType();
|
|
||||||
// We need to do the requested retries plus the initial attempt. We don't do < 1+retry_on_conflict because retry_on_conflict may be Integer.MAX_VALUE
|
|
||||||
for (int updateAttemptsCount = 0; updateAttemptsCount <= updateRequest.retryOnConflict(); updateAttemptsCount++) {
|
|
||||||
UpdateResult updateResult;
|
|
||||||
try {
|
|
||||||
updateResult = shardUpdateOperation(metaData, request, updateRequest, indexShard);
|
|
||||||
} catch (Exception t) {
|
|
||||||
updateResult = new UpdateResult(null, null, false, t, null);
|
|
||||||
}
|
|
||||||
if (updateResult.success()) {
|
|
||||||
if (updateResult.writeResult != null) {
|
|
||||||
location = locationToSync(location, updateResult.writeResult.getLocation());
|
|
||||||
}
|
|
||||||
switch (updateResult.result.getResponseResult()) {
|
|
||||||
case CREATED:
|
|
||||||
case UPDATED:
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
WriteResult<IndexResponse> result = updateResult.writeResult;
|
|
||||||
IndexRequest indexRequest = updateResult.request();
|
|
||||||
BytesReference indexSourceAsBytes = indexRequest.source();
|
|
||||||
// add the response
|
|
||||||
IndexResponse indexResponse = result.getResponse();
|
|
||||||
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult());
|
|
||||||
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
|
|
||||||
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
|
|
||||||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
|
|
||||||
}
|
|
||||||
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), updateResponse));
|
|
||||||
break;
|
|
||||||
case DELETED:
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
WriteResult<DeleteResponse> writeResult = updateResult.writeResult;
|
|
||||||
DeleteResponse response = writeResult.getResponse();
|
|
||||||
DeleteRequest deleteRequest = updateResult.request();
|
|
||||||
updateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
|
|
||||||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
|
|
||||||
// Replace the update request to the translated delete request to execute on the replica.
|
|
||||||
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), updateResponse));
|
|
||||||
break;
|
|
||||||
case NOOP:
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), updateResult.noopResult));
|
|
||||||
item.setIgnoreOnReplica(); // no need to go to the replica
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new IllegalStateException("Illegal operation " + updateResult.result.getResponseResult());
|
|
||||||
}
|
|
||||||
// NOTE: Breaking out of the retry_on_conflict loop!
|
|
||||||
break;
|
|
||||||
} else if (updateResult.failure()) {
|
|
||||||
Throwable e = updateResult.error;
|
|
||||||
if (updateResult.retry) {
|
|
||||||
// updateAttemptCount is 0 based and marks current attempt, if it's equal to retryOnConflict we are going out of the iteration
|
|
||||||
if (updateAttemptsCount >= updateRequest.retryOnConflict()) {
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(),
|
|
||||||
new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e)));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
|
||||||
if (retryPrimaryException(e)) {
|
|
||||||
// restore updated versions...
|
|
||||||
for (int j = 0; j < requestIndex; j++) {
|
|
||||||
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
|
|
||||||
}
|
|
||||||
throw (ElasticsearchException) e;
|
|
||||||
}
|
|
||||||
// if its a conflict failure, and we already executed the request on a primary (and we execute it
|
|
||||||
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
|
|
||||||
// then just use the response we got from the successful execution
|
|
||||||
if (item.getPrimaryResponse() != null && isConflictException(e)) {
|
|
||||||
setResponse(item, item.getPrimaryResponse());
|
|
||||||
} else if (updateResult.result == null) {
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e)));
|
|
||||||
} else {
|
|
||||||
switch (updateResult.result.getResponseResult()) {
|
|
||||||
case CREATED:
|
|
||||||
case UPDATED:
|
|
||||||
IndexRequest indexRequest = updateResult.request();
|
|
||||||
logFailure(e, "index", request.shardId(), indexRequest);
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(),
|
|
||||||
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e)));
|
|
||||||
break;
|
|
||||||
case DELETED:
|
|
||||||
DeleteRequest deleteRequest = updateResult.request();
|
|
||||||
logFailure(e, "delete", request.shardId(), deleteRequest);
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), deleteRequest.opType(),
|
|
||||||
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e)));
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new IllegalStateException("Illegal operation " + updateResult.result.getResponseResult());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// NOTE: Breaking out of the retry_on_conflict loop!
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
throw new IllegalStateException("version conflict exception should bubble up on last attempt");
|
||||||
}
|
case DELETE:
|
||||||
|
return TransportDeleteAction.executeDeleteRequestOnPrimary(((DeleteRequest) itemRequest), indexShard);
|
||||||
|
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
|
||||||
}
|
}
|
||||||
return Tuple.tuple(location, item);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setResponse(BulkItemRequest request, BulkItemResponse response) {
|
private void setResponse(BulkItemRequest request, BulkItemResponse response) {
|
||||||
|
@ -335,105 +214,48 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private WriteResult<IndexResponse> shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, IndexMetaData metaData,
|
/**
|
||||||
IndexShard indexShard, boolean processed) throws Exception {
|
* Executes update request, doing a get and translating update to a index or delete operation
|
||||||
|
* NOTE: all operations except NOOP, reassigns the bulk item request
|
||||||
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
|
*/
|
||||||
if (!processed) {
|
private WriteResult<? extends DocWriteResponse> shardUpdateOperation(IndexMetaData metaData, IndexShard indexShard,
|
||||||
indexRequest.process(mappingMd, allowIdGeneration, request.index());
|
BulkShardRequest request,
|
||||||
}
|
int requestIndex, UpdateRequest updateRequest)
|
||||||
return TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction);
|
throws Exception {
|
||||||
}
|
// Todo: capture read version conflicts, missing documents and malformed script errors in the write result due to get request
|
||||||
|
|
||||||
static class UpdateResult {
|
|
||||||
|
|
||||||
final UpdateHelper.Result result;
|
|
||||||
final ActionRequest actionRequest;
|
|
||||||
final boolean retry;
|
|
||||||
final Throwable error;
|
|
||||||
final WriteResult writeResult;
|
|
||||||
final UpdateResponse noopResult;
|
|
||||||
|
|
||||||
UpdateResult(UpdateHelper.Result result, ActionRequest actionRequest, boolean retry, Throwable error, WriteResult writeResult) {
|
|
||||||
this.result = result;
|
|
||||||
this.actionRequest = actionRequest;
|
|
||||||
this.retry = retry;
|
|
||||||
this.error = error;
|
|
||||||
this.writeResult = writeResult;
|
|
||||||
this.noopResult = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
UpdateResult(UpdateHelper.Result result, ActionRequest actionRequest, WriteResult writeResult) {
|
|
||||||
this.result = result;
|
|
||||||
this.actionRequest = actionRequest;
|
|
||||||
this.writeResult = writeResult;
|
|
||||||
this.retry = false;
|
|
||||||
this.error = null;
|
|
||||||
this.noopResult = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public UpdateResult(UpdateHelper.Result result, UpdateResponse updateResponse) {
|
|
||||||
this.result = result;
|
|
||||||
this.noopResult = updateResponse;
|
|
||||||
this.actionRequest = null;
|
|
||||||
this.writeResult = null;
|
|
||||||
this.retry = false;
|
|
||||||
this.error = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
boolean failure() {
|
|
||||||
return error != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean success() {
|
|
||||||
return noopResult != null || writeResult != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
<T extends ActionRequest> T request() {
|
|
||||||
return (T) actionRequest;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private UpdateResult shardUpdateOperation(IndexMetaData metaData, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) {
|
|
||||||
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard);
|
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard);
|
||||||
switch (translate.getResponseResult()) {
|
switch (translate.getResponseResult()) {
|
||||||
case CREATED:
|
case CREATED:
|
||||||
case UPDATED:
|
case UPDATED:
|
||||||
IndexRequest indexRequest = translate.action();
|
IndexRequest indexRequest = translate.action();
|
||||||
try {
|
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
|
||||||
WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, metaData, indexShard, false);
|
indexRequest.process(mappingMd, allowIdGeneration, request.index());
|
||||||
return new UpdateResult(translate, indexRequest, result);
|
WriteResult<IndexResponse> writeResult = TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction);
|
||||||
} catch (Exception e) {
|
BytesReference indexSourceAsBytes = indexRequest.source();
|
||||||
final Throwable cause = ExceptionsHelper.unwrapCause(e);
|
IndexResponse indexResponse = writeResult.getResponse();
|
||||||
boolean retry = false;
|
UpdateResponse writeUpdateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult());
|
||||||
if (cause instanceof VersionConflictEngineException) {
|
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
|
||||||
retry = true;
|
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
|
||||||
}
|
writeUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
|
||||||
return new UpdateResult(translate, indexRequest, retry, cause, null);
|
|
||||||
}
|
}
|
||||||
|
// Replace the update request to the translated index request to execute on the replica.
|
||||||
|
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
|
||||||
|
return new WriteResult<>(writeUpdateResponse, writeResult.getLocation());
|
||||||
case DELETED:
|
case DELETED:
|
||||||
DeleteRequest deleteRequest = translate.action();
|
DeleteRequest deleteRequest = translate.action();
|
||||||
try {
|
WriteResult<DeleteResponse> deleteResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
|
||||||
WriteResult<DeleteResponse> result = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
|
DeleteResponse response = deleteResult.getResponse();
|
||||||
return new UpdateResult(translate, deleteRequest, result);
|
UpdateResponse deleteUpdateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
|
||||||
} catch (Exception e) {
|
deleteUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), null));
|
||||||
final Throwable cause = ExceptionsHelper.unwrapCause(e);
|
// Replace the update request to the translated delete request to execute on the replica.
|
||||||
boolean retry = false;
|
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
|
||||||
if (cause instanceof VersionConflictEngineException) {
|
return new WriteResult<>(deleteUpdateResponse, deleteResult.getLocation());
|
||||||
retry = true;
|
|
||||||
}
|
|
||||||
return new UpdateResult(translate, deleteRequest, retry, cause, null);
|
|
||||||
}
|
|
||||||
case NOOP:
|
case NOOP:
|
||||||
UpdateResponse updateResponse = translate.action();
|
BulkItemRequest item = request.items()[requestIndex];
|
||||||
indexShard.noopUpdate(updateRequest.type());
|
indexShard.noopUpdate(updateRequest.type());
|
||||||
return new UpdateResult(translate, updateResponse);
|
item.setIgnoreOnReplica(); // no need to go to the replica
|
||||||
default:
|
return new WriteResult<>(translate.action(), null);
|
||||||
throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
|
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -477,18 +299,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||||
return location;
|
return location;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void applyVersion(BulkItemRequest item, long version, VersionType versionType) {
|
|
||||||
if (item.request() instanceof IndexRequest) {
|
|
||||||
((IndexRequest) item.request()).version(version).versionType(versionType);
|
|
||||||
} else if (item.request() instanceof DeleteRequest) {
|
|
||||||
((DeleteRequest) item.request()).version(version).versionType();
|
|
||||||
} else if (item.request() instanceof UpdateRequest) {
|
|
||||||
((UpdateRequest) item.request()).version(version).versionType();
|
|
||||||
} else {
|
|
||||||
// log?
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Translog.Location locationToSync(Translog.Location current, Translog.Location next) {
|
private Translog.Location locationToSync(Translog.Location current, Translog.Location next) {
|
||||||
/* here we are moving forward in the translog with each operation. Under the hood
|
/* here we are moving forward in the translog with each operation. Under the hood
|
||||||
* this might cross translog files which is ok since from the user perspective
|
* this might cross translog files which is ok since from the user perspective
|
||||||
|
|
Loading…
Reference in New Issue