Further refactor and extend testing for `TransportShardBulkAction`
This reworks `updateReplicaRequest` into `createPrimaryResponse` and moves the translog-location update into its own function, so that each function's purpose is easier to understand (and to test). It also splits the logic of `MappingUpdatePerformer` into two functions, `updateMappingsIfNeeded` and `verifyMappings`, so that neither does too much at once; this allows finer-grained error testing for when a mapping fails to parse or to be applied. Finally, it pulls the parsing and version validation in `executeIndexRequestOnReplica` out into a separate method (`prepareIndexOperationOnReplica`) and adds a test for it. Relates to #23359
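As a rough sketch of the finer-grained error testing this enables: with the interface split exactly as it appears in the diff below, a test double can make the mapping update succeed while the verification step fails. The class below mirrors the `ThrowingVerifyingMappingUpdatePerformer` added to `TransportShardBulkActionTests` in this commit; all names come from the diff itself, nothing here is new API.

```java
/**
 * Sketch only: a test double mirroring the ThrowingVerifyingMappingUpdatePerformer in the
 * test diff below. It drives executeIndexRequestOnPrimary into the verifyMappings failure
 * path without touching the mapping-update path.
 */
class ThrowingVerifyingMappingUpdatePerformer implements MappingUpdatePerformer {
    private final Exception e;

    ThrowingVerifyingMappingUpdatePerformer(Exception e) {
        this.e = e;
    }

    @Override
    public void updateMappingsIfNeeded(Engine.Index operation, ShardId shardId, String type) {
        // the mapping update itself succeeds
    }

    @Override
    public void verifyMappings(Engine.Index operation, ShardId shardId) throws Exception {
        // verification fails: per the new executeIndexRequestOnPrimary, MapperParsingException or
        // IllegalStateException becomes a failed Engine.IndexResult, anything else propagates
        throw e;
    }
}
```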
parent c62d4b7b0f · commit c8081bde91
@@ -2304,7 +2304,6 @@
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]RetryTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]TransportBulkActionIngestTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]TransportBulkActionTookTests.java" checks="LineLength" />
-  <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]TransportShardBulkActionTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]byscroll[/\\]AsyncBulkByScrollActionTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]byscroll[/\\]BulkByScrollParallelizationHelperTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]byscroll[/\\]BulkByScrollResponseTests.java" checks="LineLength" />
MappingUpdatePerformer.java

@@ -20,48 +20,28 @@
 package org.elasticsearch.action.bulk;

 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.common.Nullable;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;

 import java.util.Objects;

 public interface MappingUpdatePerformer {
+
     /**
-     * Determine if any mappings need to be updated, and update them on the
-     * master node if necessary. Returnes a failed {@code Engine.IndexResult}
-     * in the event updating the mappings fails or null if successful.
-     * Throws a {@code ReplicationOperation.RetryOnPrimaryException} if the
-     * operation needs to be retried on the primary due to the mappings not
-     * being present yet, or a different exception if updating the mappings
-     * on the master failed.
+     * Determine if any mappings need to be updated, and update them on the master node if
+     * necessary. Returnes a failure Exception in the event updating the mappings fails or null if
+     * successful.
      */
-    @Nullable
-    MappingUpdateResult updateMappingsIfNeeded(IndexShard primary, IndexRequest request) throws Exception;
+    void updateMappingsIfNeeded(Engine.Index operation,
+                                ShardId shardId,
+                                String type) throws Exception;

     /**
-     * Class encapsulating the resulting of potentially updating the mapping
+     * Throws a {@code ReplicationOperation.RetryOnPrimaryException} if the operation needs to be
+     * retried on the primary due to the mappings not being present yet, or a different exception if
+     * updating the mappings on the master failed.
      */
-    class MappingUpdateResult {
-        @Nullable
-        public final Engine.Index operation;
-        @Nullable
-        public final Exception failure;
-
-        MappingUpdateResult(Exception failure) {
-            Objects.requireNonNull(failure, "failure cannot be null");
-            this.failure = failure;
-            this.operation = null;
-        }
-
-        MappingUpdateResult(Engine.Index operation) {
-            Objects.requireNonNull(operation, "operation cannot be null");
-            this.operation = operation;
-            this.failure = null;
-        }
-
-        public boolean isFailed() {
-            return failure != null;
-        }
-    }
 }

TransportShardBulkAction.java

@@ -65,6 +65,9 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.index.translog.Translog.Location;
+import org.elasticsearch.action.bulk.BulkItemResultHolder;
+import org.elasticsearch.action.bulk.BulkItemResponse;

 import java.io.IOException;
 import java.util.Map;

@@ -154,10 +157,23 @@ public class TransportShardBulkAction
         }
     }

+    static Translog.Location calculateTranslogLocation(final Translog.Location originalLocation,
+                                                       final BulkItemResultHolder bulkItemResult) {
+        final Engine.Result operationResult = bulkItemResult.operationResult;
+        if (operationResult != null && operationResult.hasFailure() == false) {
+            return locationToSync(originalLocation, operationResult.getTranslogLocation());
+        } else {
+            return originalLocation;
+        }
+    }
+
     // Visible for unit testing
-    static Translog.Location updateReplicaRequest(BulkItemResultHolder bulkItemResult,
+    /**
+     * Creates a BulkItemResponse for the primary operation and returns it. If no bulk response is
+     * needed (because one already exists and the operation failed), then return null.
+     */
+    static BulkItemResponse createPrimaryResponse(BulkItemResultHolder bulkItemResult,
                                                   final DocWriteRequest.OpType opType,
-                                                  final Translog.Location originalLocation,
                                                   BulkShardRequest request) {
         final Engine.Result operationResult = bulkItemResult.operationResult;
         final DocWriteResponse response = bulkItemResult.response;

@@ -165,16 +181,13 @@ public class TransportShardBulkAction

         if (operationResult == null) { // in case of noop update operation
             assert response.getResult() == DocWriteResponse.Result.NOOP : "only noop updates can have a null operation";
-            replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), opType, response));
-            return originalLocation;
+            return new BulkItemResponse(replicaRequest.id(), opType, response);

         } else if (operationResult.hasFailure() == false) {
             BulkItemResponse primaryResponse = new BulkItemResponse(replicaRequest.id(), opType, response);
-            replicaRequest.setPrimaryResponse(primaryResponse);
             // set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though.
             primaryResponse.getResponse().setShardInfo(new ShardInfo());
-            // The operation was successful, advance the translog
-            return locationToSync(originalLocation, operationResult.getTranslogLocation());
+            return primaryResponse;

         } else {
             DocWriteRequest docWriteRequest = replicaRequest.request();

@@ -187,19 +200,19 @@ public class TransportShardBulkAction
                         request.shardId(), docWriteRequest.opType().getLowercase(), request), failure);
             }

             // if it's a conflict failure, and we already executed the request on a primary (and we execute it
             // again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
             // then just use the response we got from the failed execution
             if (replicaRequest.getPrimaryResponse() == null || isConflictException(failure) == false) {
-                replicaRequest.setPrimaryResponse(
-                        new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(),
-                                // Make sure to use request.indox() here, if you
-                                // use docWriteRequest.index() it will use the
-                                // concrete index instead of an alias if used!
-                                new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure)));
+                return new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(),
+                        // Make sure to use request.index() here, if you
+                        // use docWriteRequest.index() it will use the
+                        // concrete index instead of an alias if used!
+                        new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure));
+            } else {
+                assert replicaRequest.getPrimaryResponse() != null : "replica request must have a primary response";
+                return null;
             }
-            return originalLocation;
         }
     }

@@ -233,11 +246,14 @@ public class TransportShardBulkAction
         // update the bulk item request because update request execution can mutate the bulk item request
         request.items()[requestIndex] = replicaRequest;

-        // Modify the replica request, if needed, and return a new translog location
-        location = updateReplicaRequest(responseHolder, opType, location, request);
+        // Retrieve the primary response, and update the replica request with the primary's response
+        BulkItemResponse primaryResponse = createPrimaryResponse(responseHolder, opType, request);
+        if (primaryResponse != null) {
+            replicaRequest.setPrimaryResponse(primaryResponse);
+        }

-        assert replicaRequest.getPrimaryResponse() != null : "replica request must have a primary response";
-        return location;
+        // Update the translog with the new location, if needed
+        return calculateTranslogLocation(location, responseHolder);
     }

     private static boolean isConflictException(final Exception e) {

@@ -396,14 +412,16 @@ public class TransportShardBulkAction
         return new WriteReplicaResult<>(request, location, null, replica, logger);
     }

-    private static Translog.Location locationToSync(Translog.Location current, Translog.Location next) {
-        /* here we are moving forward in the translog with each operation. Under the hood
-         * this might cross translog files which is ok since from the user perspective
-         * the translog is like a tape where only the highest location needs to be fsynced
-         * in order to sync all previous locations even though they are not in the same file.
-         * When the translog rolls over files the previous file is fsynced on after closing if needed.*/
+    private static Translog.Location locationToSync(Translog.Location current,
+                                                    Translog.Location next) {
+        /* here we are moving forward in the translog with each operation. Under the hood this might
+         * cross translog files which is ok since from the user perspective the translog is like a
+         * tape where only the highest location needs to be fsynced in order to sync all previous
+         * locations even though they are not in the same file. When the translog rolls over files
+         * the previous file is fsynced on after closing if needed.*/
         assert next != null : "next operation can't be null";
-        assert current == null || current.compareTo(next) < 0 : "translog locations are not increasing";
+        assert current == null || current.compareTo(next) < 0 :
+                "translog locations are not increasing";
         return next;
     }

@@ -411,45 +429,82 @@ public class TransportShardBulkAction
      * Execute the given {@link IndexRequest} on a replica shard, throwing a
      * {@link RetryOnReplicaException} if the operation needs to be re-tried.
      */
-    public static Engine.IndexResult executeIndexRequestOnReplica(DocWriteResponse primaryResponse, IndexRequest request, IndexShard replica) throws IOException {
-        final ShardId shardId = replica.shardId();
-        SourceToParse sourceToParse =
-            SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source(),
-                request.getContentType()).routing(request.routing()).parent(request.parent());
+    public static Engine.IndexResult executeIndexRequestOnReplica(
+            DocWriteResponse primaryResponse,
+            IndexRequest request,
+            IndexShard replica) throws IOException {

         final Engine.Index operation;
-        final long version = primaryResponse.getVersion();
-        final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery();
-        assert versionType.validateVersionForWrites(version);
-        final long seqNo = primaryResponse.getSeqNo();
         try {
-            operation = replica.prepareIndexOnReplica(sourceToParse, seqNo, version, versionType, request.getAutoGeneratedTimestamp(), request.isRetry());
+            operation = prepareIndexOperationOnReplica(primaryResponse, request, replica);
         } catch (MapperParsingException e) {
-            return new Engine.IndexResult(e, version, seqNo);
+            return new Engine.IndexResult(e, primaryResponse.getVersion(),
+                    primaryResponse.getSeqNo());
         }

         Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
         if (update != null) {
-            throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
+            final ShardId shardId = replica.shardId();
+            throw new RetryOnReplicaException(shardId,
+                    "Mappings are not available on the replica yet, triggered update: " + update);
         }
         return replica.index(operation);
     }

+    /** Utility method to prepare an index operation on replica shards */
+    static Engine.Index prepareIndexOperationOnReplica(
+            DocWriteResponse primaryResponse,
+            IndexRequest request,
+            IndexShard replica) {
+
+        final ShardId shardId = replica.shardId();
+        final long version = primaryResponse.getVersion();
+        final long seqNo = primaryResponse.getSeqNo();
+        final SourceToParse sourceToParse =
+                SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(),
+                        request.type(), request.id(), request.source(), request.getContentType())
+                        .routing(request.routing()).parent(request.parent());
+        final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery();
+        assert versionType.validateVersionForWrites(version);
+
+        return replica.prepareIndexOnReplica(sourceToParse, seqNo, version, versionType,
+                request.getAutoGeneratedTimestamp(), request.isRetry());
+    }
+
     /** Utility method to prepare an index operation on primary shards */
     static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
-        SourceToParse sourceToParse =
-            SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source(),
-                request.getContentType()).routing(request.routing()).parent(request.parent());
-        return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
+        final SourceToParse sourceToParse =
+                SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(),
+                        request.id(), request.source(), request.getContentType())
+                        .routing(request.routing()).parent(request.parent());
+        return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(),
+                request.getAutoGeneratedTimestamp(), request.isRetry());
     }

     /** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
     public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
             MappingUpdatePerformer mappingUpdater) throws Exception {
-        MappingUpdatePerformer.MappingUpdateResult result = mappingUpdater.updateMappingsIfNeeded(primary, request);
-        if (result.isFailed()) {
-            return new Engine.IndexResult(result.failure, request.version());
+        // Update the mappings if parsing the documents includes new dynamic updates
+        try {
+            final Engine.Index preUpdateOperation = prepareIndexOperationOnPrimary(request, primary);
+            mappingUpdater.updateMappingsIfNeeded(preUpdateOperation, primary.shardId(), request.type());
+        } catch (MapperParsingException | IllegalArgumentException failure) {
+            return new Engine.IndexResult(failure, request.version());
         }
-        return primary.index(result.operation);
+
+        // Verify that there are no more mappings that need to be applied. If there are failures, a
+        // ReplicationOperation.RetryOnPrimaryException is thrown.
+        final Engine.Index operation;
+        try {
+            operation = prepareIndexOperationOnPrimary(request, primary);
+            mappingUpdater.verifyMappings(operation, primary.shardId());
+        } catch (MapperParsingException | IllegalStateException e) {
+            // there was an error in parsing the document that was not because
+            // of pending mapping updates, so return a failure for the result
+            return new Engine.IndexResult(e, request.version());
        }
+
+        return primary.index(operation);
     }

     private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) throws IOException {

@@ -468,36 +523,22 @@ public class TransportShardBulkAction

     class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer {

-        @Nullable
-        public MappingUpdateResult updateMappingsIfNeeded(IndexShard primary, IndexRequest request) throws Exception {
-            Engine.Index operation;
-            try {
-                operation = prepareIndexOperationOnPrimary(request, primary);
-            } catch (MapperParsingException | IllegalArgumentException e) {
-                return new MappingUpdateResult(e);
-            }
+        public void updateMappingsIfNeeded(final Engine.Index operation, final ShardId shardId,
+                                           final String type) throws Exception {
             final Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
-            final ShardId shardId = primary.shardId();
             if (update != null) {
-                // can throw timeout exception when updating mappings or ISE for attempting to update default mappings
-                // which are bubbled up
-                try {
-                    mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
-                } catch (IllegalArgumentException e) {
-                    // throws IAE on conflicts merging dynamic mappings
-                    return new MappingUpdateResult(e);
-                }
-                try {
-                    operation = prepareIndexOperationOnPrimary(request, primary);
-                } catch (MapperParsingException | IllegalArgumentException e) {
-                    return new MappingUpdateResult(e);
-                }
+                // can throw timeout exception when updating mappings or ISE for attempting to
+                // update default mappings which are bubbled up
+                mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update);
             }
+        }
+
+        public void verifyMappings(final Engine.Index operation,
+                                   final ShardId shardId) throws Exception {
             if (operation.parsedDoc().dynamicMappingsUpdate() != null) {
                 throw new ReplicationOperation.RetryOnPrimaryException(shardId,
                     "Dynamic mappings are not available on the node that holds the primary yet");
             }
-            return new MappingUpdateResult(operation);
         }
     }
 }

TransportShardBulkActionTests.java

@@ -53,6 +53,8 @@ import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.action.bulk.TransportShardBulkAction;
+import org.elasticsearch.action.bulk.MappingUpdatePerformer;
+import org.elasticsearch.action.bulk.BulkItemResultHolder;

 import java.io.IOException;
 import static org.hamcrest.CoreMatchers.equalTo;

@@ -77,26 +79,30 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {

     public void testShouldExecuteReplicaItem() throws Exception {
         // Successful index request should be replicated
-        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
+        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
+                .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
         DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean());
         BulkItemRequest request = new BulkItemRequest(0, writeRequest);
         request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response));
         assertTrue(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0));

         // Failed index requests should not be replicated (for now!)
-        writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
+        writeRequest = new IndexRequest("index", "type", "id")
+                .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
         response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean());
         request = new BulkItemRequest(0, writeRequest);
         request.setPrimaryResponse(
                 new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
-                        new BulkItemResponse.Failure("test", "type", "id", new IllegalArgumentException("i died"))));
+                        new BulkItemResponse.Failure("test", "type", "id",
+                                new IllegalArgumentException("i died"))));
         assertFalse(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0));

         // NOOP requests should not be replicated
         writeRequest = new UpdateRequest("index", "type", "id");
         response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP);
         request = new BulkItemRequest(0, writeRequest);
-        request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE, response));
+        request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE,
+                response));
         assertFalse(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0));
     }

@@ -112,13 +118,15 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
                 .create(create);
         BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest);
         items[0] = primaryRequest;
-        BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
+        BulkShardRequest bulkShardRequest =
+                new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

         Translog.Location location = new Translog.Location(0, 0, 0);
         UpdateHelper updateHelper = null;

-        Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest,
-                location, 0, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer());
+        Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData,
+                shard, bulkShardRequest, location, 0, updateHelper,
+                threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer());

         // Translog should change, since there were no problems
         assertThat(newLocation, not(location));

@@ -127,7 +135,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {

         assertThat(primaryResponse.getItemId(), equalTo(0));
         assertThat(primaryResponse.getId(), equalTo("id"));
-        assertThat(primaryResponse.getOpType(), equalTo(create ? DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.INDEX));
+        assertThat(primaryResponse.getOpType(),
+                equalTo(create ? DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.INDEX));
         assertFalse(primaryResponse.isFailed());

         // Assert that the document actually made it there

@@ -140,8 +149,10 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         items[0] = primaryRequest;
         bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

-        Translog.Location secondLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest,
-                newLocation, 0, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer());
+        Translog.Location secondLocation =
+                TransportShardBulkAction.executeBulkItemRequest( metaData,
+                        shard, bulkShardRequest, newLocation, 0, updateHelper,
+                        threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer());

         // Translog should not change, since the document was not indexed due to a version conflict
         assertThat(secondLocation, equalTo(newLocation));

@@ -177,9 +188,11 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         IndexShard shard = newStartedShard(true);

         BulkItemRequest[] items = new BulkItemRequest[1];
-        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
+        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
+                .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
         items[0] = new BulkItemRequest(0, writeRequest);
-        BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
+        BulkShardRequest bulkShardRequest =
+                new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

         Translog.Location location = new Translog.Location(0, 0, 0);
         UpdateHelper updateHelper = null;

@@ -188,8 +201,9 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         Exception err = new ReplicationOperation.RetryOnPrimaryException(shardId, "rejection");

         try {
-            TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest, location,
-                    0, updateHelper, threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(err));
+            TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest,
+                    location, 0, updateHelper, threadPool::absoluteTimeInMillis,
+                    new ThrowingMappingUpdatePerformer(err));
             fail("should have thrown a retry exception");
         } catch (ReplicationOperation.RetryOnPrimaryException e) {
             assertThat(e, equalTo(err));

@@ -203,9 +217,11 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         IndexShard shard = newStartedShard(true);

         BulkItemRequest[] items = new BulkItemRequest[1];
-        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
+        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
+                .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
         items[0] = new BulkItemRequest(0, writeRequest);
-        BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
+        BulkShardRequest bulkShardRequest =
+                new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

         Translog.Location location = new Translog.Location(0, 0, 0);
         UpdateHelper updateHelper = null;

@@ -213,8 +229,9 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         // Return a mapping conflict (IAE) when trying to update the mapping
         Exception err = new IllegalArgumentException("mapping conflict");

-        Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest,
-                location, 0, updateHelper, threadPool::absoluteTimeInMillis, new FailingMappingUpdatePerformer(err));
+        Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData,
+                shard, bulkShardRequest, location, 0, updateHelper,
+                threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(err));

         // Translog shouldn't change, as there were conflicting mappings
         assertThat(newLocation, equalTo(location));

@@ -245,13 +262,15 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         BulkItemRequest[] items = new BulkItemRequest[1];
         DocWriteRequest writeRequest = new DeleteRequest("index", "type", "id");
         items[0] = new BulkItemRequest(0, writeRequest);
-        BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
+        BulkShardRequest bulkShardRequest =
+                new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

         Translog.Location location = new Translog.Location(0, 0, 0);
         UpdateHelper updateHelper = null;

-        Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest,
-                location, 0, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer());
+        Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData,
+                shard, bulkShardRequest, location, 0, updateHelper,
+                threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer());

         // Translog changes, even though the document didn't exist
         assertThat(newLocation, not(location));

@@ -288,8 +307,9 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {

         location = newLocation;

-        newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest,
-                location, 0, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer());
+        newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, shard,
+                bulkShardRequest, location, 0, updateHelper, threadPool::absoluteTimeInMillis,
+                new NoopMappingUpdatePerformer());

         // Translog changes, because the document was deleted
         assertThat(newLocation, not(location));

@@ -322,19 +342,25 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
     }

     public void testNoopUpdateReplicaRequest() throws Exception {
-        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
+        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
+                .source(Requests.INDEX_CONTENT_TYPE, "field", "value");
         BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest);

-        DocWriteResponse noopUpdateResponse = new UpdateResponse(shardId, "index", "id", 0, DocWriteResponse.Result.NOOP);
-        BulkItemResultHolder noopResults = new BulkItemResultHolder(noopUpdateResponse, null, replicaRequest);
+        DocWriteResponse noopUpdateResponse = new UpdateResponse(shardId, "index", "id", 0,
+                DocWriteResponse.Result.NOOP);
+        BulkItemResultHolder noopResults = new BulkItemResultHolder(noopUpdateResponse, null,
+                replicaRequest);

         Translog.Location location = new Translog.Location(0, 0, 0);
         BulkItemRequest[] items = new BulkItemRequest[0];
-        BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
-        Translog.Location newLocation = TransportShardBulkAction.updateReplicaRequest(noopResults,
-                DocWriteRequest.OpType.UPDATE, location, bulkShardRequest);
+        BulkShardRequest bulkShardRequest =
+                new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

-        BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse();
+        BulkItemResponse primaryResponse = TransportShardBulkAction.createPrimaryResponse(
+                noopResults, DocWriteRequest.OpType.UPDATE, bulkShardRequest);
+
+        Translog.Location newLocation =
+                TransportShardBulkAction.calculateTranslogLocation(location, noopResults);

         // Basically nothing changes in the request since it's a noop
         assertThat(newLocation, equalTo(location));

@@ -342,24 +368,30 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         assertThat(primaryResponse.getId(), equalTo("id"));
         assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE));
         assertThat(primaryResponse.getResponse(), equalTo(noopUpdateResponse));
-        assertThat(primaryResponse.getResponse().getResult(), equalTo(DocWriteResponse.Result.NOOP));
+        assertThat(primaryResponse.getResponse().getResult(),
+                equalTo(DocWriteResponse.Result.NOOP));
     }

     public void testUpdateReplicaRequestWithFailure() throws Exception {
-        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
+        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
+                .source(Requests.INDEX_CONTENT_TYPE, "field", "value");
         BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest);

         Exception err = new ElasticsearchException("I'm dead <(x.x)>");
         Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0);
-        BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult, replicaRequest);
+        BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult,
+                replicaRequest);

         Translog.Location location = new Translog.Location(0, 0, 0);
         BulkItemRequest[] items = new BulkItemRequest[0];
-        BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
-        Translog.Location newLocation = TransportShardBulkAction.updateReplicaRequest(failedResults,
-                DocWriteRequest.OpType.UPDATE, location, bulkShardRequest);
+        BulkShardRequest bulkShardRequest =
+                new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
+        BulkItemResponse primaryResponse =
+                TransportShardBulkAction.createPrimaryResponse(
+                        failedResults, DocWriteRequest.OpType.UPDATE, bulkShardRequest);

-        BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse();
+        Translog.Location newLocation =
+                TransportShardBulkAction.calculateTranslogLocation(location, failedResults);

         // Since this was not a conflict failure, the primary response
         // should be filled out with the failure information

@@ -378,20 +410,26 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
     }

     public void testUpdateReplicaRequestWithConflictFailure() throws Exception {
-        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
+        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
+                .source(Requests.INDEX_CONTENT_TYPE, "field", "value");
         BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest);

-        Exception err = new VersionConflictEngineException(shardId, "type", "id", "I'm conflicted <(;_;)>");
+        Exception err = new VersionConflictEngineException(shardId, "type", "id",
+                "I'm conflicted <(;_;)>");
         Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0);
-        BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult, replicaRequest);
+        BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult,
+                replicaRequest);

         Translog.Location location = new Translog.Location(0, 0, 0);
         BulkItemRequest[] items = new BulkItemRequest[0];
-        BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
-        Translog.Location newLocation = TransportShardBulkAction.updateReplicaRequest(failedResults,
-                DocWriteRequest.OpType.UPDATE, location, bulkShardRequest);
+        BulkShardRequest bulkShardRequest =
+                new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
+        BulkItemResponse primaryResponse =
+                TransportShardBulkAction.createPrimaryResponse(
+                        failedResults, DocWriteRequest.OpType.UPDATE, bulkShardRequest);

-        BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse();
+        Translog.Location newLocation =
+                TransportShardBulkAction.calculateTranslogLocation(location, failedResults);

         // Since this was not a conflict failure, the primary response
         // should be filled out with the failure information

@@ -410,22 +448,27 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
     }

     public void testUpdateReplicaRequestWithSuccess() throws Exception {
-        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
+        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
+                .source(Requests.INDEX_CONTENT_TYPE, "field", "value");
         BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest);

         boolean created = randomBoolean();
         Translog.Location resultLocation = new Translog.Location(42, 42, 42);
         Engine.IndexResult indexResult = new FakeResult(1, 1, created, resultLocation);
         DocWriteResponse indexResponse = new IndexResponse(shardId, "index", "id", 1, 1, created);
-        BulkItemResultHolder goodResults = new BulkItemResultHolder(indexResponse, indexResult, replicaRequest);
+        BulkItemResultHolder goodResults =
+                new BulkItemResultHolder(indexResponse, indexResult, replicaRequest);

         Translog.Location originalLocation = new Translog.Location(21, 21, 21);
         BulkItemRequest[] items = new BulkItemRequest[0];
-        BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
-        Translog.Location newLocation = TransportShardBulkAction.updateReplicaRequest(goodResults,
-                DocWriteRequest.OpType.INDEX, originalLocation, bulkShardRequest);
+        BulkShardRequest bulkShardRequest =
+                new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
+        BulkItemResponse primaryResponse =
+                TransportShardBulkAction.createPrimaryResponse(
+                        goodResults, DocWriteRequest.OpType.INDEX, bulkShardRequest);

-        BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse();
+        Translog.Location newLocation =
+                TransportShardBulkAction.calculateTranslogLocation(originalLocation, goodResults);

         // Check that the translog is successfully advanced
         assertThat(newLocation, equalTo(resultLocation));

@@ -438,6 +481,61 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         assertThat(response.status(), equalTo(created ? RestStatus.CREATED : RestStatus.OK));
     }

+    public void testCalculateTranslogLocation() throws Exception {
+        final Translog.Location original = new Translog.Location(0, 0, 0);
+
+        DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
+                .source(Requests.INDEX_CONTENT_TYPE, "field", "value");
+        BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest);
+        BulkItemResultHolder results = new BulkItemResultHolder(null, null, replicaRequest);
+
+        assertThat(TransportShardBulkAction.calculateTranslogLocation(original, results),
+                equalTo(original));
+
+        boolean created = randomBoolean();
+        DocWriteResponse indexResponse = new IndexResponse(shardId, "index", "id", 1, 1, created);
+        Translog.Location newLocation = new Translog.Location(1, 1, 1);
+        Engine.IndexResult indexResult = new IndexResultWithLocation(randomNonNegativeLong(),
+                randomNonNegativeLong(), created, newLocation);
+        results = new BulkItemResultHolder(indexResponse, indexResult, replicaRequest);
+        assertThat(TransportShardBulkAction.calculateTranslogLocation(original, results),
+                equalTo(newLocation));
+
+    }
+
+    public class IndexResultWithLocation extends Engine.IndexResult {
+        private final Translog.Location location;
+        public IndexResultWithLocation(long version, long seqNo, boolean created,
+                                       Translog.Location newLocation) {
+            super(version, seqNo, created);
+            this.location = newLocation;
+        }
+
+        @Override
+        public Translog.Location getTranslogLocation() {
+            return this.location;
+        }
+    }
+
+    public void testPrepareIndexOpOnReplica() throws Exception {
+        IndexMetaData metaData = indexMetaData();
+        IndexShard shard = newStartedShard(false);
+
+        DocWriteResponse primaryResponse = new IndexResponse(shardId, "index", "id",
+                1, 1, randomBoolean());
+        IndexRequest request = new IndexRequest("index", "type", "id")
+                .source(Requests.INDEX_CONTENT_TYPE, "field", "value");
+
+        Engine.Index op = TransportShardBulkAction.prepareIndexOperationOnReplica(
+                primaryResponse, request, shard);
+
+        assertThat(op.version(), equalTo(primaryResponse.getVersion()));
+        assertThat(op.seqNo(), equalTo(primaryResponse.getSeqNo()));
+        assertThat(op.versionType(), equalTo(VersionType.EXTERNAL));
+
+        closeShards(shard);
+    }
+
     /**
      * Fake IndexResult that has a settable translog location
      */

@@ -445,7 +543,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {

         private final Translog.Location location;

-        protected FakeResult(long version, long seqNo, boolean created, Translog.Location location) {
+        protected FakeResult(long version, long seqNo, boolean created,
+                             Translog.Location location) {
             super(version, seqNo, created);
             this.location = location;
         }

@@ -458,23 +557,12 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {

     /** Doesn't perform any mapping updates */
     public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer {
-        public MappingUpdatePerformer.MappingUpdateResult updateMappingsIfNeeded(IndexShard primary,
-                                                                                 IndexRequest request) throws Exception {
-            Engine.Index operation = TransportShardBulkAction.prepareIndexOperationOnPrimary(request, primary);
-            return new MappingUpdatePerformer.MappingUpdateResult(operation);
-        }
-    }
-
-    /** Always returns the given failure */
-    private class FailingMappingUpdatePerformer implements MappingUpdatePerformer {
-        private final Exception e;
-        FailingMappingUpdatePerformer(Exception e) {
-            this.e = e;
+        public void updateMappingsIfNeeded(Engine.Index operation,
+                                           ShardId shardId,
+                                           String type) throws Exception {
         }

-        public MappingUpdatePerformer.MappingUpdateResult updateMappingsIfNeeded(IndexShard primary,
-                                                                                 IndexRequest request) throws Exception {
-            return new MappingUpdatePerformer.MappingUpdateResult(e);
+        public void verifyMappings(Engine.Index operation, ShardId shardId) throws Exception {
         }
     }

@@ -485,8 +573,30 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             this.e = e;
         }

-        public MappingUpdatePerformer.MappingUpdateResult updateMappingsIfNeeded(IndexShard primary,
-                                                                                 IndexRequest request) throws Exception {
+        public void updateMappingsIfNeeded(Engine.Index operation,
+                                           ShardId shardId,
+                                           String type) throws Exception {
+            throw e;
+        }
+
+        public void verifyMappings(Engine.Index operation, ShardId shardId) throws Exception {
+            fail("should not have gotten to this point");
+        }
+    }
+
+    /** Always throw the given exception */
+    private class ThrowingVerifyingMappingUpdatePerformer implements MappingUpdatePerformer {
+        private final Exception e;
+        ThrowingVerifyingMappingUpdatePerformer(Exception e) {
+            this.e = e;
+        }
+
+        public void updateMappingsIfNeeded(Engine.Index operation,
+                                           ShardId shardId,
+                                           String type) throws Exception {
+        }
+
+        public void verifyMappings(Engine.Index operation, ShardId shardId) throws Exception {
             throw e;
         }
     }