Refactoring: Replaced string values with static constants

in TransportShardBulkAction after fixing an issue.
This commit is contained in:
Alexander Reelsen 2014-07-01 08:21:31 +02:00
parent b46d017e5c
commit 4091162d91
1 changed files with 12 additions and 9 deletions

View File

@ -72,6 +72,9 @@ import java.util.Set;
*/
public class TransportShardBulkAction extends TransportShardReplicationOperationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
private final static String OP_TYPE_UPDATE = "update";
private final static String OP_TYPE_DELETE = "delete";
private final MappingUpdatedAction mappingUpdatedAction;
private final UpdateHelper updateHelper;
private final boolean allowIdGeneration;
@ -208,7 +211,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
try {
// add the response
DeleteResponse deleteResponse = shardDeleteOperation(deleteRequest, indexShard).response();
responses[requestIndex] = new BulkItemResponse(item.id(), "delete", deleteResponse);
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse);
} catch (Throwable e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
@ -223,7 +226,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} else {
logger.debug("[{}][{}] failed to execute bulk item (delete) {}", e, shardRequest.request.index(), shardRequest.shardId, deleteRequest);
}
responses[requestIndex] = new BulkItemResponse(item.id(), "delete",
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_DELETE,
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), e));
// nullify the request so it won't execute on the replicas
request.items()[requestIndex] = null;
@ -255,7 +258,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
}
responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResponse);
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse);
if (result.mappingToUpdate != null) {
mappingsToUpdate.add(result.mappingToUpdate);
}
@ -273,12 +276,12 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
DeleteRequest deleteRequest = updateResult.request();
updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResponse);
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse);
// Replace the update request to the translated delete request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
break;
case NONE:
responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResult.noopResult);
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResult.noopResult);
request.items()[requestIndex] = null; // No need to go to the replica
break;
}
@ -290,7 +293,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
// updateAttemptCount is 0 based and marks current attempt, if it's equal to retryOnConflict we are going out of the iteration
if (updateAttemptsCount >= updateRequest.retryOnConflict()) {
// we can't try any more
responses[requestIndex] = new BulkItemResponse(item.id(), "update",
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), t));
request.items()[requestIndex] = null; // do not send to replicas
}
@ -304,7 +307,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
throw (ElasticsearchException) t;
}
if (updateResult.result == null) {
responses[requestIndex] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), t));
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), t));
} else {
switch (updateResult.result.operation()) {
case UPSERT:
@ -315,7 +318,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} else {
logger.debug("[{}][{}] failed to execute bulk item (index) {}", t, shardRequest.request.index(), shardRequest.shardId, indexRequest);
}
responses[requestIndex] = new BulkItemResponse(item.id(), "update",
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), t));
break;
case DELETE:
@ -325,7 +328,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} else {
logger.debug("[{}][{}] failed to execute bulk item (delete) {}", t, shardRequest.request.index(), shardRequest.shardId, deleteRequest);
}
responses[requestIndex] = new BulkItemResponse(item.id(), "delete",
responses[requestIndex] = new BulkItemResponse(item.id(), OP_TYPE_DELETE,
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), t));
break;
}