Bulk API: Properly retry execution on temporal state changes, closes #1343.

This commit is contained in:
Shay Banon 2011-09-17 02:21:20 +03:00
parent 0977b793da
commit 305cf4a567
2 changed files with 46 additions and 30 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
@ -158,6 +159,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version));
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
throw (ElasticSearchException) e;
}
logger.debug("[{}][{}] failed to bulk item (index) {}", e, shardRequest.request.index(), shardRequest.shardId, indexRequest);
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(e)));
@ -174,6 +179,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
responses[i] = new BulkItemResponse(item.id(), "delete",
new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.notFound()));
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
throw (ElasticSearchException) e;
}
logger.debug("[{}][{}] failed to bulk item (delete) {}", e, shardRequest.request.index(), shardRequest.shardId, deleteRequest);
responses[i] = new BulkItemResponse(item.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(e)));

View File

@ -160,6 +160,42 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
}
protected boolean retryPrimaryException(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
return cause instanceof IndexShardMissingException ||
cause instanceof IllegalIndexShardStateException ||
cause instanceof IndexMissingException;
}
/**
* Should an exception be ignored when the operation is performed on the replica.
*/
boolean ignoreReplicaException(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof IllegalIndexShardStateException) {
return true;
}
if (cause instanceof IndexMissingException) {
return true;
}
if (cause instanceof IndexShardMissingException) {
return true;
}
if (cause instanceof ConnectTransportException) {
return true;
}
// on version conflict or document missing, it means
// that a news change has crept into the replica, and its fine
if (cause instanceof VersionConflictEngineException) {
return true;
}
// same here
if (cause instanceof DocumentAlreadyExistsEngineException) {
return true;
}
return false;
}
class OperationTransportHandler extends BaseTransportRequestHandler<Request> {
@Override public Request newInstance() {
@ -429,7 +465,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
performReplicas(response);
} catch (Exception e) {
// shard has not been allocated yet, retry it here
if (e instanceof IndexShardMissingException || e instanceof IllegalIndexShardStateException || e instanceof IndexMissingException) {
if (retryPrimaryException(e)) {
retry(fromDiscoveryListener, shard.shardId());
return;
}
@ -572,35 +608,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
}
/**
* Should an exception be ignored when the operation is performed on the replica.
*/
boolean ignoreReplicaException(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof IllegalIndexShardStateException) {
return true;
}
if (cause instanceof IndexMissingException) {
return true;
}
if (cause instanceof IndexShardMissingException) {
return true;
}
if (cause instanceof ConnectTransportException) {
return true;
}
// on version conflict or document missing, it means
// that a news change has crept into the replica, and its fine
if (cause instanceof VersionConflictEngineException) {
return true;
}
// same here
if (cause instanceof DocumentAlreadyExistsEngineException) {
return true;
}
return false;
}
}
public static class PrimaryResponse<T> {