parent
9e52bcd166
commit
26c1bb36a2
|
@ -30,6 +30,7 @@ import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.index.IndexResponse;
|
import org.elasticsearch.action.index.IndexResponse;
|
||||||
import org.elasticsearch.action.index.TransportIndexAction;
|
import org.elasticsearch.action.index.TransportIndexAction;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
|
import org.elasticsearch.action.support.replication.ReplicationRequest;
|
||||||
import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
||||||
import org.elasticsearch.action.update.UpdateHelper;
|
import org.elasticsearch.action.update.UpdateHelper;
|
||||||
import org.elasticsearch.action.update.UpdateRequest;
|
import org.elasticsearch.action.update.UpdateRequest;
|
||||||
|
@ -111,185 +112,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
|
||||||
Translog.Location location = null;
|
Translog.Location location = null;
|
||||||
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
|
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
|
||||||
BulkItemRequest item = request.items()[requestIndex];
|
BulkItemRequest item = request.items()[requestIndex];
|
||||||
if (item.request() instanceof IndexRequest) {
|
location = handleItem(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
|
||||||
IndexRequest indexRequest = (IndexRequest) item.request();
|
|
||||||
preVersions[requestIndex] = indexRequest.version();
|
|
||||||
preVersionTypes[requestIndex] = indexRequest.versionType();
|
|
||||||
try {
|
|
||||||
WriteResult<IndexResponse> result = shardIndexOperation(request, indexRequest, metaData, indexShard, true);
|
|
||||||
location = locationToSync(location, result.location);
|
|
||||||
// add the response
|
|
||||||
IndexResponse indexResponse = result.response();
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse));
|
|
||||||
} catch (Throwable e) {
|
|
||||||
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
|
||||||
if (retryPrimaryException(e)) {
|
|
||||||
// restore updated versions...
|
|
||||||
for (int j = 0; j < requestIndex; j++) {
|
|
||||||
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
|
|
||||||
}
|
|
||||||
throw (ElasticsearchException) e;
|
|
||||||
}
|
|
||||||
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
|
|
||||||
logger.trace("{} failed to execute bulk item (index) {}", e, request.shardId(), indexRequest);
|
|
||||||
} else {
|
|
||||||
logger.debug("{} failed to execute bulk item (index) {}", e, request.shardId(), indexRequest);
|
|
||||||
}
|
|
||||||
// if its a conflict failure, and we already executed the request on a primary (and we execute it
|
|
||||||
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
|
|
||||||
// then just use the response we got from the successful execution
|
|
||||||
if (item.getPrimaryResponse() != null && isConflictException(e)) {
|
|
||||||
setResponse(item, item.getPrimaryResponse());
|
|
||||||
} else {
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
|
|
||||||
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (item.request() instanceof DeleteRequest) {
|
|
||||||
DeleteRequest deleteRequest = (DeleteRequest) item.request();
|
|
||||||
preVersions[requestIndex] = deleteRequest.version();
|
|
||||||
preVersionTypes[requestIndex] = deleteRequest.versionType();
|
|
||||||
|
|
||||||
try {
|
|
||||||
// add the response
|
|
||||||
final WriteResult<DeleteResponse> writeResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
|
|
||||||
DeleteResponse deleteResponse = writeResult.response();
|
|
||||||
location = locationToSync(location, writeResult.location);
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse));
|
|
||||||
} catch (Throwable e) {
|
|
||||||
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
|
||||||
if (retryPrimaryException(e)) {
|
|
||||||
// restore updated versions...
|
|
||||||
for (int j = 0; j < requestIndex; j++) {
|
|
||||||
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
|
|
||||||
}
|
|
||||||
throw (ElasticsearchException) e;
|
|
||||||
}
|
|
||||||
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
|
|
||||||
logger.trace("{} failed to execute bulk item (delete) {}", e, request.shardId(), deleteRequest);
|
|
||||||
} else {
|
|
||||||
logger.debug("{} failed to execute bulk item (delete) {}", e, request.shardId(), deleteRequest);
|
|
||||||
}
|
|
||||||
// if its a conflict failure, and we already executed the request on a primary (and we execute it
|
|
||||||
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
|
|
||||||
// then just use the response we got from the successful execution
|
|
||||||
if (item.getPrimaryResponse() != null && isConflictException(e)) {
|
|
||||||
setResponse(item, item.getPrimaryResponse());
|
|
||||||
} else {
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE,
|
|
||||||
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (item.request() instanceof UpdateRequest) {
|
|
||||||
UpdateRequest updateRequest = (UpdateRequest) item.request();
|
|
||||||
preVersions[requestIndex] = updateRequest.version();
|
|
||||||
preVersionTypes[requestIndex] = updateRequest.versionType();
|
|
||||||
// We need to do the requested retries plus the initial attempt. We don't do < 1+retry_on_conflict because retry_on_conflict may be Integer.MAX_VALUE
|
|
||||||
for (int updateAttemptsCount = 0; updateAttemptsCount <= updateRequest.retryOnConflict(); updateAttemptsCount++) {
|
|
||||||
UpdateResult updateResult;
|
|
||||||
try {
|
|
||||||
updateResult = shardUpdateOperation(metaData, request, updateRequest, indexShard);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
updateResult = new UpdateResult(null, null, false, t, null);
|
|
||||||
}
|
|
||||||
if (updateResult.success()) {
|
|
||||||
if (updateResult.writeResult != null) {
|
|
||||||
location = locationToSync(location, updateResult.writeResult.location);
|
|
||||||
}
|
|
||||||
switch (updateResult.result.operation()) {
|
|
||||||
case UPSERT:
|
|
||||||
case INDEX:
|
|
||||||
WriteResult<IndexResponse> result = updateResult.writeResult;
|
|
||||||
IndexRequest indexRequest = updateResult.request();
|
|
||||||
BytesReference indexSourceAsBytes = indexRequest.source();
|
|
||||||
// add the response
|
|
||||||
IndexResponse indexResponse = result.response();
|
|
||||||
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
|
|
||||||
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
|
|
||||||
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
|
|
||||||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
|
|
||||||
}
|
|
||||||
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse));
|
|
||||||
break;
|
|
||||||
case DELETE:
|
|
||||||
WriteResult<DeleteResponse> writeResult = updateResult.writeResult;
|
|
||||||
DeleteResponse response = writeResult.response();
|
|
||||||
DeleteRequest deleteRequest = updateResult.request();
|
|
||||||
updateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), false);
|
|
||||||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
|
|
||||||
// Replace the update request to the translated delete request to execute on the replica.
|
|
||||||
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse));
|
|
||||||
break;
|
|
||||||
case NONE:
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResult.noopResult));
|
|
||||||
item.setIgnoreOnReplica(); // no need to go to the replica
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// NOTE: Breaking out of the retry_on_conflict loop!
|
|
||||||
break;
|
|
||||||
} else if (updateResult.failure()) {
|
|
||||||
Throwable t = updateResult.error;
|
|
||||||
if (updateResult.retry) {
|
|
||||||
// updateAttemptCount is 0 based and marks current attempt, if it's equal to retryOnConflict we are going out of the iteration
|
|
||||||
if (updateAttemptsCount >= updateRequest.retryOnConflict()) {
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
|
|
||||||
new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), t)));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
|
||||||
if (retryPrimaryException(t)) {
|
|
||||||
// restore updated versions...
|
|
||||||
for (int j = 0; j < requestIndex; j++) {
|
|
||||||
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
|
|
||||||
}
|
|
||||||
throw (ElasticsearchException) t;
|
|
||||||
}
|
|
||||||
// if its a conflict failure, and we already executed the request on a primary (and we execute it
|
|
||||||
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
|
|
||||||
// then just use the response we got from the successful execution
|
|
||||||
if (item.getPrimaryResponse() != null && isConflictException(t)) {
|
|
||||||
setResponse(item, item.getPrimaryResponse());
|
|
||||||
} else if (updateResult.result == null) {
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), t)));
|
|
||||||
} else {
|
|
||||||
switch (updateResult.result.operation()) {
|
|
||||||
case UPSERT:
|
|
||||||
case INDEX:
|
|
||||||
IndexRequest indexRequest = updateResult.request();
|
|
||||||
if (ExceptionsHelper.status(t) == RestStatus.CONFLICT) {
|
|
||||||
logger.trace("{} failed to execute bulk item (index) {}", t, request.shardId(), indexRequest);
|
|
||||||
} else {
|
|
||||||
logger.debug("{} failed to execute bulk item (index) {}", t, request.shardId(), indexRequest);
|
|
||||||
}
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
|
|
||||||
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), t)));
|
|
||||||
break;
|
|
||||||
case DELETE:
|
|
||||||
DeleteRequest deleteRequest = updateResult.request();
|
|
||||||
if (ExceptionsHelper.status(t) == RestStatus.CONFLICT) {
|
|
||||||
logger.trace("{} failed to execute bulk item (delete) {}", t, request.shardId(), deleteRequest);
|
|
||||||
} else {
|
|
||||||
logger.debug("{} failed to execute bulk item (delete) {}", t, request.shardId(), deleteRequest);
|
|
||||||
}
|
|
||||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE,
|
|
||||||
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), t)));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// NOTE: Breaking out of the retry_on_conflict loop!
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw new IllegalStateException("Unexpected index operation: " + item.request());
|
|
||||||
}
|
|
||||||
|
|
||||||
assert item.getPrimaryResponse() != null;
|
|
||||||
assert preVersionTypes[requestIndex] != null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
processAfterWrite(request.refresh(), indexShard, location);
|
processAfterWrite(request.refresh(), indexShard, location);
|
||||||
|
@ -301,6 +124,198 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
|
||||||
return new Tuple<>(new BulkShardResponse(request.shardId(), responses), request);
|
return new Tuple<>(new BulkShardResponse(request.shardId(), responses), request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Translog.Location handleItem(MetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
|
||||||
|
if (item.request() instanceof IndexRequest) {
|
||||||
|
location = index(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
|
||||||
|
} else if (item.request() instanceof DeleteRequest) {
|
||||||
|
location = delete(request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
|
||||||
|
} else if (item.request() instanceof UpdateRequest) {
|
||||||
|
Tuple<Translog.Location, BulkItemRequest> tuple = update(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
|
||||||
|
location = tuple.v1();
|
||||||
|
item = tuple.v2();
|
||||||
|
} else {
|
||||||
|
throw new IllegalStateException("Unexpected index operation: " + item.request());
|
||||||
|
}
|
||||||
|
|
||||||
|
assert item.getPrimaryResponse() != null;
|
||||||
|
assert preVersionTypes[requestIndex] != null;
|
||||||
|
return location;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Translog.Location index(MetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
|
||||||
|
IndexRequest indexRequest = (IndexRequest) item.request();
|
||||||
|
preVersions[requestIndex] = indexRequest.version();
|
||||||
|
preVersionTypes[requestIndex] = indexRequest.versionType();
|
||||||
|
try {
|
||||||
|
WriteResult<IndexResponse> result = shardIndexOperation(request, indexRequest, metaData, indexShard, true);
|
||||||
|
location = locationToSync(location, result.location);
|
||||||
|
// add the response
|
||||||
|
IndexResponse indexResponse = result.response();
|
||||||
|
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse));
|
||||||
|
} catch (Throwable e) {
|
||||||
|
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
||||||
|
if (retryPrimaryException(e)) {
|
||||||
|
// restore updated versions...
|
||||||
|
for (int j = 0; j < requestIndex; j++) {
|
||||||
|
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
|
||||||
|
}
|
||||||
|
throw (ElasticsearchException) e;
|
||||||
|
}
|
||||||
|
logFailure(e, "index", request.shardId(), indexRequest);
|
||||||
|
// if its a conflict failure, and we already executed the request on a primary (and we execute it
|
||||||
|
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
|
||||||
|
// then just use the response we got from the successful execution
|
||||||
|
if (item.getPrimaryResponse() != null && isConflictException(e)) {
|
||||||
|
setResponse(item, item.getPrimaryResponse());
|
||||||
|
} else {
|
||||||
|
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
|
||||||
|
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return location;
|
||||||
|
}
|
||||||
|
|
||||||
|
private <ReplicationRequestT extends ReplicationRequest<ReplicationRequestT>> void logFailure(Throwable e, String operation, ShardId shardId, ReplicationRequest<ReplicationRequestT> request) {
|
||||||
|
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
|
||||||
|
logger.trace("{} failed to execute bulk item ({}) {}", e, shardId, operation, request);
|
||||||
|
} else {
|
||||||
|
logger.debug("{} failed to execute bulk item ({}) {}", e, shardId, operation, request);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Translog.Location delete(BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
|
||||||
|
DeleteRequest deleteRequest = (DeleteRequest) item.request();
|
||||||
|
preVersions[requestIndex] = deleteRequest.version();
|
||||||
|
preVersionTypes[requestIndex] = deleteRequest.versionType();
|
||||||
|
|
||||||
|
try {
|
||||||
|
// add the response
|
||||||
|
final WriteResult<DeleteResponse> writeResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
|
||||||
|
DeleteResponse deleteResponse = writeResult.response();
|
||||||
|
location = locationToSync(location, writeResult.location);
|
||||||
|
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse));
|
||||||
|
} catch (Throwable e) {
|
||||||
|
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
||||||
|
if (retryPrimaryException(e)) {
|
||||||
|
// restore updated versions...
|
||||||
|
for (int j = 0; j < requestIndex; j++) {
|
||||||
|
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
|
||||||
|
}
|
||||||
|
throw (ElasticsearchException) e;
|
||||||
|
}
|
||||||
|
logFailure(e, "delete", request.shardId(), deleteRequest);
|
||||||
|
// if its a conflict failure, and we already executed the request on a primary (and we execute it
|
||||||
|
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
|
||||||
|
// then just use the response we got from the successful execution
|
||||||
|
if (item.getPrimaryResponse() != null && isConflictException(e)) {
|
||||||
|
setResponse(item, item.getPrimaryResponse());
|
||||||
|
} else {
|
||||||
|
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE,
|
||||||
|
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return location;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Tuple<Translog.Location, BulkItemRequest> update(MetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
|
||||||
|
UpdateRequest updateRequest = (UpdateRequest) item.request();
|
||||||
|
preVersions[requestIndex] = updateRequest.version();
|
||||||
|
preVersionTypes[requestIndex] = updateRequest.versionType();
|
||||||
|
// We need to do the requested retries plus the initial attempt. We don't do < 1+retry_on_conflict because retry_on_conflict may be Integer.MAX_VALUE
|
||||||
|
for (int updateAttemptsCount = 0; updateAttemptsCount <= updateRequest.retryOnConflict(); updateAttemptsCount++) {
|
||||||
|
UpdateResult updateResult;
|
||||||
|
try {
|
||||||
|
updateResult = shardUpdateOperation(metaData, request, updateRequest, indexShard);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
updateResult = new UpdateResult(null, null, false, t, null);
|
||||||
|
}
|
||||||
|
if (updateResult.success()) {
|
||||||
|
if (updateResult.writeResult != null) {
|
||||||
|
location = locationToSync(location, updateResult.writeResult.location);
|
||||||
|
}
|
||||||
|
switch (updateResult.result.operation()) {
|
||||||
|
case UPSERT:
|
||||||
|
case INDEX:
|
||||||
|
WriteResult<IndexResponse> result = updateResult.writeResult;
|
||||||
|
IndexRequest indexRequest = updateResult.request();
|
||||||
|
BytesReference indexSourceAsBytes = indexRequest.source();
|
||||||
|
// add the response
|
||||||
|
IndexResponse indexResponse = result.response();
|
||||||
|
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
|
||||||
|
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
|
||||||
|
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
|
||||||
|
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
|
||||||
|
}
|
||||||
|
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
|
||||||
|
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse));
|
||||||
|
break;
|
||||||
|
case DELETE:
|
||||||
|
WriteResult<DeleteResponse> writeResult = updateResult.writeResult;
|
||||||
|
DeleteResponse response = writeResult.response();
|
||||||
|
DeleteRequest deleteRequest = updateResult.request();
|
||||||
|
updateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), false);
|
||||||
|
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
|
||||||
|
// Replace the update request to the translated delete request to execute on the replica.
|
||||||
|
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
|
||||||
|
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse));
|
||||||
|
break;
|
||||||
|
case NONE:
|
||||||
|
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResult.noopResult));
|
||||||
|
item.setIgnoreOnReplica(); // no need to go to the replica
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// NOTE: Breaking out of the retry_on_conflict loop!
|
||||||
|
break;
|
||||||
|
} else if (updateResult.failure()) {
|
||||||
|
Throwable t = updateResult.error;
|
||||||
|
if (updateResult.retry) {
|
||||||
|
// updateAttemptCount is 0 based and marks current attempt, if it's equal to retryOnConflict we are going out of the iteration
|
||||||
|
if (updateAttemptsCount >= updateRequest.retryOnConflict()) {
|
||||||
|
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
|
||||||
|
new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), t)));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
||||||
|
if (retryPrimaryException(t)) {
|
||||||
|
// restore updated versions...
|
||||||
|
for (int j = 0; j < requestIndex; j++) {
|
||||||
|
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
|
||||||
|
}
|
||||||
|
throw (ElasticsearchException) t;
|
||||||
|
}
|
||||||
|
// if its a conflict failure, and we already executed the request on a primary (and we execute it
|
||||||
|
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
|
||||||
|
// then just use the response we got from the successful execution
|
||||||
|
if (item.getPrimaryResponse() != null && isConflictException(t)) {
|
||||||
|
setResponse(item, item.getPrimaryResponse());
|
||||||
|
} else if (updateResult.result == null) {
|
||||||
|
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), t)));
|
||||||
|
} else {
|
||||||
|
switch (updateResult.result.operation()) {
|
||||||
|
case UPSERT:
|
||||||
|
case INDEX:
|
||||||
|
IndexRequest indexRequest = updateResult.request();
|
||||||
|
logFailure(t, "index", request.shardId(), indexRequest);
|
||||||
|
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
|
||||||
|
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), t)));
|
||||||
|
break;
|
||||||
|
case DELETE:
|
||||||
|
DeleteRequest deleteRequest = updateResult.request();
|
||||||
|
logFailure(t, "delete", request.shardId(), deleteRequest);
|
||||||
|
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE,
|
||||||
|
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), t)));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// NOTE: Breaking out of the retry_on_conflict loop!
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Tuple.tuple(location, item);
|
||||||
|
}
|
||||||
|
|
||||||
private void setResponse(BulkItemRequest request, BulkItemResponse response) {
|
private void setResponse(BulkItemRequest request, BulkItemResponse response) {
|
||||||
request.setPrimaryResponse(response);
|
request.setPrimaryResponse(response);
|
||||||
if (response.isFailed()) {
|
if (response.isFailed()) {
|
||||||
|
|
Loading…
Reference in New Issue