fix internal engine unit tests
This commit is contained in:
parent
dac9856863
commit
4396348e9e
|
@ -143,77 +143,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
operation.version(), ((Engine.Index) operation).isCreated());
|
||||
break;
|
||||
case UPDATE:
|
||||
Engine.Operation updateOperation = null;
|
||||
UpdateResponse updateResponse = null;
|
||||
UpdateRequest updateRequest = (UpdateRequest) itemRequest;
|
||||
int maxAttempts = updateRequest.retryOnConflict();
|
||||
for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) {
|
||||
final UpdateHelper.Result translate;
|
||||
try {
|
||||
// translate and execute operation
|
||||
translate = updateHelper.prepare(updateRequest, primary, threadPool::estimatedTimeInMillis);
|
||||
} catch (Exception failure) {
|
||||
// we may fail translating a update to index or delete operation
|
||||
updateOperation = new Engine.Failure(updateRequest.type(), updateRequest.id(), updateRequest.version(),
|
||||
updateRequest.versionType(), Engine.Operation.Origin.PRIMARY, System.nanoTime(), failure);
|
||||
break; // out of retry loop
|
||||
}
|
||||
updateOperation = shardUpdateOperation(metaData, primary, request, updateRequest, translate);
|
||||
if (updateOperation == null) {
|
||||
// this is a noop operation
|
||||
updateResponse = translate.action();
|
||||
} else {
|
||||
if (updateOperation.hasFailure() == false) {
|
||||
// enrich update response and
|
||||
// set translated update (index/delete) request for replica execution in bulk items
|
||||
switch (updateOperation.operationType()) {
|
||||
case INDEX:
|
||||
IndexRequest updateIndexRequest = translate.action();
|
||||
final IndexResponse indexResponse = new IndexResponse(primary.shardId(),
|
||||
updateIndexRequest.type(), updateIndexRequest.id(),
|
||||
updateOperation.version(), ((Engine.Index) updateOperation).isCreated());
|
||||
BytesReference indexSourceAsBytes = updateIndexRequest.source();
|
||||
updateResponse = new UpdateResponse(indexResponse.getShardInfo(),
|
||||
indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(),
|
||||
indexResponse.getVersion(), indexResponse.getResult());
|
||||
if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) ||
|
||||
(updateRequest.fields() != null && updateRequest.fields().length > 0)) {
|
||||
Tuple<XContentType, Map<String, Object>> sourceAndContent =
|
||||
XContentHelper.convertToMap(indexSourceAsBytes, true);
|
||||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(),
|
||||
indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
|
||||
}
|
||||
// replace the update request to the translated index request to execute on the replica.
|
||||
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), updateIndexRequest);
|
||||
break;
|
||||
case DELETE:
|
||||
DeleteRequest updateDeleteRequest = translate.action();
|
||||
DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(),
|
||||
updateDeleteRequest.type(), updateDeleteRequest.id(),
|
||||
updateOperation.version(), ((Engine.Delete) updateOperation).found());
|
||||
updateResponse = new UpdateResponse(deleteResponse.getShardInfo(),
|
||||
deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(),
|
||||
deleteResponse.getVersion(), deleteResponse.getResult());
|
||||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest,
|
||||
request.index(), deleteResponse.getVersion(), translate.updatedSourceAsMap(),
|
||||
translate.updateSourceContentType(), null));
|
||||
// replace the update request to the translated delete request to execute on the replica.
|
||||
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
|
||||
break;
|
||||
case FAILURE:
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// version conflict exception, retry
|
||||
if (updateOperation.getFailure() instanceof VersionConflictEngineException) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
break; // out of retry loop
|
||||
}
|
||||
operation = updateOperation;
|
||||
response = updateResponse;
|
||||
UpdateResultHolder updateResultHolder = executeUpdateRequest(((UpdateRequest) itemRequest),
|
||||
primary, metaData, request, requestIndex);
|
||||
operation = updateResultHolder.operation;
|
||||
response = updateResultHolder.response;
|
||||
break;
|
||||
case DELETE:
|
||||
final DeleteRequest deleteRequest = (DeleteRequest) itemRequest;
|
||||
|
@ -224,21 +157,17 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
break;
|
||||
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
|
||||
}
|
||||
// set item response and handle failures
|
||||
|
||||
// update the bulk item request because update request execution can mutate the bulk item request
|
||||
BulkItemRequest item = request.items()[requestIndex];
|
||||
|
||||
if (operation == null // in case of a noop update operation
|
||||
|| operation.hasFailure() == false) {
|
||||
if (operation == null) {
|
||||
assert response != null;
|
||||
assert response.getResult() == DocWriteResponse.Result.NOOP
|
||||
: "only noop operation can have null operation";
|
||||
} else {
|
||||
if (operation != null) {
|
||||
location = locationToSync(location, operation.getTranslogLocation());
|
||||
} else {
|
||||
assert response.getResult() == DocWriteResponse.Result.NOOP
|
||||
: "only noop update can have null operation";
|
||||
}
|
||||
// add the response
|
||||
// set update response
|
||||
item.setPrimaryResponse(new BulkItemResponse(item.id(), opType, response));
|
||||
} else {
|
||||
DocWriteRequest docWriteRequest = item.request();
|
||||
|
@ -284,62 +213,149 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
return location;
|
||||
}
|
||||
|
||||
/** Executes update request, delegating to a index or delete operation after translation */
|
||||
private Engine.Operation shardUpdateOperation(IndexMetaData metaData, IndexShard primary, BulkShardRequest request,
|
||||
UpdateRequest updateRequest, UpdateHelper.Result translate)
|
||||
throws Exception {
|
||||
switch (translate.getResponseResult()) {
|
||||
case CREATED:
|
||||
case UPDATED:
|
||||
IndexRequest indexRequest = translate.action();
|
||||
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
|
||||
indexRequest.process(mappingMd, allowIdGeneration, request.index());
|
||||
return executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
|
||||
case DELETED:
|
||||
return executeDeleteRequestOnPrimary(translate.action(), primary);
|
||||
case NOOP:
|
||||
primary.noopUpdate(updateRequest.type());
|
||||
return null;
|
||||
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
|
||||
private static class UpdateResultHolder {
|
||||
final Engine.Operation operation;
|
||||
final DocWriteResponse response;
|
||||
|
||||
private UpdateResultHolder(Engine.Operation operation, DocWriteResponse response) {
|
||||
this.operation = operation;
|
||||
this.response = response;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes update request, delegating to a index or delete operation after translation,
|
||||
* handles retries on version conflict and constructs update response
|
||||
* NOTE: reassigns bulk item request at <code>requestIndex</code> for replicas to
|
||||
* execute translated update request (NOOP update is an exception). NOOP updates are
|
||||
* indicated by returning a <code>null</code> operation in {@link UpdateResultHolder}
|
||||
* */
|
||||
private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, IndexShard primary,
|
||||
IndexMetaData metaData, BulkShardRequest request,
|
||||
int requestIndex) throws Exception {
|
||||
Engine.Operation updateOperation = null;
|
||||
UpdateResponse updateResponse = null;
|
||||
int maxAttempts = updateRequest.retryOnConflict();
|
||||
for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) {
|
||||
final UpdateHelper.Result translate;
|
||||
// translate update request
|
||||
try {
|
||||
translate = updateHelper.prepare(updateRequest, primary, threadPool::estimatedTimeInMillis);
|
||||
} catch (Exception failure) {
|
||||
// we may fail translating a update to index or delete operation
|
||||
updateOperation = new Engine.Failure(updateRequest.type(), updateRequest.id(), updateRequest.version(),
|
||||
updateRequest.versionType(), Engine.Operation.Origin.PRIMARY, System.nanoTime(), failure);
|
||||
break; // out of retry loop
|
||||
}
|
||||
// execute translated update request
|
||||
switch (translate.getResponseResult()) {
|
||||
case CREATED:
|
||||
case UPDATED:
|
||||
IndexRequest indexRequest = translate.action();
|
||||
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
|
||||
indexRequest.process(mappingMd, allowIdGeneration, request.index());
|
||||
updateOperation = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
|
||||
break;
|
||||
case DELETED:
|
||||
updateOperation = executeDeleteRequestOnPrimary(translate.action(), primary);
|
||||
break;
|
||||
case NOOP:
|
||||
primary.noopUpdate(updateRequest.type());
|
||||
break;
|
||||
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
|
||||
}
|
||||
if (updateOperation == null) {
|
||||
// this is a noop operation
|
||||
updateResponse = translate.action();
|
||||
} else {
|
||||
if (updateOperation.hasFailure() == false) {
|
||||
// enrich update response and
|
||||
// set translated update (index/delete) request for replica execution in bulk items
|
||||
switch (updateOperation.operationType()) {
|
||||
case INDEX:
|
||||
IndexRequest updateIndexRequest = translate.action();
|
||||
final IndexResponse indexResponse = new IndexResponse(primary.shardId(),
|
||||
updateIndexRequest.type(), updateIndexRequest.id(),
|
||||
updateOperation.version(), ((Engine.Index) updateOperation).isCreated());
|
||||
BytesReference indexSourceAsBytes = updateIndexRequest.source();
|
||||
updateResponse = new UpdateResponse(indexResponse.getShardInfo(),
|
||||
indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(),
|
||||
indexResponse.getVersion(), indexResponse.getResult());
|
||||
if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) ||
|
||||
(updateRequest.fields() != null && updateRequest.fields().length > 0)) {
|
||||
Tuple<XContentType, Map<String, Object>> sourceAndContent =
|
||||
XContentHelper.convertToMap(indexSourceAsBytes, true);
|
||||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(),
|
||||
indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
|
||||
}
|
||||
// replace the update request to the translated index request to execute on the replica.
|
||||
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), updateIndexRequest);
|
||||
break;
|
||||
case DELETE:
|
||||
DeleteRequest updateDeleteRequest = translate.action();
|
||||
DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(),
|
||||
updateDeleteRequest.type(), updateDeleteRequest.id(),
|
||||
updateOperation.version(), ((Engine.Delete) updateOperation).found());
|
||||
updateResponse = new UpdateResponse(deleteResponse.getShardInfo(),
|
||||
deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(),
|
||||
deleteResponse.getVersion(), deleteResponse.getResult());
|
||||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest,
|
||||
request.index(), deleteResponse.getVersion(), translate.updatedSourceAsMap(),
|
||||
translate.updateSourceContentType(), null));
|
||||
// replace the update request to the translated delete request to execute on the replica.
|
||||
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
|
||||
break;
|
||||
case FAILURE:
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// version conflict exception, retry
|
||||
if (updateOperation.getFailure() instanceof VersionConflictEngineException) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
break; // out of retry loop
|
||||
}
|
||||
return new UpdateResultHolder(updateOperation, updateResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WriteReplicaResult onReplicaShard(BulkShardRequest request, IndexShard replica) throws Exception {
|
||||
Translog.Location location = null;
|
||||
for (int i = 0; i < request.items().length; i++) {
|
||||
BulkItemRequest item = request.items()[i];
|
||||
if (item == null || item.isIgnoreOnReplica()) {
|
||||
continue;
|
||||
}
|
||||
DocWriteRequest docWriteRequest = item.request();
|
||||
final Engine.Operation operation;
|
||||
try {
|
||||
switch (docWriteRequest.opType()) {
|
||||
case CREATE:
|
||||
case INDEX:
|
||||
operation = executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), replica);
|
||||
break;
|
||||
case DELETE:
|
||||
operation = executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), replica);
|
||||
break;
|
||||
default: throw new IllegalStateException("Unexpected request operation type on replica: "
|
||||
+ docWriteRequest.opType().getLowercase());
|
||||
}
|
||||
if (operation.hasFailure()) {
|
||||
// check if any transient write operation failures should be bubbled up
|
||||
Exception failure = operation.getFailure();
|
||||
if (!ignoreReplicaException(failure)) {
|
||||
throw failure;
|
||||
if (item.isIgnoreOnReplica() == false) {
|
||||
DocWriteRequest docWriteRequest = item.request();
|
||||
final Engine.Operation operation;
|
||||
try {
|
||||
switch (docWriteRequest.opType()) {
|
||||
case CREATE:
|
||||
case INDEX:
|
||||
operation = executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), replica);
|
||||
break;
|
||||
case DELETE:
|
||||
operation = executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), replica);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Unexpected request operation type on replica: "
|
||||
+ docWriteRequest.opType().getLowercase());
|
||||
}
|
||||
if (operation.hasFailure()) {
|
||||
// check if any transient write operation failures should be bubbled up
|
||||
Exception failure = operation.getFailure();
|
||||
if (!ignoreReplicaException(failure)) {
|
||||
throw failure;
|
||||
}
|
||||
} else {
|
||||
location = locationToSync(location, operation.getTranslogLocation());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// if its not an ignore replica failure, we need to make sure to bubble up the failure
|
||||
// so we will fail the shard
|
||||
if (!ignoreReplicaException(e)) {
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
location = locationToSync(location, operation.getTranslogLocation());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// if its not an ignore replica failure, we need to make sure to bubble up the failure
|
||||
// so we will fail the shard
|
||||
if (!ignoreReplicaException(e)) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -190,7 +190,13 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
if (update != null) {
|
||||
// can throw timeout exception when updating mappings or ISE for attempting to update default mappings
|
||||
// which are bubbled up
|
||||
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
|
||||
try {
|
||||
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
|
||||
} catch (IllegalArgumentException e) {
|
||||
// throws IAE on conflicts merging dynamic mappings
|
||||
return new Engine.Failure(request.type(), request.id(), request.version(),
|
||||
request.versionType(), Engine.Operation.Origin.PRIMARY, operation.startTime(), e);
|
||||
}
|
||||
operation = prepareIndexOperationOnPrimary(request, primary);
|
||||
if (operation.hasFailure() == false) {
|
||||
update = ((Engine.Index) operation).parsedDoc().dynamicMappingsUpdate();
|
||||
|
@ -203,8 +209,11 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
}
|
||||
if (operation.hasFailure() == false) {
|
||||
primary.execute(operation);
|
||||
}
|
||||
if (operation.hasFailure() == false) {
|
||||
// update the version on request so it will happen on the replicas
|
||||
request.version(operation.version());
|
||||
final long version = operation.version();
|
||||
request.version(version);
|
||||
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
|
||||
assert request.versionType().validateVersionForWrites(request.version());
|
||||
}
|
||||
|
|
|
@ -1111,12 +1111,10 @@ public class InternalEngineTests extends ESTestCase {
|
|||
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc, 42, VersionType.FORCE, PRIMARY, 0, -1, false);
|
||||
|
||||
try {
|
||||
engine.index(index);
|
||||
fail("should have failed due to using VersionType.FORCE");
|
||||
} catch (IllegalArgumentException iae) {
|
||||
assertThat(iae.getMessage(), containsString("version type [FORCE] may not be used for indices created after 6.0"));
|
||||
}
|
||||
engine.index(index);
|
||||
assertTrue(index.hasFailure());
|
||||
assertThat(index.getFailure(), instanceOf(IllegalArgumentException.class));
|
||||
assertThat(index.getFailure().getMessage(), containsString("version type [FORCE] may not be used for indices created after 6.0"));
|
||||
|
||||
IndexSettings oldIndexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_5_0_0_beta1)
|
||||
|
@ -1124,12 +1122,10 @@ public class InternalEngineTests extends ESTestCase {
|
|||
try (Store store = createStore();
|
||||
Engine engine = createEngine(oldIndexSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
|
||||
index = new Engine.Index(newUid("1"), doc, 84, VersionType.FORCE, PRIMARY, 0, -1, false);
|
||||
try {
|
||||
engine.index(index);
|
||||
fail("should have failed due to using VersionType.FORCE");
|
||||
} catch (IllegalArgumentException iae) {
|
||||
assertThat(iae.getMessage(), containsString("version type [FORCE] may not be used for non-translog operations"));
|
||||
}
|
||||
engine.index(index);
|
||||
assertTrue(index.hasFailure());
|
||||
assertThat(index.getFailure(), instanceOf(IllegalArgumentException.class));
|
||||
assertThat(index.getFailure().getMessage(), containsString("version type [FORCE] may not be used for non-translog operations"));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc, 84, VersionType.FORCE,
|
||||
Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, 0, -1, false);
|
||||
|
|
Loading…
Reference in New Issue