Only fetch mapping updates when necessary (#33182)
Today we fetch the mapping from the leader and apply it as a mapping update whenever the index metadata version on the leader changes. Yet, the index metadata can change for many reasons other than a mapping update (e.g., settings updates, adding an alias, or a replica being promoted to a primary among many other reasons). This commit builds on the addition of a mapping version to the index metadata to only fetch mapping updates when the mapping version increases. This reduces the number of these fetches and application of mappings on the follower to the bare minimum.
This commit is contained in:
parent
5b11df9c35
commit
cd91992c89
|
@ -161,10 +161,10 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
|||
|
||||
public static final class Response extends ActionResponse {
|
||||
|
||||
private long indexMetadataVersion;
|
||||
private long mappingVersion;
|
||||
|
||||
public long getIndexMetadataVersion() {
|
||||
return indexMetadataVersion;
|
||||
public long getMappingVersion() {
|
||||
return mappingVersion;
|
||||
}
|
||||
|
||||
private long globalCheckpoint;
|
||||
|
@ -188,8 +188,8 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
|||
Response() {
|
||||
}
|
||||
|
||||
Response(final long indexMetadataVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) {
|
||||
this.indexMetadataVersion = indexMetadataVersion;
|
||||
Response(final long mappingVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) {
|
||||
this.mappingVersion = mappingVersion;
|
||||
this.globalCheckpoint = globalCheckpoint;
|
||||
this.maxSeqNo = maxSeqNo;
|
||||
this.operations = operations;
|
||||
|
@ -198,7 +198,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
|||
@Override
|
||||
public void readFrom(final StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
indexMetadataVersion = in.readVLong();
|
||||
mappingVersion = in.readVLong();
|
||||
globalCheckpoint = in.readZLong();
|
||||
maxSeqNo = in.readZLong();
|
||||
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
|
||||
|
@ -207,7 +207,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
|||
@Override
|
||||
public void writeTo(final StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeVLong(indexMetadataVersion);
|
||||
out.writeVLong(mappingVersion);
|
||||
out.writeZLong(globalCheckpoint);
|
||||
out.writeZLong(maxSeqNo);
|
||||
out.writeArray(Translog.Operation::writeOperation, operations);
|
||||
|
@ -218,7 +218,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
|||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
final Response that = (Response) o;
|
||||
return indexMetadataVersion == that.indexMetadataVersion &&
|
||||
return mappingVersion == that.mappingVersion &&
|
||||
globalCheckpoint == that.globalCheckpoint &&
|
||||
maxSeqNo == that.maxSeqNo &&
|
||||
Arrays.equals(operations, that.operations);
|
||||
|
@ -226,7 +226,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
|||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(indexMetadataVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations));
|
||||
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -252,7 +252,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
|||
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
|
||||
IndexShard indexShard = indexService.getShard(request.getShard().id());
|
||||
final SeqNoStats seqNoStats = indexShard.seqNoStats();
|
||||
final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
|
||||
final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
|
||||
|
||||
final Translog.Operation[] operations = getOperations(
|
||||
indexShard,
|
||||
|
@ -260,7 +260,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
|||
request.fromSeqNo,
|
||||
request.maxOperationCount,
|
||||
request.maxOperationSizeInBytes);
|
||||
return new Response(indexMetaDataVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
|
||||
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -80,7 +80,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
private long followerMaxSeqNo = 0;
|
||||
private int numConcurrentReads = 0;
|
||||
private int numConcurrentWrites = 0;
|
||||
private long currentIndexMetadataVersion = 0;
|
||||
private long currentMappingVersion = 0;
|
||||
private long totalFetchTimeMillis = 0;
|
||||
private long numberOfSuccessfulFetches = 0;
|
||||
private long numberOfFailedFetches = 0;
|
||||
|
@ -131,14 +131,13 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
this.lastRequestedSeqNo = followerGlobalCheckpoint;
|
||||
}
|
||||
|
||||
// Forcefully updates follower mapping, this gets us the leader imd version and
|
||||
// makes sure that leader and follower mapping are identical.
|
||||
updateMapping(imdVersion -> {
|
||||
// updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
|
||||
updateMapping(mappingVersion -> {
|
||||
synchronized (ShardFollowNodeTask.this) {
|
||||
currentIndexMetadataVersion = imdVersion;
|
||||
currentMappingVersion = mappingVersion;
|
||||
}
|
||||
LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, indexMetaDataVersion={}",
|
||||
params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, imdVersion);
|
||||
LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, mappingVersion={}",
|
||||
params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, mappingVersion);
|
||||
coordinateReads();
|
||||
});
|
||||
}
|
||||
|
@ -258,7 +257,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
}
|
||||
|
||||
void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
|
||||
maybeUpdateMapping(response.getIndexMetadataVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response));
|
||||
maybeUpdateMapping(response.getMappingVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response));
|
||||
}
|
||||
|
||||
/** Called when some operations are fetched from the leading */
|
||||
|
@ -344,16 +343,16 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
coordinateReads();
|
||||
}
|
||||
|
||||
private synchronized void maybeUpdateMapping(Long minimumRequiredIndexMetadataVersion, Runnable task) {
|
||||
if (currentIndexMetadataVersion >= minimumRequiredIndexMetadataVersion) {
|
||||
LOGGER.trace("{} index metadata version [{}] is higher or equal than minimum required index metadata version [{}]",
|
||||
params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion);
|
||||
private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion, Runnable task) {
|
||||
if (currentMappingVersion >= minimumRequiredMappingVersion) {
|
||||
LOGGER.trace("{} mapping version [{}] is higher or equal than minimum required mapping version [{}]",
|
||||
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
|
||||
task.run();
|
||||
} else {
|
||||
LOGGER.trace("{} updating mapping, index metadata version [{}] is lower than minimum required index metadata version [{}]",
|
||||
params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion);
|
||||
updateMapping(imdVersion -> {
|
||||
currentIndexMetadataVersion = imdVersion;
|
||||
LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]",
|
||||
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
|
||||
updateMapping(mappingVersion -> {
|
||||
currentMappingVersion = mappingVersion;
|
||||
task.run();
|
||||
});
|
||||
}
|
||||
|
@ -422,7 +421,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
numConcurrentReads,
|
||||
numConcurrentWrites,
|
||||
buffer.size(),
|
||||
currentIndexMetadataVersion,
|
||||
currentMappingVersion,
|
||||
totalFetchTimeMillis,
|
||||
numberOfSuccessfulFetches,
|
||||
numberOfFailedFetches,
|
||||
|
@ -448,7 +447,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads");
|
||||
static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField("number_of_concurrent_writes");
|
||||
static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes");
|
||||
static final ParseField INDEX_METADATA_VERSION_FIELD = new ParseField("index_metadata_version");
|
||||
static final ParseField MAPPING_VERSION_FIELD = new ParseField("mapping_version");
|
||||
static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis");
|
||||
static final ParseField NUMBER_OF_SUCCESSFUL_FETCHES_FIELD = new ParseField("number_of_successful_fetches");
|
||||
static final ParseField NUMBER_OF_FAILED_FETCHES_FIELD = new ParseField("number_of_failed_fetches");
|
||||
|
@ -504,7 +503,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD);
|
||||
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD);
|
||||
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD);
|
||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD);
|
||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAPPING_VERSION_FIELD);
|
||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD);
|
||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD);
|
||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD);
|
||||
|
@ -582,10 +581,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
return numberOfQueuedWrites;
|
||||
}
|
||||
|
||||
private final long indexMetadataVersion;
|
||||
private final long mappingVersion;
|
||||
|
||||
public long indexMetadataVersion() {
|
||||
return indexMetadataVersion;
|
||||
public long mappingVersion() {
|
||||
return mappingVersion;
|
||||
}
|
||||
|
||||
private final long totalFetchTimeMillis;
|
||||
|
@ -658,7 +657,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
final int numberOfConcurrentReads,
|
||||
final int numberOfConcurrentWrites,
|
||||
final int numberOfQueuedWrites,
|
||||
final long indexMetadataVersion,
|
||||
final long mappingVersion,
|
||||
final long totalFetchTimeMillis,
|
||||
final long numberOfSuccessfulFetches,
|
||||
final long numberOfFailedFetches,
|
||||
|
@ -678,7 +677,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
this.numberOfConcurrentReads = numberOfConcurrentReads;
|
||||
this.numberOfConcurrentWrites = numberOfConcurrentWrites;
|
||||
this.numberOfQueuedWrites = numberOfQueuedWrites;
|
||||
this.indexMetadataVersion = indexMetadataVersion;
|
||||
this.mappingVersion = mappingVersion;
|
||||
this.totalFetchTimeMillis = totalFetchTimeMillis;
|
||||
this.numberOfSuccessfulFetches = numberOfSuccessfulFetches;
|
||||
this.numberOfFailedFetches = numberOfFailedFetches;
|
||||
|
@ -701,7 +700,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
this.numberOfConcurrentReads = in.readVInt();
|
||||
this.numberOfConcurrentWrites = in.readVInt();
|
||||
this.numberOfQueuedWrites = in.readVInt();
|
||||
this.indexMetadataVersion = in.readVLong();
|
||||
this.mappingVersion = in.readVLong();
|
||||
this.totalFetchTimeMillis = in.readVLong();
|
||||
this.numberOfSuccessfulFetches = in.readVLong();
|
||||
this.numberOfFailedFetches = in.readVLong();
|
||||
|
@ -730,7 +729,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
out.writeVInt(numberOfConcurrentReads);
|
||||
out.writeVInt(numberOfConcurrentWrites);
|
||||
out.writeVInt(numberOfQueuedWrites);
|
||||
out.writeVLong(indexMetadataVersion);
|
||||
out.writeVLong(mappingVersion);
|
||||
out.writeVLong(totalFetchTimeMillis);
|
||||
out.writeVLong(numberOfSuccessfulFetches);
|
||||
out.writeVLong(numberOfFailedFetches);
|
||||
|
@ -756,7 +755,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads);
|
||||
builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites);
|
||||
builder.field(NUMBER_OF_QUEUED_WRITES_FIELD.getPreferredName(), numberOfQueuedWrites);
|
||||
builder.field(INDEX_METADATA_VERSION_FIELD.getPreferredName(), indexMetadataVersion);
|
||||
builder.field(MAPPING_VERSION_FIELD.getPreferredName(), mappingVersion);
|
||||
builder.humanReadableField(
|
||||
TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(),
|
||||
"total_fetch_time",
|
||||
|
@ -815,7 +814,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
numberOfConcurrentReads == that.numberOfConcurrentReads &&
|
||||
numberOfConcurrentWrites == that.numberOfConcurrentWrites &&
|
||||
numberOfQueuedWrites == that.numberOfQueuedWrites &&
|
||||
indexMetadataVersion == that.indexMetadataVersion &&
|
||||
mappingVersion == that.mappingVersion &&
|
||||
totalFetchTimeMillis == that.totalFetchTimeMillis &&
|
||||
numberOfSuccessfulFetches == that.numberOfSuccessfulFetches &&
|
||||
numberOfFailedFetches == that.numberOfFailedFetches &&
|
||||
|
@ -837,7 +836,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
numberOfConcurrentReads,
|
||||
numberOfConcurrentWrites,
|
||||
numberOfQueuedWrites,
|
||||
indexMetadataVersion,
|
||||
mappingVersion,
|
||||
totalFetchTimeMillis,
|
||||
numberOfSuccessfulFetches,
|
||||
numberOfFailedFetches,
|
||||
|
|
|
@ -115,7 +115,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
putMappingRequest.type(mappingMetaData.type());
|
||||
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
|
||||
followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
|
||||
putMappingResponse -> handler.accept(indexMetaData.getVersion()),
|
||||
putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()),
|
||||
errorHandler));
|
||||
}, errorHandler));
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
|
|||
|
||||
@Override
|
||||
protected ShardChangesAction.Response createTestInstance() {
|
||||
final long indexMetadataVersion = randomNonNegativeLong();
|
||||
final long mappingVersion = randomNonNegativeLong();
|
||||
final long leaderGlobalCheckpoint = randomNonNegativeLong();
|
||||
final long leaderMaxSeqNo = randomLongBetween(leaderGlobalCheckpoint, Long.MAX_VALUE);
|
||||
final int numOps = randomInt(8);
|
||||
|
@ -20,7 +20,7 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
|
|||
for (int i = 0; i < numOps; i++) {
|
||||
operations[i] = new Translog.NoOp(i, 0, "test");
|
||||
}
|
||||
return new ShardChangesAction.Response(indexMetadataVersion, leaderGlobalCheckpoint, leaderMaxSeqNo, operations);
|
||||
return new ShardChangesAction.Response(mappingVersion, leaderGlobalCheckpoint, leaderMaxSeqNo, operations);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -60,7 +60,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
|||
assertThat(status.numberOfFailedFetches(), equalTo(numberOfFailedFetches));
|
||||
// the failures were able to be retried so fetch failures should have cleared
|
||||
assertThat(status.fetchExceptions().entrySet(), hasSize(0));
|
||||
assertThat(status.indexMetadataVersion(), equalTo(testRun.finalIndexMetaDataVerion));
|
||||
assertThat(status.mappingVersion(), equalTo(testRun.finalMappingVersion));
|
||||
});
|
||||
|
||||
task.markAsCompleted();
|
||||
|
@ -88,12 +88,12 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
|||
return new ShardFollowNodeTask(
|
||||
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
|
||||
|
||||
private volatile long indexMetadataVersion = 0L;
|
||||
private volatile long mappingVersion = 0L;
|
||||
private final Map<Long, Integer> fromToSlot = new HashMap<>();
|
||||
|
||||
@Override
|
||||
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
|
||||
handler.accept(indexMetadataVersion);
|
||||
handler.accept(mappingVersion);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -134,7 +134,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
|||
fromToSlot.put(from, ++slot);
|
||||
// if too many invocations occur with the same from then AOBE occurs, this ok and then something is wrong.
|
||||
}
|
||||
indexMetadataVersion = testResponse.indexMetadataVersion;
|
||||
mappingVersion = testResponse.mappingVersion;
|
||||
if (testResponse.exception != null) {
|
||||
errorHandler.accept(testResponse.exception);
|
||||
} else {
|
||||
|
@ -187,15 +187,15 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
|||
};
|
||||
}
|
||||
|
||||
private static TestRun createTestRun(long startSeqNo, long startIndexMetadataVersion, int maxOperationCount) {
|
||||
private static TestRun createTestRun(long startSeqNo, long startMappingVersion, int maxOperationCount) {
|
||||
long prevGlobalCheckpoint = startSeqNo;
|
||||
long indexMetaDataVersion = startIndexMetadataVersion;
|
||||
long mappingVersion = startMappingVersion;
|
||||
int numResponses = randomIntBetween(16, 256);
|
||||
Map<Long, List<TestResponse>> responses = new HashMap<>(numResponses);
|
||||
for (int i = 0; i < numResponses; i++) {
|
||||
long nextGlobalCheckPoint = prevGlobalCheckpoint + maxOperationCount;
|
||||
if (sometimes()) {
|
||||
indexMetaDataVersion++;
|
||||
mappingVersion++;
|
||||
}
|
||||
|
||||
if (sometimes()) {
|
||||
|
@ -203,7 +203,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
|||
// Sometimes add a random retryable error
|
||||
if (sometimes()) {
|
||||
Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), "");
|
||||
item.add(new TestResponse(error, indexMetaDataVersion, null));
|
||||
item.add(new TestResponse(error, mappingVersion, null));
|
||||
}
|
||||
List<Translog.Operation> ops = new ArrayList<>();
|
||||
for (long seqNo = prevGlobalCheckpoint; seqNo <= nextGlobalCheckPoint; seqNo++) {
|
||||
|
@ -211,8 +211,8 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
|||
byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
|
||||
ops.add(new Translog.Index("doc", id, seqNo, 0, source));
|
||||
}
|
||||
item.add(new TestResponse(null, indexMetaDataVersion,
|
||||
new ShardChangesAction.Response(indexMetaDataVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, ops.toArray(EMPTY))));
|
||||
item.add(new TestResponse(null, mappingVersion,
|
||||
new ShardChangesAction.Response(mappingVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, ops.toArray(EMPTY))));
|
||||
responses.put(prevGlobalCheckpoint, item);
|
||||
} else {
|
||||
// Simulates a leader shard copy not having all the operations the shard follow task thinks it has by
|
||||
|
@ -224,13 +224,13 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
|||
// Sometimes add a random retryable error
|
||||
if (sometimes()) {
|
||||
Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), "");
|
||||
item.add(new TestResponse(error, indexMetaDataVersion, null));
|
||||
item.add(new TestResponse(error, mappingVersion, null));
|
||||
}
|
||||
// Sometimes add an empty shard changes response to also simulate a leader shard lagging behind
|
||||
if (sometimes()) {
|
||||
ShardChangesAction.Response response =
|
||||
new ShardChangesAction.Response(indexMetaDataVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, EMPTY);
|
||||
item.add(new TestResponse(null, indexMetaDataVersion, response));
|
||||
new ShardChangesAction.Response(mappingVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, EMPTY);
|
||||
item.add(new TestResponse(null, mappingVersion, response));
|
||||
}
|
||||
List<Translog.Operation> ops = new ArrayList<>();
|
||||
for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) {
|
||||
|
@ -241,14 +241,14 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
|||
// Report toSeqNo to simulate maxBatchSizeInBytes limit being met or last op to simulate a shard lagging behind:
|
||||
long localLeaderGCP = randomBoolean() ? ops.get(ops.size() - 1).seqNo() : toSeqNo;
|
||||
ShardChangesAction.Response response =
|
||||
new ShardChangesAction.Response(indexMetaDataVersion, localLeaderGCP, localLeaderGCP, ops.toArray(EMPTY));
|
||||
item.add(new TestResponse(null, indexMetaDataVersion, response));
|
||||
new ShardChangesAction.Response(mappingVersion, localLeaderGCP, localLeaderGCP, ops.toArray(EMPTY));
|
||||
item.add(new TestResponse(null, mappingVersion, response));
|
||||
responses.put(fromSeqNo, Collections.unmodifiableList(item));
|
||||
}
|
||||
}
|
||||
prevGlobalCheckpoint = nextGlobalCheckPoint + 1;
|
||||
}
|
||||
return new TestRun(maxOperationCount, startSeqNo, startIndexMetadataVersion, indexMetaDataVersion,
|
||||
return new TestRun(maxOperationCount, startSeqNo, startMappingVersion, mappingVersion,
|
||||
prevGlobalCheckpoint - 1, responses);
|
||||
}
|
||||
|
||||
|
@ -261,18 +261,18 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
|||
|
||||
final int maxOperationCount;
|
||||
final long startSeqNo;
|
||||
final long startIndexMetadataVersion;
|
||||
final long startMappingVersion;
|
||||
|
||||
final long finalIndexMetaDataVerion;
|
||||
final long finalMappingVersion;
|
||||
final long finalExpectedGlobalCheckpoint;
|
||||
final Map<Long, List<TestResponse>> responses;
|
||||
|
||||
private TestRun(int maxOperationCount, long startSeqNo, long startIndexMetadataVersion, long finalIndexMetaDataVerion,
|
||||
private TestRun(int maxOperationCount, long startSeqNo, long startMappingVersion, long finalMappingVersion,
|
||||
long finalExpectedGlobalCheckpoint, Map<Long, List<TestResponse>> responses) {
|
||||
this.maxOperationCount = maxOperationCount;
|
||||
this.startSeqNo = startSeqNo;
|
||||
this.startIndexMetadataVersion = startIndexMetadataVersion;
|
||||
this.finalIndexMetaDataVerion = finalIndexMetaDataVerion;
|
||||
this.startMappingVersion = startMappingVersion;
|
||||
this.finalMappingVersion = finalMappingVersion;
|
||||
this.finalExpectedGlobalCheckpoint = finalExpectedGlobalCheckpoint;
|
||||
this.responses = Collections.unmodifiableMap(responses);
|
||||
}
|
||||
|
@ -281,12 +281,12 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
|||
private static class TestResponse {
|
||||
|
||||
final Exception exception;
|
||||
final long indexMetadataVersion;
|
||||
final long mappingVersion;
|
||||
final ShardChangesAction.Response response;
|
||||
|
||||
private TestResponse(Exception exception, long indexMetadataVersion, ShardChangesAction.Response response) {
|
||||
private TestResponse(Exception exception, long mappingVersion, ShardChangesAction.Response response) {
|
||||
this.exception = exception;
|
||||
this.indexMetadataVersion = indexMetadataVersion;
|
||||
this.mappingVersion = mappingVersion;
|
||||
this.response = response;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
|
|||
assertThat(newInstance.numberOfConcurrentReads(), equalTo(expectedInstance.numberOfConcurrentReads()));
|
||||
assertThat(newInstance.numberOfConcurrentWrites(), equalTo(expectedInstance.numberOfConcurrentWrites()));
|
||||
assertThat(newInstance.numberOfQueuedWrites(), equalTo(expectedInstance.numberOfQueuedWrites()));
|
||||
assertThat(newInstance.indexMetadataVersion(), equalTo(expectedInstance.indexMetadataVersion()));
|
||||
assertThat(newInstance.mappingVersion(), equalTo(expectedInstance.mappingVersion()));
|
||||
assertThat(newInstance.totalFetchTimeMillis(), equalTo(expectedInstance.totalFetchTimeMillis()));
|
||||
assertThat(newInstance.numberOfSuccessfulFetches(), equalTo(expectedInstance.numberOfSuccessfulFetches()));
|
||||
assertThat(newInstance.numberOfFailedFetches(), equalTo(expectedInstance.numberOfFailedFetches()));
|
||||
|
|
|
@ -51,7 +51,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
private Queue<Exception> readFailures;
|
||||
private Queue<Exception> writeFailures;
|
||||
private Queue<Exception> mappingUpdateFailures;
|
||||
private Queue<Long> imdVersions;
|
||||
private Queue<Long> mappingVersions;
|
||||
private Queue<Long> leaderGlobalCheckpoints;
|
||||
private Queue<Long> followerGlobalCheckpoints;
|
||||
private Queue<Long> maxSeqNos;
|
||||
|
@ -180,7 +180,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
for (int i = 0; i < max; i++) {
|
||||
readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
|
||||
}
|
||||
imdVersions.add(1L);
|
||||
mappingVersions.add(1L);
|
||||
leaderGlobalCheckpoints.add(63L);
|
||||
maxSeqNos.add(63L);
|
||||
simulateResponse.set(true);
|
||||
|
@ -327,7 +327,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
|
||||
|
||||
ShardFollowNodeTask.Status status = task.getStatus();
|
||||
assertThat(status.indexMetadataVersion(), equalTo(0L));
|
||||
assertThat(status.mappingVersion(), equalTo(0L));
|
||||
assertThat(status.numberOfConcurrentReads(), equalTo(1));
|
||||
assertThat(status.numberOfConcurrentReads(), equalTo(1));
|
||||
assertThat(status.numberOfConcurrentWrites(), equalTo(1));
|
||||
|
@ -433,7 +433,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
|
||||
startTask(task, 63, -1);
|
||||
|
||||
imdVersions.add(1L);
|
||||
mappingVersions.add(1L);
|
||||
task.coordinateReads();
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L);
|
||||
task.handleReadResponse(0L, 63L, response);
|
||||
|
@ -442,7 +442,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
|
||||
|
||||
ShardFollowNodeTask.Status status = task.getStatus();
|
||||
assertThat(status.indexMetadataVersion(), equalTo(1L));
|
||||
assertThat(status.mappingVersion(), equalTo(1L));
|
||||
assertThat(status.numberOfConcurrentReads(), equalTo(1));
|
||||
assertThat(status.numberOfConcurrentWrites(), equalTo(1));
|
||||
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
||||
|
@ -458,7 +458,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
for (int i = 0; i < max; i++) {
|
||||
mappingUpdateFailures.add(new ConnectException());
|
||||
}
|
||||
imdVersions.add(1L);
|
||||
mappingVersions.add(1L);
|
||||
task.coordinateReads();
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L);
|
||||
task.handleReadResponse(0L, 63L, response);
|
||||
|
@ -467,7 +467,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
assertThat(bulkShardOperationRequests.size(), equalTo(1));
|
||||
assertThat(task.isStopped(), equalTo(false));
|
||||
ShardFollowNodeTask.Status status = task.getStatus();
|
||||
assertThat(status.indexMetadataVersion(), equalTo(1L));
|
||||
assertThat(status.mappingVersion(), equalTo(1L));
|
||||
assertThat(status.numberOfConcurrentReads(), equalTo(1));
|
||||
assertThat(status.numberOfConcurrentWrites(), equalTo(1));
|
||||
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
||||
|
@ -483,17 +483,17 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
for (int i = 0; i < max; i++) {
|
||||
mappingUpdateFailures.add(new ConnectException());
|
||||
}
|
||||
imdVersions.add(1L);
|
||||
mappingVersions.add(1L);
|
||||
task.coordinateReads();
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L);
|
||||
task.handleReadResponse(0L, 64L, response);
|
||||
|
||||
assertThat(mappingUpdateFailures.size(), equalTo(max - 11));
|
||||
assertThat(imdVersions.size(), equalTo(1));
|
||||
assertThat(mappingVersions.size(), equalTo(1));
|
||||
assertThat(bulkShardOperationRequests.size(), equalTo(0));
|
||||
assertThat(task.isStopped(), equalTo(true));
|
||||
ShardFollowNodeTask.Status status = task.getStatus();
|
||||
assertThat(status.indexMetadataVersion(), equalTo(0L));
|
||||
assertThat(status.mappingVersion(), equalTo(0L));
|
||||
assertThat(status.numberOfConcurrentReads(), equalTo(1));
|
||||
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
|
||||
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
||||
|
@ -512,7 +512,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
assertThat(bulkShardOperationRequests.size(), equalTo(0));
|
||||
assertThat(task.isStopped(), equalTo(true));
|
||||
ShardFollowNodeTask.Status status = task.getStatus();
|
||||
assertThat(status.indexMetadataVersion(), equalTo(0L));
|
||||
assertThat(status.mappingVersion(), equalTo(0L));
|
||||
assertThat(status.numberOfConcurrentReads(), equalTo(1));
|
||||
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
|
||||
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
||||
|
@ -723,7 +723,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
readFailures = new LinkedList<>();
|
||||
writeFailures = new LinkedList<>();
|
||||
mappingUpdateFailures = new LinkedList<>();
|
||||
imdVersions = new LinkedList<>();
|
||||
mappingVersions = new LinkedList<>();
|
||||
leaderGlobalCheckpoints = new LinkedList<>();
|
||||
followerGlobalCheckpoints = new LinkedList<>();
|
||||
maxSeqNos = new LinkedList<>();
|
||||
|
@ -738,9 +738,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
return;
|
||||
}
|
||||
|
||||
Long imdVersion = imdVersions.poll();
|
||||
if (imdVersion != null) {
|
||||
handler.accept(imdVersion);
|
||||
final Long mappingVersion = mappingVersions.poll();
|
||||
if (mappingVersion != null) {
|
||||
handler.accept(mappingVersion);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -779,7 +779,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
}
|
||||
final ShardChangesAction.Response response =
|
||||
new ShardChangesAction.Response(
|
||||
imdVersions.poll(),
|
||||
mappingVersions.poll(),
|
||||
leaderGlobalCheckpoints.poll(),
|
||||
maxSeqNos.poll(),
|
||||
operations);
|
||||
|
@ -805,7 +805,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
};
|
||||
}
|
||||
|
||||
private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, long toSeqNo, long imdVersion,
|
||||
private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, long toSeqNo, long mappingVersion,
|
||||
long leaderGlobalCheckPoint) {
|
||||
List<Translog.Operation> ops = new ArrayList<>();
|
||||
for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) {
|
||||
|
@ -814,7 +814,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|||
ops.add(new Translog.Index("doc", id, seqNo, 0, source));
|
||||
}
|
||||
return new ShardChangesAction.Response(
|
||||
imdVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0]));
|
||||
mappingVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0]));
|
||||
}
|
||||
|
||||
void startTask(ShardFollowNodeTask task, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) {
|
||||
|
|
|
@ -209,7 +209,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
|
|||
try {
|
||||
Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from,
|
||||
maxOperationCount, params.getMaxBatchSizeInBytes());
|
||||
// Hard code index metadata version, this is ok, as mapping updates are not tested here.
|
||||
// hard code mapping version; this is ok, as mapping updates are not tested here
|
||||
final ShardChangesAction.Response response =
|
||||
new ShardChangesAction.Response(1L, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), ops);
|
||||
handler.accept(response);
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
- gte: { bar.0.number_of_concurrent_reads: 0 }
|
||||
- match: { bar.0.number_of_concurrent_writes: 0 }
|
||||
- match: { bar.0.number_of_queued_writes: 0 }
|
||||
- gte: { bar.0.index_metadata_version: 0 }
|
||||
- gte: { bar.0.mapping_version: 0 }
|
||||
- gte: { bar.0.total_fetch_time_millis: 0 }
|
||||
- gte: { bar.0.number_of_successful_fetches: 0 }
|
||||
- gte: { bar.0.number_of_failed_fetches: 0 }
|
||||
|
|
Loading…
Reference in New Issue