mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-22 21:05:23 +00:00
Replicate aliases in cross-cluster replication (#42875)
This commit adds functionality so that aliases that are manipulated on leader indices are replicated by the shard follow tasks to the follower indices. Note that we ignore write indices. This is due to the fact that follower indices do not receive direct writes so the concept is not useful. Relates #41815
This commit is contained in:
parent
aad1b3a2a0
commit
117df87b2b
@ -101,6 +101,7 @@ public final class IndicesFollowStats {
|
||||
static final ParseField WRITE_BUFFER_SIZE_IN_BYTES_FIELD = new ParseField("write_buffer_size_in_bytes");
|
||||
static final ParseField FOLLOWER_MAPPING_VERSION_FIELD = new ParseField("follower_mapping_version");
|
||||
static final ParseField FOLLOWER_SETTINGS_VERSION_FIELD = new ParseField("follower_settings_version");
|
||||
static final ParseField FOLLOWER_ALIASES_VERSION_FIELD = new ParseField("follower_aliases_version");
|
||||
static final ParseField TOTAL_READ_TIME_MILLIS_FIELD = new ParseField("total_read_time_millis");
|
||||
static final ParseField TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD = new ParseField("total_read_remote_exec_time_millis");
|
||||
static final ParseField SUCCESSFUL_READ_REQUESTS_FIELD = new ParseField("successful_read_requests");
|
||||
@ -117,41 +118,42 @@ public final class IndicesFollowStats {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static final ConstructingObjectParser<ShardFollowStats, Void> PARSER =
|
||||
new ConstructingObjectParser<>(
|
||||
"shard-follow-stats",
|
||||
true,
|
||||
args -> new ShardFollowStats(
|
||||
(String) args[0],
|
||||
(String) args[1],
|
||||
(String) args[2],
|
||||
(int) args[3],
|
||||
(long) args[4],
|
||||
(long) args[5],
|
||||
(long) args[6],
|
||||
(long) args[7],
|
||||
(long) args[8],
|
||||
(int) args[9],
|
||||
(int) args[10],
|
||||
(int) args[11],
|
||||
(long) args[12],
|
||||
(long) args[13],
|
||||
(long) args[14],
|
||||
(long) args[15],
|
||||
(long) args[16],
|
||||
(long) args[17],
|
||||
(long) args[18],
|
||||
(long) args[19],
|
||||
(long) args[20],
|
||||
(long) args[21],
|
||||
(long) args[22],
|
||||
(long) args[23],
|
||||
(long) args[24],
|
||||
(long) args[25],
|
||||
new TreeMap<>(
|
||||
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[26])
|
||||
.stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
|
||||
(ElasticsearchException) args[27]));
|
||||
new ConstructingObjectParser<>(
|
||||
"shard-follow-stats",
|
||||
true,
|
||||
args -> new ShardFollowStats(
|
||||
(String) args[0],
|
||||
(String) args[1],
|
||||
(String) args[2],
|
||||
(int) args[3],
|
||||
(long) args[4],
|
||||
(long) args[5],
|
||||
(long) args[6],
|
||||
(long) args[7],
|
||||
(long) args[8],
|
||||
(int) args[9],
|
||||
(int) args[10],
|
||||
(int) args[11],
|
||||
(long) args[12],
|
||||
(long) args[13],
|
||||
(long) args[14],
|
||||
(long) args[15],
|
||||
(long) args[16],
|
||||
(long) args[17],
|
||||
(long) args[18],
|
||||
(long) args[19],
|
||||
(long) args[20],
|
||||
(long) args[21],
|
||||
(long) args[22],
|
||||
(long) args[23],
|
||||
(long) args[24],
|
||||
(long) args[25],
|
||||
(long) args[26],
|
||||
new TreeMap<>(
|
||||
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[27])
|
||||
.stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
|
||||
(ElasticsearchException) args[28]));
|
||||
|
||||
static final ConstructingObjectParser<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>, Void> READ_EXCEPTIONS_ENTRY_PARSER =
|
||||
new ConstructingObjectParser<>(
|
||||
@ -175,6 +177,7 @@ public final class IndicesFollowStats {
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), WRITE_BUFFER_SIZE_IN_BYTES_FIELD);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAPPING_VERSION_FIELD);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_SETTINGS_VERSION_FIELD);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_ALIASES_VERSION_FIELD);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_TIME_MILLIS_FIELD);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), SUCCESSFUL_READ_REQUESTS_FIELD);
|
||||
@ -220,6 +223,7 @@ public final class IndicesFollowStats {
|
||||
private final long writeBufferSizeInBytes;
|
||||
private final long followerMappingVersion;
|
||||
private final long followerSettingsVersion;
|
||||
private final long followerAliasesVersion;
|
||||
private final long totalReadTimeMillis;
|
||||
private final long totalReadRemoteExecTimeMillis;
|
||||
private final long successfulReadRequests;
|
||||
@ -249,6 +253,7 @@ public final class IndicesFollowStats {
|
||||
long writeBufferSizeInBytes,
|
||||
long followerMappingVersion,
|
||||
long followerSettingsVersion,
|
||||
long followerAliasesVersion,
|
||||
long totalReadTimeMillis,
|
||||
long totalReadRemoteExecTimeMillis,
|
||||
long successfulReadRequests,
|
||||
@ -277,6 +282,7 @@ public final class IndicesFollowStats {
|
||||
this.writeBufferSizeInBytes = writeBufferSizeInBytes;
|
||||
this.followerMappingVersion = followerMappingVersion;
|
||||
this.followerSettingsVersion = followerSettingsVersion;
|
||||
this.followerAliasesVersion = followerAliasesVersion;
|
||||
this.totalReadTimeMillis = totalReadTimeMillis;
|
||||
this.totalReadRemoteExecTimeMillis = totalReadRemoteExecTimeMillis;
|
||||
this.successfulReadRequests = successfulReadRequests;
|
||||
@ -352,6 +358,10 @@ public final class IndicesFollowStats {
|
||||
return followerSettingsVersion;
|
||||
}
|
||||
|
||||
public long getFollowerAliasesVersion() {
|
||||
return followerAliasesVersion;
|
||||
}
|
||||
|
||||
public long getTotalReadTimeMillis() {
|
||||
return totalReadTimeMillis;
|
||||
}
|
||||
|
@ -106,6 +106,7 @@ public class CcrStatsResponseTests extends AbstractResponseTestCase<CcrStatsActi
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
Collections.emptyNavigableMap(),
|
||||
randomLong(),
|
||||
randomBoolean() ? new ElasticsearchException("fatal error") : null);
|
||||
@ -190,6 +191,8 @@ public class CcrStatsResponseTests extends AbstractResponseTestCase<CcrStatsActi
|
||||
equalTo(expectedShardFollowStats.followerMappingVersion()));
|
||||
assertThat(actualShardFollowStats.getFollowerSettingsVersion(),
|
||||
equalTo(expectedShardFollowStats.followerSettingsVersion()));
|
||||
assertThat(actualShardFollowStats.getFollowerAliasesVersion(),
|
||||
equalTo(expectedShardFollowStats.followerAliasesVersion()));
|
||||
assertThat(actualShardFollowStats.getTotalReadTimeMillis(),
|
||||
equalTo(expectedShardFollowStats.totalReadTimeMillis()));
|
||||
assertThat(actualShardFollowStats.getSuccessfulReadRequests(),
|
||||
|
@ -93,6 +93,8 @@ public class FollowStatsResponseTests extends AbstractResponseTestCase<FollowSta
|
||||
equalTo(expectedShardFollowStats.followerMappingVersion()));
|
||||
assertThat(actualShardFollowStats.getFollowerSettingsVersion(),
|
||||
equalTo(expectedShardFollowStats.followerSettingsVersion()));
|
||||
assertThat(actualShardFollowStats.getFollowerAliasesVersion(),
|
||||
equalTo(expectedShardFollowStats.followerAliasesVersion()));
|
||||
assertThat(actualShardFollowStats.getTotalReadTimeMillis(),
|
||||
equalTo(expectedShardFollowStats.totalReadTimeMillis()));
|
||||
assertThat(actualShardFollowStats.getSuccessfulReadRequests(),
|
||||
|
@ -114,6 +114,9 @@ The `shards` array consists of objects containing the following fields:
|
||||
`indices[].shards[].follower_settings_version`::
|
||||
(long) the index settings version the follower is synced up to
|
||||
|
||||
`indices[].shards[].follower_aliases_version`::
|
||||
(long) the index aliases version the follower is synced up to
|
||||
|
||||
`indices[].shards[].total_read_time_millis`::
|
||||
(long) the total time reads were outstanding, measured from the time a read
|
||||
was sent to the leader to the time a reply was returned to the follower
|
||||
@ -217,6 +220,7 @@ The API returns the following results:
|
||||
"write_buffer_operation_count" : 64,
|
||||
"follower_mapping_version" : 4,
|
||||
"follower_settings_version" : 2,
|
||||
"follower_aliases_version" : 8,
|
||||
"total_read_time_millis" : 32768,
|
||||
"total_read_remote_exec_time_millis" : 16384,
|
||||
"successful_read_requests" : 32,
|
||||
@ -246,6 +250,7 @@ The API returns the following results:
|
||||
// TESTRESPONSE[s/"write_buffer_operation_count" : 64/"write_buffer_operation_count" : $body.indices.0.shards.0.write_buffer_operation_count/]
|
||||
// TESTRESPONSE[s/"follower_mapping_version" : 4/"follower_mapping_version" : $body.indices.0.shards.0.follower_mapping_version/]
|
||||
// TESTRESPONSE[s/"follower_settings_version" : 2/"follower_settings_version" : $body.indices.0.shards.0.follower_settings_version/]
|
||||
// TESTRESPONSE[s/"follower_aliases_version" : 8/"follower_aliases_version" : $body.indices.0.shards.0.follower_aliases_version/]
|
||||
// TESTRESPONSE[s/"total_read_time_millis" : 32768/"total_read_time_millis" : $body.indices.0.shards.0.total_read_time_millis/]
|
||||
// TESTRESPONSE[s/"total_read_remote_exec_time_millis" : 16384/"total_read_remote_exec_time_millis" : $body.indices.0.shards.0.total_read_remote_exec_time_millis/]
|
||||
// TESTRESPONSE[s/"successful_read_requests" : 32/"successful_read_requests" : $body.indices.0.shards.0.successful_read_requests/]
|
||||
|
@ -126,6 +126,7 @@ The API returns the following results:
|
||||
"write_buffer_operation_count" : 64,
|
||||
"follower_mapping_version" : 4,
|
||||
"follower_settings_version" : 2,
|
||||
"follower_aliases_version" : 8,
|
||||
"total_read_time_millis" : 32768,
|
||||
"total_read_remote_exec_time_millis" : 16384,
|
||||
"successful_read_requests" : 32,
|
||||
@ -161,6 +162,7 @@ The API returns the following results:
|
||||
// TESTRESPONSE[s/"write_buffer_operation_count" : 64/"write_buffer_operation_count" : $body.follow_stats.indices.0.shards.0.write_buffer_operation_count/]
|
||||
// TESTRESPONSE[s/"follower_mapping_version" : 4/"follower_mapping_version" : $body.follow_stats.indices.0.shards.0.follower_mapping_version/]
|
||||
// TESTRESPONSE[s/"follower_settings_version" : 2/"follower_settings_version" : $body.follow_stats.indices.0.shards.0.follower_settings_version/]
|
||||
// TESTRESPONSE[s/"follower_aliases_version" : 8/"follower_aliases_version" : $body.follow_stats.indices.0.shards.0.follower_aliases_version/]
|
||||
// TESTRESPONSE[s/"total_read_time_millis" : 32768/"total_read_time_millis" : $body.follow_stats.indices.0.shards.0.total_read_time_millis/]
|
||||
// TESTRESPONSE[s/"total_read_remote_exec_time_millis" : 16384/"total_read_remote_exec_time_millis" : $body.follow_stats.indices.0.shards.0.total_read_remote_exec_time_millis/]
|
||||
// TESTRESPONSE[s/"successful_read_requests" : 32/"successful_read_requests" : $body.follow_stats.indices.0.shards.0.successful_read_requests/]
|
||||
|
@ -141,15 +141,13 @@ public class AliasMetaData extends AbstractDiffable<AliasMetaData> implements To
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
AliasMetaData that = (AliasMetaData) o;
|
||||
final AliasMetaData that = (AliasMetaData) o;
|
||||
|
||||
if (alias != null ? !alias.equals(that.alias) : that.alias != null) return false;
|
||||
if (filter != null ? !filter.equals(that.filter) : that.filter != null) return false;
|
||||
if (indexRouting != null ? !indexRouting.equals(that.indexRouting) : that.indexRouting != null) return false;
|
||||
if (searchRouting != null ? !searchRouting.equals(that.searchRouting) : that.searchRouting != null)
|
||||
return false;
|
||||
if (writeIndex != null ? writeIndex != that.writeIndex : that.writeIndex != null)
|
||||
return false;
|
||||
if (searchRouting != null ? !searchRouting.equals(that.searchRouting) : that.searchRouting != null) return false;
|
||||
if (writeIndex != null ? writeIndex != that.writeIndex : that.writeIndex != null) return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -143,6 +143,7 @@ public class ESCCRRestTestCase extends ESRestTestCase {
|
||||
int followerMaxSeqNo = 0;
|
||||
int followerMappingVersion = 0;
|
||||
int followerSettingsVersion = 0;
|
||||
int followerAliasesVersion = 0;
|
||||
|
||||
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
|
||||
assertThat(hits.size(), greaterThanOrEqualTo(1));
|
||||
@ -164,11 +165,15 @@ public class ESCCRRestTestCase extends ESRestTestCase {
|
||||
int foundFollowerSettingsVersion =
|
||||
(int) XContentMapValues.extractValue("_source.ccr_stats.follower_settings_version", hit);
|
||||
followerSettingsVersion = Math.max(followerSettingsVersion, foundFollowerSettingsVersion);
|
||||
int foundFollowerAliasesVersion =
|
||||
(int) XContentMapValues.extractValue("_source.ccr_stats.follower_aliases_version", hit);
|
||||
followerAliasesVersion = Math.max(followerAliasesVersion, foundFollowerAliasesVersion);
|
||||
}
|
||||
|
||||
assertThat(followerMaxSeqNo, greaterThan(0));
|
||||
assertThat(followerMappingVersion, greaterThan(0));
|
||||
assertThat(followerSettingsVersion, greaterThan(0));
|
||||
assertThat(followerAliasesVersion, greaterThan(0));
|
||||
}
|
||||
|
||||
protected static void verifyAutoFollowMonitoring() throws IOException {
|
||||
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
@ -219,6 +220,12 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
||||
return settingsVersion;
|
||||
}
|
||||
|
||||
private long aliasesVersion;
|
||||
|
||||
public long getAliasesVersion() {
|
||||
return aliasesVersion;
|
||||
}
|
||||
|
||||
private long globalCheckpoint;
|
||||
|
||||
public long getGlobalCheckpoint() {
|
||||
@ -256,6 +263,11 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
||||
super(in);
|
||||
mappingVersion = in.readVLong();
|
||||
settingsVersion = in.readVLong();
|
||||
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
|
||||
aliasesVersion = in.readVLong();
|
||||
} else {
|
||||
aliasesVersion = 0;
|
||||
}
|
||||
globalCheckpoint = in.readZLong();
|
||||
maxSeqNo = in.readZLong();
|
||||
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
|
||||
@ -264,16 +276,17 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
||||
}
|
||||
|
||||
Response(
|
||||
final long mappingVersion,
|
||||
final long settingsVersion,
|
||||
final long globalCheckpoint,
|
||||
final long maxSeqNo,
|
||||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final Translog.Operation[] operations,
|
||||
final long tookInMillis) {
|
||||
|
||||
final long mappingVersion,
|
||||
final long settingsVersion,
|
||||
final long aliasesVersion,
|
||||
final long globalCheckpoint,
|
||||
final long maxSeqNo,
|
||||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final Translog.Operation[] operations,
|
||||
final long tookInMillis) {
|
||||
this.mappingVersion = mappingVersion;
|
||||
this.settingsVersion = settingsVersion;
|
||||
this.aliasesVersion = aliasesVersion;
|
||||
this.globalCheckpoint = globalCheckpoint;
|
||||
this.maxSeqNo = maxSeqNo;
|
||||
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
|
||||
@ -291,6 +304,9 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
||||
super.writeTo(out);
|
||||
out.writeVLong(mappingVersion);
|
||||
out.writeVLong(settingsVersion);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
|
||||
out.writeVLong(aliasesVersion);
|
||||
}
|
||||
out.writeZLong(globalCheckpoint);
|
||||
out.writeZLong(maxSeqNo);
|
||||
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
|
||||
@ -305,6 +321,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
||||
final Response that = (Response) o;
|
||||
return mappingVersion == that.mappingVersion &&
|
||||
settingsVersion == that.settingsVersion &&
|
||||
aliasesVersion == that.aliasesVersion &&
|
||||
globalCheckpoint == that.globalCheckpoint &&
|
||||
maxSeqNo == that.maxSeqNo &&
|
||||
maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
|
||||
@ -317,6 +334,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
||||
return Objects.hash(
|
||||
mappingVersion,
|
||||
settingsVersion,
|
||||
aliasesVersion,
|
||||
globalCheckpoint,
|
||||
maxSeqNo,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
@ -361,9 +379,11 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
||||
final IndexMetaData indexMetaData = indexService.getMetaData();
|
||||
final long mappingVersion = indexMetaData.getMappingVersion();
|
||||
final long settingsVersion = indexMetaData.getSettingsVersion();
|
||||
final long aliasesVersion = indexMetaData.getAliasesVersion();
|
||||
return getResponse(
|
||||
mappingVersion,
|
||||
settingsVersion,
|
||||
aliasesVersion,
|
||||
seqNoStats,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
operations,
|
||||
@ -436,12 +456,14 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
||||
|
||||
final long mappingVersion = indexMetaData.getMappingVersion();
|
||||
final long settingsVersion = indexMetaData.getSettingsVersion();
|
||||
final long aliasesVersion = indexMetaData.getAliasesVersion();
|
||||
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
|
||||
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
|
||||
listener.onResponse(
|
||||
getResponse(
|
||||
mappingVersion,
|
||||
settingsVersion,
|
||||
aliasesVersion,
|
||||
latestSeqNoStats,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
EMPTY_OPERATIONS_ARRAY,
|
||||
@ -541,6 +563,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
||||
static Response getResponse(
|
||||
final long mappingVersion,
|
||||
final long settingsVersion,
|
||||
final long aliasesVersion,
|
||||
final SeqNoStats seqNoStats,
|
||||
final long maxSeqNoOfUpdates,
|
||||
final Translog.Operation[] operations,
|
||||
@ -550,6 +573,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
||||
return new Response(
|
||||
mappingVersion,
|
||||
settingsVersion,
|
||||
aliasesVersion,
|
||||
seqNoStats.getGlobalCheckpoint(),
|
||||
seqNoStats.getMaxSeqNo(),
|
||||
maxSeqNoOfUpdates,
|
||||
|
@ -78,6 +78,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
||||
private int numOutstandingWrites = 0;
|
||||
private long currentMappingVersion = 0;
|
||||
private long currentSettingsVersion = 0;
|
||||
private long currentAliasesVersion = 0;
|
||||
private long totalReadRemoteExecTimeMillis = 0;
|
||||
private long totalReadTimeMillis = 0;
|
||||
private long successfulReadRequests = 0;
|
||||
@ -154,15 +155,27 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
||||
synchronized (ShardFollowNodeTask.this) {
|
||||
currentSettingsVersion = leaderSettingsVersion;
|
||||
}
|
||||
});
|
||||
updateAliases(leaderAliasesVersion -> {
|
||||
synchronized (ShardFollowNodeTask.this) {
|
||||
currentAliasesVersion = leaderAliasesVersion;
|
||||
}
|
||||
});
|
||||
synchronized (ShardFollowNodeTask.this) {
|
||||
LOGGER.info(
|
||||
"{} following leader shard {}, follower global checkpoint=[{}], mapping version=[{}], settings version=[{}]",
|
||||
"{} following leader shard {}, " +
|
||||
"follower global checkpoint=[{}], " +
|
||||
"mapping version=[{}], " +
|
||||
"settings version=[{}], " +
|
||||
"aliases version=[{}]",
|
||||
params.getFollowShardId(),
|
||||
params.getLeaderShardId(),
|
||||
followerGlobalCheckpoint,
|
||||
leaderMappingVersion,
|
||||
leaderSettingsVersion);
|
||||
coordinateReads();
|
||||
});
|
||||
currentMappingVersion,
|
||||
currentSettingsVersion,
|
||||
currentAliasesVersion);
|
||||
}
|
||||
coordinateReads();
|
||||
});
|
||||
}
|
||||
|
||||
@ -306,12 +319,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
||||
// In order to process this read response (3), we need to check and potentially update the follow index's setting (1) and
|
||||
// check and potentially update the follow index's mappings (2).
|
||||
|
||||
// 3) handle read response:
|
||||
// 4) handle read response:
|
||||
Runnable handleResponseTask = () -> innerHandleReadResponse(from, maxRequiredSeqNo, response);
|
||||
// 2) update follow index mapping:
|
||||
// 3) update follow index mapping:
|
||||
Runnable updateMappingsTask = () -> maybeUpdateMapping(response.getMappingVersion(), handleResponseTask);
|
||||
// 1) update follow index settings:
|
||||
maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask);
|
||||
// 2) update follow index settings:
|
||||
Runnable updateSettingsTask = () -> maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask);
|
||||
// 1) update follow index aliases:
|
||||
maybeUpdateAliases(response.getAliasesVersion(), updateSettingsTask);
|
||||
}
|
||||
|
||||
void handleFallenBehindLeaderShard(Exception e, long from, int maxOperationCount, long maxRequiredSeqNo, AtomicInteger retryCounter) {
|
||||
@ -423,7 +438,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
||||
|
||||
private synchronized void maybeUpdateSettings(final Long minimumRequiredSettingsVersion, Runnable task) {
|
||||
if (currentSettingsVersion >= minimumRequiredSettingsVersion) {
|
||||
LOGGER.trace("{} settings version [{}] is higher or equal than minimum required mapping version [{}]",
|
||||
LOGGER.trace("{} settings version [{}] is higher or equal than minimum required settings version [{}]",
|
||||
params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion);
|
||||
task.run();
|
||||
} else {
|
||||
@ -436,6 +451,27 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void maybeUpdateAliases(final Long minimumRequiredAliasesVersion, final Runnable task) {
|
||||
if (currentAliasesVersion >= minimumRequiredAliasesVersion) {
|
||||
LOGGER.trace(
|
||||
"{} aliases version [{}] is higher or equal than minimum required aliases version [{}]",
|
||||
params.getFollowShardId(),
|
||||
currentAliasesVersion,
|
||||
minimumRequiredAliasesVersion);
|
||||
task.run();
|
||||
} else {
|
||||
LOGGER.trace(
|
||||
"{} updating aliases, aliases version [{}] is lower than minimum required aliases version [{}]",
|
||||
params.getFollowShardId(),
|
||||
currentAliasesVersion,
|
||||
minimumRequiredAliasesVersion);
|
||||
updateAliases(aliasesVersion -> {
|
||||
currentAliasesVersion = aliasesVersion;
|
||||
task.run();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void updateMapping(long minRequiredMappingVersion, LongConsumer handler) {
|
||||
updateMapping(minRequiredMappingVersion, handler, new AtomicInteger(0));
|
||||
}
|
||||
@ -453,6 +489,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
||||
innerUpdateSettings(handler, e -> handleFailure(e, retryCounter, () -> updateSettings(handler, retryCounter)));
|
||||
}
|
||||
|
||||
private void updateAliases(final LongConsumer handler) {
|
||||
updateAliases(handler, new AtomicInteger());
|
||||
}
|
||||
|
||||
private void updateAliases(final LongConsumer handler, final AtomicInteger retryCounter) {
|
||||
innerUpdateAliases(handler, e -> handleFailure(e, retryCounter, () -> updateAliases(handler, retryCounter)));
|
||||
}
|
||||
|
||||
private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
|
||||
assert e != null;
|
||||
if (shouldRetry(params.getRemoteCluster(), e)) {
|
||||
@ -511,6 +555,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
||||
|
||||
protected abstract void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler);
|
||||
|
||||
protected abstract void innerUpdateAliases(LongConsumer handler, Consumer<Exception> errorHandler);
|
||||
|
||||
protected abstract void innerSendBulkShardOperationsRequest(String followerHistoryUUID,
|
||||
List<Translog.Operation> operations,
|
||||
long leaderMaxSeqNoOfUpdatesOrDeletes,
|
||||
@ -566,6 +612,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
||||
bufferSizeInBytes,
|
||||
currentMappingVersion,
|
||||
currentSettingsVersion,
|
||||
currentAliasesVersion,
|
||||
totalReadTimeMillis,
|
||||
totalReadRemoteExecTimeMillis,
|
||||
successfulReadRequests,
|
||||
|
@ -5,6 +5,7 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
@ -14,6 +15,7 @@ import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
|
||||
@ -26,6 +28,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
@ -62,7 +65,9 @@ import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@ -200,6 +205,124 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void innerUpdateAliases(final LongConsumer handler, final Consumer<Exception> errorHandler) {
|
||||
/*
|
||||
* The strategy for updating the aliases is fairly simple. We look at the aliases that exist on the leader, and those that
|
||||
* exist on the follower. We partition these aliases into three sets: the aliases that exist on both the leader and the
|
||||
* follower, the aliases that are on the leader only, and the aliases that are on the follower only.
|
||||
*
|
||||
* For the aliases that are on the leader and the follower, we compare the aliases and add an action to overwrite the
|
||||
* follower view of the alias if the aliases are different. If the aliases are the same, we skip the alias. Note that the
|
||||
* meaning of equals here intentionally ignores the write index. There are two reasons for this. First, follower indices
|
||||
* do not receive direct writes so conceptually the write index is not useful. Additionally, there is a larger challenge.
|
||||
* Suppose that we did copy over the write index from the leader to the follower. On the leader, when the write index is
|
||||
* swapped from one index to another, this is done atomically. However, to do this on the follower, we would have to step
|
||||
* outside the shard follow tasks framework and have a separate framework for copying aliases over. This is because if we
|
||||
* try to manage the aliases including the write aliases with the shard follow tasks, we do not have a way to move the write
|
||||
* index atomically (since we have a single-index view here only) and therefore we can end up in situations where we would
|
||||
* try to assign the write index to two indices. Further, trying to do this outside the shard follow tasks framework has
|
||||
* problems too, since it could be that the new aliases arrive on the coordinator before the write index has even been
|
||||
* created on the local cluster. So there are race conditions either way. All of this put together means that we will simply
|
||||
* ignore the write index.
|
||||
*
|
||||
* For aliases that are on the leader but not the follower, we copy those aliases over to the follower.
|
||||
*
|
||||
* For aliases that are on the follower but not the leader, we remove those aliases from the follower.
|
||||
*/
|
||||
final Index leaderIndex = params.getLeaderShardId().getIndex();
|
||||
final Index followerIndex = params.getFollowShardId().getIndex();
|
||||
|
||||
final ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
|
||||
|
||||
final CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
|
||||
final IndexMetaData leaderIndexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
|
||||
final IndexMetaData followerIndexMetaData = clusterService.state().metaData().getIndexSafe(followerIndex);
|
||||
|
||||
// partition the aliases into the three sets
|
||||
final HashSet<String> aliasesOnLeaderNotOnFollower = new HashSet<>();
|
||||
final HashSet<String> aliasesInCommon = new HashSet<>();
|
||||
final HashSet<String> aliasesOnFollowerNotOnLeader = new HashSet<>();
|
||||
|
||||
for (final ObjectCursor<String> aliasName : leaderIndexMetaData.getAliases().keys()) {
|
||||
if (followerIndexMetaData.getAliases().containsKey(aliasName.value)) {
|
||||
aliasesInCommon.add(aliasName.value);
|
||||
} else {
|
||||
aliasesOnLeaderNotOnFollower.add(aliasName.value);
|
||||
}
|
||||
}
|
||||
|
||||
for (final ObjectCursor<String> aliasName : followerIndexMetaData.getAliases().keys()) {
|
||||
if (leaderIndexMetaData.getAliases().containsKey(aliasName.value)) {
|
||||
assert aliasesInCommon.contains(aliasName.value) : aliasName.value;
|
||||
} else {
|
||||
aliasesOnFollowerNotOnLeader.add(aliasName.value);
|
||||
}
|
||||
}
|
||||
|
||||
final List<IndicesAliasesRequest.AliasActions> aliasActions = new ArrayList<>();
|
||||
|
||||
// add the aliases the follower does not have
|
||||
for (final String aliasName : aliasesOnLeaderNotOnFollower) {
|
||||
final AliasMetaData alias = leaderIndexMetaData.getAliases().get(aliasName);
|
||||
// we intentionally override that the alias is not a write alias as follower indices do not receive direct writes
|
||||
aliasActions.add(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(followerIndex.getName())
|
||||
.alias(alias.alias())
|
||||
.filter(alias.filter() == null ? null : alias.filter().toString())
|
||||
.indexRouting(alias.indexRouting())
|
||||
.searchRouting(alias.searchRouting())
|
||||
.writeIndex(false));
|
||||
}
|
||||
|
||||
// update the aliases that are different (ignoring write aliases)
|
||||
for (final String aliasName : aliasesInCommon) {
|
||||
final AliasMetaData leaderAliasMetaData = leaderIndexMetaData.getAliases().get(aliasName);
|
||||
// we intentionally override that the alias is not a write alias as follower indices do not receive direct writes
|
||||
final AliasMetaData leaderAliasMetaDataWithoutWriteIndex = new AliasMetaData.Builder(aliasName)
|
||||
.filter(leaderAliasMetaData.filter())
|
||||
.indexRouting(leaderAliasMetaData.indexRouting())
|
||||
.searchRouting(leaderAliasMetaData.searchRouting())
|
||||
.writeIndex(false)
|
||||
.build();
|
||||
final AliasMetaData followerAliasMetaData = followerIndexMetaData.getAliases().get(aliasName);
|
||||
if (leaderAliasMetaDataWithoutWriteIndex.equals(followerAliasMetaData)) {
|
||||
// skip this alias, the leader and follower have the same modulo the write index
|
||||
continue;
|
||||
}
|
||||
// we intentionally override that the alias is not a write alias as follower indices do not receive direct writes
|
||||
aliasActions.add(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(followerIndex.getName())
|
||||
.alias(leaderAliasMetaData.alias())
|
||||
.filter(leaderAliasMetaData.filter() == null ? null : leaderAliasMetaData.filter().toString())
|
||||
.indexRouting(leaderAliasMetaData.indexRouting())
|
||||
.searchRouting(leaderAliasMetaData.searchRouting())
|
||||
.writeIndex(false));
|
||||
}
|
||||
|
||||
// remove aliases that the leader no longer has
|
||||
for (final String aliasName : aliasesOnFollowerNotOnLeader) {
|
||||
aliasActions.add(IndicesAliasesRequest.AliasActions.remove().index(followerIndex.getName()).alias(aliasName));
|
||||
}
|
||||
|
||||
final IndicesAliasesRequest request = new IndicesAliasesRequest();
|
||||
if (aliasActions.isEmpty()) {
|
||||
handler.accept(leaderIndexMetaData.getAliasesVersion());
|
||||
} else {
|
||||
aliasActions.forEach(request::addAliasAction);
|
||||
followerClient.admin().indices().aliases(
|
||||
request,
|
||||
ActionListener.wrap(r -> handler.accept(leaderIndexMetaData.getAliasesVersion()), errorHandler));
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
|
||||
} catch (final NoSuchRemoteClusterException e) {
|
||||
errorHandler.accept(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void closeIndexUpdateSettingsAndOpenIndex(String followIndex,
|
||||
Settings updatedSettings,
|
||||
Runnable handler,
|
||||
|
@ -0,0 +1,413 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.ccr;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistResponse;
|
||||
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.common.CheckedBiConsumer;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
import org.elasticsearch.common.CheckedRunnable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
import org.elasticsearch.xpack.CcrIntegTestCase;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
public class CcrAliasesIT extends CcrIntegTestCase {
|
||||
|
||||
public void testAliasOnIndexCreation() throws Exception {
|
||||
final String aliasName = randomAlphaOfLength(16);
|
||||
final String aliases;
|
||||
try (XContentBuilder builder = jsonBuilder()) {
|
||||
builder.startObject();
|
||||
{
|
||||
builder.startObject("aliases");
|
||||
{
|
||||
builder.startObject(aliasName);
|
||||
{
|
||||
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
aliases = BytesReference.bytes(builder).utf8ToString();
|
||||
}
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(aliases, XContentType.JSON));
|
||||
final PutFollowAction.Request followRequest = putFollow("leader", "follower");
|
||||
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
ensureFollowerGreen(true, "follower");
|
||||
|
||||
// wait for the shard follow task to exist
|
||||
assertBusy(() -> assertShardFollowTask(1));
|
||||
|
||||
assertAliasesExist("leader", "follower", aliasName);
|
||||
}
|
||||
|
||||
public void testAddAlias() throws Exception {
|
||||
runAddAliasTest(null);
|
||||
}
|
||||
|
||||
public void testAddExplicitNotWriteAlias() throws Exception {
|
||||
runAddAliasTest(false);
|
||||
}
|
||||
|
||||
public void testWriteAliasIsIgnored() throws Exception {
|
||||
runAddAliasTest(true);
|
||||
}
|
||||
|
||||
private void runAddAliasTest(final Boolean isWriteAlias) throws Exception {
|
||||
runAddAliasTest(isWriteAlias, aliasName -> {});
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs an add alias test which adds a random alias to the leader exist, and then asserts that the alias is replicated to the follower.
|
||||
* The specified post assertions gives the caller the opportunity to add additional assertions on the alias that is added. These
|
||||
* assertions are executed after all other assertions that the alias exists.
|
||||
*
|
||||
* @param isWriteIndex whether or not the leader index is the write index for the alias
|
||||
* @param postAssertions the post assertions to execute
|
||||
* @param <E> the type of checked exception the post assertions callback can throw
|
||||
* @throws Exception if a checked exception is thrown while executing the add alias test
|
||||
*/
|
||||
private <E extends Exception> void runAddAliasTest(
|
||||
final Boolean isWriteIndex,
|
||||
final CheckedConsumer<String, E> postAssertions) throws Exception {
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate("leader"));
|
||||
final PutFollowAction.Request followRequest = putFollow("leader", "follower");
|
||||
// we set a low poll timeout so that shard changes requests are responded to quickly even without indexing
|
||||
followRequest.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(100));
|
||||
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
ensureFollowerGreen(true, "follower");
|
||||
|
||||
assertBusy(() -> assertShardFollowTask(1));
|
||||
|
||||
final String aliasName = randomAlphaOfLength(16);
|
||||
addRandomAlias("leader", aliasName, isWriteIndex);
|
||||
|
||||
assertAliasesExist("leader", "follower", aliasName);
|
||||
|
||||
postAssertions.accept(aliasName);
|
||||
}
|
||||
|
||||
private void addRandomAlias(final String index, final String aliasName, final Boolean isWriteIndex) {
|
||||
final IndicesAliasesRequest.AliasActions add = IndicesAliasesRequest.AliasActions.add();
|
||||
add.index(index);
|
||||
add.alias(aliasName);
|
||||
add.writeIndex(isWriteIndex);
|
||||
if (randomBoolean()) {
|
||||
add.routing(randomAlphaOfLength(16));
|
||||
} else {
|
||||
if (randomBoolean()) {
|
||||
add.indexRouting(randomAlphaOfLength(16));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
add.searchRouting(randomAlphaOfLength(16));
|
||||
}
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
add.filter(termQuery(randomAlphaOfLength(16), randomAlphaOfLength(16)));
|
||||
}
|
||||
|
||||
assertAcked(leaderClient().admin().indices().prepareAliases().addAliasAction(add));
|
||||
}
|
||||
|
||||
public void testAddMultipleAliasesAtOnce() throws Exception {
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate("leader"));
|
||||
final PutFollowAction.Request followRequest = putFollow("leader", "follower");
|
||||
// we set a low poll timeout so that shard changes requests are responded to quickly even without indexing
|
||||
followRequest.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(100));
|
||||
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
ensureFollowerGreen(true, "follower");
|
||||
|
||||
assertBusy(() -> assertShardFollowTask(1));
|
||||
|
||||
final int numberOfAliases = randomIntBetween(2, 8);
|
||||
final IndicesAliasesRequestBuilder builder = leaderClient().admin().indices().prepareAliases();
|
||||
for (int i = 0; i < numberOfAliases; i++) {
|
||||
builder.addAlias("leader", "alias_" + i);
|
||||
}
|
||||
assertAcked(builder);
|
||||
|
||||
final String[] aliases = new String[numberOfAliases];
|
||||
for (int i = 0; i < numberOfAliases; i++) {
|
||||
aliases[i] = "alias_" + i;
|
||||
}
|
||||
assertAliasesExist("leader", "follower", aliases);
|
||||
}
|
||||
|
||||
public void testAddMultipleAliasesSequentially() throws Exception {
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate("leader"));
|
||||
final PutFollowAction.Request followRequest = putFollow("leader", "follower");
|
||||
// we set a low poll timeout so that shard changes requests are responded to quickly even without indexing
|
||||
followRequest.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(100));
|
||||
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
ensureFollowerGreen(true, "follower");
|
||||
|
||||
assertBusy(() -> assertShardFollowTask(1));
|
||||
|
||||
final int numberOfAliases = randomIntBetween(2, 8);
|
||||
for (int i = 0; i < numberOfAliases; i++) {
|
||||
assertAcked(leaderClient().admin().indices().prepareAliases().addAlias("leader", "alias_" + i));
|
||||
|
||||
final String[] aliases = new String[i + 1];
|
||||
for (int j = 0; j < i + 1; j++) {
|
||||
aliases[j] = "alias_" + j;
|
||||
}
|
||||
assertAliasesExist("leader", "follower", aliases);
|
||||
}
|
||||
}
|
||||
|
||||
public void testUpdateExistingAlias() throws Exception {
|
||||
runAddAliasTest(
|
||||
null,
|
||||
/*
|
||||
* After the alias is added (via runAddAliasTest) we modify the alias in place, and then assert that the modification is
|
||||
* eventually replicated.
|
||||
*/
|
||||
aliasName -> {
|
||||
assertAcked(leaderClient().admin()
|
||||
.indices()
|
||||
.prepareAliases()
|
||||
.addAlias("leader", aliasName, termQuery(randomAlphaOfLength(16), randomAlphaOfLength(16))));
|
||||
assertAliasesExist("leader", "follower", aliasName);
|
||||
});
|
||||
}
|
||||
|
||||
public void testRemoveExistingAlias() throws Exception {
|
||||
runAddAliasTest(
|
||||
false,
|
||||
aliasName -> {
|
||||
removeAlias(aliasName);
|
||||
assertAliasExistence(aliasName, false);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private void removeAlias(final String aliasName) {
|
||||
assertAcked(leaderClient().admin().indices().prepareAliases().removeAlias("leader", aliasName));
|
||||
}
|
||||
|
||||
public void testStress() throws Exception {
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate("leader"));
|
||||
final PutFollowAction.Request followRequest = putFollow("leader", "follower");
|
||||
// we set a low poll timeout so that shard changes requests are responded to quickly even without indexing
|
||||
followRequest.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(100));
|
||||
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
final int numberOfThreads = randomIntBetween(2, 4);
|
||||
final int numberOfIterations = randomIntBetween(4, 32);
|
||||
final CyclicBarrier barrier = new CyclicBarrier(numberOfThreads + 1);
|
||||
final List<Thread> threads = new ArrayList<>(numberOfThreads);
|
||||
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
final Thread thread = new Thread(() -> {
|
||||
try {
|
||||
barrier.await();
|
||||
} catch (final BrokenBarrierException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
for (int j = 0; j < numberOfIterations; j++) {
|
||||
final String action = randomFrom("create", "update", "delete");
|
||||
switch (action) {
|
||||
case "create":
|
||||
addRandomAlias("leader", randomAlphaOfLength(16), randomFrom(new Boolean[] { null, false, true }));
|
||||
break;
|
||||
case "update":
|
||||
try {
|
||||
final String[] aliases = getAliasesOnLeader();
|
||||
if (aliases.length == 0) {
|
||||
continue;
|
||||
}
|
||||
final String alias = randomFrom(aliases);
|
||||
/*
|
||||
* Add an alias with the same name, which acts as an update (although another thread could concurrently
|
||||
* remove).
|
||||
*/
|
||||
addRandomAlias("leader", alias, randomFrom(new Boolean[] { null, false, true }));
|
||||
} catch (final Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
break;
|
||||
case "delete":
|
||||
try {
|
||||
final String[] aliases = getAliasesOnLeader();
|
||||
if (aliases.length == 0) {
|
||||
continue;
|
||||
}
|
||||
final String alias = randomFrom(aliases);
|
||||
try {
|
||||
removeAlias(alias);
|
||||
} catch (final AliasesNotFoundException e) {
|
||||
// ignore, it could have been deleted by another thread
|
||||
continue;
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
assert false : action;
|
||||
}
|
||||
}
|
||||
try {
|
||||
barrier.await();
|
||||
} catch (final BrokenBarrierException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
thread.start();
|
||||
threads.add(thread);
|
||||
}
|
||||
barrier.await();
|
||||
|
||||
barrier.await();
|
||||
|
||||
for (final Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
|
||||
assertAliasesExist("leader", "follower", getAliasesOnLeader());
|
||||
}
|
||||
|
||||
private String[] getAliasesOnLeader() throws InterruptedException, ExecutionException {
|
||||
final GetAliasesResponse response = leaderClient().admin().indices().getAliases(new GetAliasesRequest().indices("leader")).get();
|
||||
return response.getAliases().get("leader").stream().map(AliasMetaData::alias).toArray(String[]::new);
|
||||
}
|
||||
|
||||
private void assertAliasesExist(final String leaderIndex, final String followerIndex, final String... aliases) throws Exception {
|
||||
assertAliasesExist(leaderIndex, followerIndex, (alias, aliasMetaData) -> {}, aliases);
|
||||
}
|
||||
|
||||
private <E extends Exception> void assertAliasesExist(
|
||||
final String leaderIndex,
|
||||
final String followerIndex,
|
||||
final CheckedBiConsumer<String, AliasMetaData, E> aliasMetaDataAssertion,
|
||||
final String... aliases) throws Exception {
|
||||
// we must check serially because aliases exist will return true if any but not necessarily all of the requested aliases exist
|
||||
for (final String alias : aliases) {
|
||||
assertAliasExistence(alias, true);
|
||||
}
|
||||
|
||||
assertBusy(() -> {
|
||||
final GetAliasesResponse followerResponse =
|
||||
followerClient().admin().indices().getAliases(new GetAliasesRequest().indices(followerIndex)).get();
|
||||
assertThat(
|
||||
"expected follower to have [" + aliases.length + "] aliases, but was " + followerResponse.getAliases().toString(),
|
||||
followerResponse.getAliases().get(followerIndex),
|
||||
hasSize(aliases.length));
|
||||
for (final String alias : aliases) {
|
||||
final AliasMetaData followerAliasMetaData = getAliasMetaData(followerResponse, followerIndex, alias);
|
||||
|
||||
final GetAliasesResponse leaderResponse =
|
||||
leaderClient().admin().indices().getAliases(new GetAliasesRequest().indices(leaderIndex).aliases(alias)).get();
|
||||
final AliasMetaData leaderAliasMetaData = getAliasMetaData(leaderResponse, leaderIndex, alias);
|
||||
|
||||
assertThat(
|
||||
"alias [" + alias + "] index routing did not replicate, but was " + followerAliasMetaData.toString(),
|
||||
followerAliasMetaData.indexRouting(), equalTo(leaderAliasMetaData.indexRouting()));
|
||||
assertThat(
|
||||
"alias [" + alias + "] search routing did not replicate, but was " + followerAliasMetaData.toString(),
|
||||
followerAliasMetaData.searchRoutingValues(), equalTo(leaderAliasMetaData.searchRoutingValues()));
|
||||
assertThat(
|
||||
"alias [" + alias + "] filtering did not replicate, but was " + followerAliasMetaData.toString(),
|
||||
followerAliasMetaData.filter(), equalTo(leaderAliasMetaData.filter()));
|
||||
assertThat(
|
||||
"alias [" + alias + "] should not be a write index, but was " + followerAliasMetaData.toString(),
|
||||
followerAliasMetaData.writeIndex(),
|
||||
equalTo(false));
|
||||
aliasMetaDataAssertion.accept(alias, followerAliasMetaData);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void assertAliasExistence(final String alias, final boolean exists) throws Exception {
|
||||
assertBusy(() -> {
|
||||
// we must check serially because aliases exist will return true if any but not necessarily all of the requested aliases exist
|
||||
final AliasesExistResponse response = followerClient().admin()
|
||||
.indices()
|
||||
.aliasesExist(new GetAliasesRequest().indices("follower").aliases(alias))
|
||||
.get();
|
||||
if (exists) {
|
||||
assertTrue("alias [" + alias + "] did not exist", response.exists());
|
||||
} else {
|
||||
assertFalse("alias [" + alias + "] exists", response.exists());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private AliasMetaData getAliasMetaData(final GetAliasesResponse response, final String index, final String alias) {
|
||||
final Optional<AliasMetaData> maybeAliasMetaData =
|
||||
response.getAliases().get(index).stream().filter(a -> a.getAlias().equals(alias)).findFirst();
|
||||
assertTrue("alias [" + alias + "] did not exist", maybeAliasMetaData.isPresent());
|
||||
return maybeAliasMetaData.get();
|
||||
}
|
||||
|
||||
private CheckedRunnable<Exception> assertShardFollowTask(final int numberOfPrimaryShards) {
|
||||
return () -> {
|
||||
final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
|
||||
final PersistentTasksCustomMetaData taskMetadata = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
assertNotNull("task metadata for follower should exist", taskMetadata);
|
||||
|
||||
final ListTasksRequest listTasksRequest = new ListTasksRequest();
|
||||
listTasksRequest.setDetailed(true);
|
||||
listTasksRequest.setActions(ShardFollowTask.NAME + "[c]");
|
||||
final ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).actionGet();
|
||||
assertThat("expected no node failures", listTasksResponse.getNodeFailures().size(), equalTo(0));
|
||||
assertThat("expected no task failures", listTasksResponse.getTaskFailures().size(), equalTo(0));
|
||||
|
||||
final List<TaskInfo> taskInfos = listTasksResponse.getTasks();
|
||||
assertThat("expected a task for each shard", taskInfos.size(), equalTo(numberOfPrimaryShards));
|
||||
final Collection<PersistentTasksCustomMetaData.PersistentTask<?>> shardFollowTasks =
|
||||
taskMetadata.findTasks(ShardFollowTask.NAME, Objects::nonNull);
|
||||
for (final PersistentTasksCustomMetaData.PersistentTask<?> shardFollowTask : shardFollowTasks) {
|
||||
TaskInfo taskInfo = null;
|
||||
final String expectedId = "id=" + shardFollowTask.getId();
|
||||
for (final TaskInfo info : taskInfos) {
|
||||
if (expectedId.equals(info.getDescription())) {
|
||||
taskInfo = info;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertNotNull("task info for shard follow task [" + expectedId + "] should exist", taskInfo);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
@ -15,6 +15,7 @@ public class ShardChangesResponseTests extends AbstractWireSerializingTestCase<S
|
||||
protected ShardChangesAction.Response createTestInstance() {
|
||||
final long mappingVersion = randomNonNegativeLong();
|
||||
final long settingsVersion = randomNonNegativeLong();
|
||||
final long aliasesVersion = randomNonNegativeLong();
|
||||
final long leaderGlobalCheckpoint = randomNonNegativeLong();
|
||||
final long leaderMaxSeqNo = randomLongBetween(leaderGlobalCheckpoint, Long.MAX_VALUE);
|
||||
final long maxSeqNoOfUpdatesOrDeletes = randomLongBetween(-1, Long.MAX_VALUE);
|
||||
@ -26,6 +27,7 @@ public class ShardChangesResponseTests extends AbstractWireSerializingTestCase<S
|
||||
return new ShardChangesAction.Response(
|
||||
mappingVersion,
|
||||
settingsVersion,
|
||||
aliasesVersion,
|
||||
leaderGlobalCheckpoint,
|
||||
leaderMaxSeqNo,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
|
@ -42,15 +42,19 @@ import static org.hamcrest.Matchers.hasSize;
|
||||
public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
||||
|
||||
public void testSingleReaderWriter() throws Exception {
|
||||
TestRun testRun = createTestRun(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
|
||||
randomIntBetween(1, 2048));
|
||||
TestRun testRun = createTestRun(
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
randomIntBetween(1, 2048));
|
||||
ShardFollowNodeTask task = createShardFollowTask(1, testRun);
|
||||
startAndAssertAndStopTask(task, testRun);
|
||||
}
|
||||
|
||||
public void testMultipleReaderWriter() throws Exception {
|
||||
int concurrency = randomIntBetween(2, 8);
|
||||
TestRun testRun = createTestRun(0, 0, 0, between(1, 1024));
|
||||
TestRun testRun = createTestRun(0, 0, 0, 0, between(1, 1024));
|
||||
ShardFollowNodeTask task = createShardFollowTask(concurrency, testRun);
|
||||
startAndAssertAndStopTask(task, testRun);
|
||||
}
|
||||
@ -110,6 +114,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
||||
|
||||
private volatile long mappingVersion = 0L;
|
||||
private volatile long settingsVersion = 0L;
|
||||
private volatile long aliasesVersion = 0L;
|
||||
private final Map<Long, Integer> fromToSlot = new HashMap<>();
|
||||
|
||||
@Override
|
||||
@ -122,6 +127,11 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
||||
handler.accept(settingsVersion);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void innerUpdateAliases(LongConsumer handler, Consumer<Exception> errorHandler) {
|
||||
handler.accept(aliasesVersion);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void innerSendBulkShardOperationsRequest(
|
||||
String followerHistoryUUID, List<Translog.Operation> operations,
|
||||
@ -172,8 +182,15 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
||||
assert from >= testRun.finalExpectedGlobalCheckpoint;
|
||||
final long globalCheckpoint = tracker.getCheckpoint();
|
||||
final long maxSeqNo = tracker.getMaxSeqNo();
|
||||
handler.accept(new ShardChangesAction.Response(0L, 0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(),
|
||||
new Translog.Operation[0], 1L));
|
||||
handler.accept(new ShardChangesAction.Response(
|
||||
0L,
|
||||
0L,
|
||||
0L,
|
||||
globalCheckpoint,
|
||||
maxSeqNo,
|
||||
randomNonNegativeLong(),
|
||||
new Translog.Operation[0],
|
||||
1L));
|
||||
}
|
||||
};
|
||||
threadPool.generic().execute(task);
|
||||
@ -233,10 +250,16 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
||||
};
|
||||
}
|
||||
|
||||
private static TestRun createTestRun(long startSeqNo, long startMappingVersion, long startSettingsVersion, int maxOperationCount) {
|
||||
private static TestRun createTestRun(
|
||||
final long startSeqNo,
|
||||
final long startMappingVersion,
|
||||
final long startSettingsVersion,
|
||||
final long startAliasesVersion,
|
||||
final int maxOperationCount) {
|
||||
long prevGlobalCheckpoint = startSeqNo;
|
||||
long mappingVersion = startMappingVersion;
|
||||
long settingsVersion = startSettingsVersion;
|
||||
long aliasesVersion = startAliasesVersion;
|
||||
int numResponses = randomIntBetween(16, 256);
|
||||
Map<Long, List<TestResponse>> responses = new HashMap<>(numResponses);
|
||||
for (int i = 0; i < numResponses; i++) {
|
||||
@ -247,7 +270,9 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
||||
if (sometimes()) {
|
||||
settingsVersion++;
|
||||
}
|
||||
|
||||
if (sometimes()) {
|
||||
aliasesVersion++;
|
||||
}
|
||||
if (sometimes()) {
|
||||
List<TestResponse> item = new ArrayList<>();
|
||||
// Sometimes add a random retryable error
|
||||
@ -268,6 +293,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
||||
new ShardChangesAction.Response(
|
||||
mappingVersion,
|
||||
settingsVersion,
|
||||
aliasesVersion,
|
||||
nextGlobalCheckPoint,
|
||||
nextGlobalCheckPoint,
|
||||
randomNonNegativeLong(),
|
||||
@ -293,6 +319,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
||||
ShardChangesAction.Response response = new ShardChangesAction.Response(
|
||||
mappingVersion,
|
||||
settingsVersion,
|
||||
aliasesVersion,
|
||||
prevGlobalCheckpoint,
|
||||
prevGlobalCheckpoint,
|
||||
randomNonNegativeLong(),
|
||||
@ -312,6 +339,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
||||
ShardChangesAction.Response response = new ShardChangesAction.Response(
|
||||
mappingVersion,
|
||||
settingsVersion,
|
||||
aliasesVersion,
|
||||
localLeaderGCP,
|
||||
localLeaderGCP,
|
||||
randomNonNegativeLong(),
|
||||
|
@ -59,6 +59,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
randomReadExceptions(),
|
||||
randomLong(),
|
||||
randomBoolean() ? new ElasticsearchException("fatal error") : null);
|
||||
@ -80,6 +81,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
|
||||
assertThat(newInstance.writeBufferOperationCount(), equalTo(expectedInstance.writeBufferOperationCount()));
|
||||
assertThat(newInstance.followerMappingVersion(), equalTo(expectedInstance.followerMappingVersion()));
|
||||
assertThat(newInstance.followerSettingsVersion(), equalTo(expectedInstance.followerSettingsVersion()));
|
||||
assertThat(newInstance.followerAliasesVersion(), equalTo(expectedInstance.followerAliasesVersion()));
|
||||
assertThat(newInstance.totalReadTimeMillis(), equalTo(expectedInstance.totalReadTimeMillis()));
|
||||
assertThat(newInstance.successfulReadRequests(), equalTo(expectedInstance.successfulReadRequests()));
|
||||
assertThat(newInstance.failedReadRequests(), equalTo(expectedInstance.failedReadRequests()));
|
||||
|
@ -72,6 +72,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
private Queue<Long> mappingVersions;
|
||||
private Queue<Exception> settingsUpdateFailures;
|
||||
private Queue<Long> settingsVersions;
|
||||
private Queue<Exception> aliasesUpdateFailures;
|
||||
private Queue<Long> aliasesVersions;
|
||||
private Queue<Long> leaderGlobalCheckpoints;
|
||||
private Queue<Long> followerGlobalCheckpoints;
|
||||
private Queue<Long> maxSeqNos;
|
||||
@ -88,7 +90,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
task.coordinateReads();
|
||||
assertThat(shardChangesRequests, contains(new long[]{0L, 8L})); // treat this a peak request
|
||||
shardChangesRequests.clear();
|
||||
task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 0L, 60L));
|
||||
task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 0L, 1L, 60L));
|
||||
assertThat(shardChangesRequests, contains(new long[][]{
|
||||
{6L, 8L}, {14L, 8L}, {22L, 8L}, {30L, 8L}, {38L, 8L}, {46L, 8L}, {54L, 7L}}
|
||||
));
|
||||
@ -113,7 +115,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
|
||||
shardChangesRequests.clear();
|
||||
// Also invokes the coordinatesReads() method:
|
||||
task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L));
|
||||
task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 1L, 128L));
|
||||
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer count limit has been reached
|
||||
|
||||
ShardFollowNodeTaskStatus status = task.getStatus();
|
||||
@ -139,7 +141,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
|
||||
shardChangesRequests.clear();
|
||||
// Also invokes the coordinatesReads() method:
|
||||
task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L));
|
||||
task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 1L, 128L));
|
||||
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer size limit has been reached
|
||||
|
||||
ShardFollowNodeTaskStatus status = task.getStatus();
|
||||
@ -204,7 +206,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
task.markAsCompleted();
|
||||
shardChangesRequests.clear();
|
||||
// Also invokes the coordinatesReads() method:
|
||||
task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 0L, 31L));
|
||||
task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 0L, 1L, 31L));
|
||||
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled
|
||||
assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled
|
||||
|
||||
@ -234,7 +236,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
task.markAsCompleted();
|
||||
shardChangesRequests.clear();
|
||||
// Also invokes the coordinatesReads() method:
|
||||
task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L));
|
||||
task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 1L, 128L));
|
||||
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled
|
||||
assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled
|
||||
|
||||
@ -483,7 +485,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
startTask(task, 63, -1);
|
||||
|
||||
task.coordinateReads();
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
||||
task.innerHandleReadResponse(0L, 63L, response);
|
||||
|
||||
assertThat(bulkShardOperationRequests.size(), equalTo(1));
|
||||
@ -513,7 +515,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
|
||||
|
||||
shardChangesRequests.clear();
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 0L, 31L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 0L, 1L, 31L);
|
||||
task.innerHandleReadResponse(0L, 63L, response);
|
||||
|
||||
assertThat(shardChangesRequests.size(), equalTo(1));
|
||||
@ -542,7 +544,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
|
||||
shardChangesRequests.clear();
|
||||
task.markAsCompleted();
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 0L, 31L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 0L, 1L, 31L);
|
||||
task.innerHandleReadResponse(0L, 64L, response);
|
||||
|
||||
assertThat(shardChangesRequests.size(), equalTo(0));
|
||||
@ -568,7 +570,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
|
||||
|
||||
shardChangesRequests.clear();
|
||||
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 0, 100, new Translog.Operation[0], 1L));
|
||||
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 0, 0, 100, new Translog.Operation[0], 1L));
|
||||
|
||||
assertThat(shardChangesRequests.size(), equalTo(1));
|
||||
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
|
||||
@ -591,7 +593,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
|
||||
mappingVersions.add(1L);
|
||||
task.coordinateReads();
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 63L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 0L, 63L);
|
||||
task.handleReadResponse(0L, 63L, response);
|
||||
|
||||
assertThat(bulkShardOperationRequests.size(), equalTo(1));
|
||||
@ -620,7 +622,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
}
|
||||
mappingVersions.add(1L);
|
||||
task.coordinateReads();
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 63L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 0L, 63L);
|
||||
task.handleReadResponse(0L, 63L, response);
|
||||
|
||||
assertThat(mappingUpdateFailures.size(), equalTo(0));
|
||||
@ -645,7 +647,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
|
||||
mappingUpdateFailures.add(new RuntimeException());
|
||||
task.coordinateReads();
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 0L, 64L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 0L, 0L, 64L);
|
||||
task.handleReadResponse(0L, 64L, response);
|
||||
|
||||
assertThat(bulkShardOperationRequests.size(), equalTo(0));
|
||||
@ -668,7 +670,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
|
||||
settingsVersions.add(1L);
|
||||
task.coordinateReads();
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 63L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 0L, 63L);
|
||||
task.handleReadResponse(0L, 63L, response);
|
||||
|
||||
assertThat(bulkShardOperationRequests.size(), equalTo(1));
|
||||
@ -677,6 +679,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
ShardFollowNodeTaskStatus status = task.getStatus();
|
||||
assertThat(status.followerMappingVersion(), equalTo(0L));
|
||||
assertThat(status.followerSettingsVersion(), equalTo(1L));
|
||||
assertThat(status.followerAliasesVersion(), equalTo(0L));
|
||||
assertThat(status.outstandingReadRequests(), equalTo(1));
|
||||
assertThat(status.outstandingWriteRequests(), equalTo(1));
|
||||
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
||||
@ -698,15 +701,16 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
}
|
||||
settingsVersions.add(1L);
|
||||
task.coordinateReads();
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 63L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 0L, 63L);
|
||||
task.handleReadResponse(0L, 63L, response);
|
||||
|
||||
assertThat(mappingUpdateFailures.size(), equalTo(0));
|
||||
assertThat(settingsUpdateFailures.size(), equalTo(0));
|
||||
assertThat(bulkShardOperationRequests.size(), equalTo(1));
|
||||
assertThat(task.isStopped(), equalTo(false));
|
||||
ShardFollowNodeTaskStatus status = task.getStatus();
|
||||
assertThat(status.followerMappingVersion(), equalTo(0L));
|
||||
assertThat(status.followerSettingsVersion(), equalTo(1L));
|
||||
assertThat(status.followerAliasesVersion(), equalTo(0L));
|
||||
assertThat(status.outstandingReadRequests(), equalTo(1));
|
||||
assertThat(status.outstandingWriteRequests(), equalTo(1));
|
||||
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
||||
@ -723,7 +727,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
|
||||
settingsUpdateFailures.add(new RuntimeException());
|
||||
task.coordinateReads();
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 1L, 64L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 1L, 0L, 64L);
|
||||
task.handleReadResponse(0L, 64L, response);
|
||||
|
||||
assertThat(bulkShardOperationRequests.size(), equalTo(0));
|
||||
@ -731,6 +735,89 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
ShardFollowNodeTaskStatus status = task.getStatus();
|
||||
assertThat(status.followerMappingVersion(), equalTo(0L));
|
||||
assertThat(status.followerSettingsVersion(), equalTo(0L));
|
||||
assertThat(status.followerAliasesVersion(), equalTo(0L));
|
||||
assertThat(status.outstandingReadRequests(), equalTo(1));
|
||||
assertThat(status.outstandingWriteRequests(), equalTo(0));
|
||||
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
||||
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
|
||||
}
|
||||
|
||||
public void testAliasUpdate() {
|
||||
final ShardFollowTaskParams params = new ShardFollowTaskParams();
|
||||
params.maxReadRequestOperationCount = 64;
|
||||
params.maxOutstandingReadRequests = 1;
|
||||
params.maxOutstandingWriteRequests = 1;
|
||||
final ShardFollowNodeTask task = createShardFollowTask(params);
|
||||
startTask(task, 63, -1);
|
||||
|
||||
aliasesVersions.add(1L);
|
||||
task.coordinateReads();
|
||||
final ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
||||
task.handleReadResponse(0L, 63L, response);
|
||||
|
||||
assertThat(bulkShardOperationRequests.size(), equalTo(1));
|
||||
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
|
||||
|
||||
final ShardFollowNodeTaskStatus status = task.getStatus();
|
||||
assertThat(status.followerMappingVersion(), equalTo(0L));
|
||||
assertThat(status.followerSettingsVersion(), equalTo(0L));
|
||||
assertThat(status.followerAliasesVersion(), equalTo(1L));
|
||||
assertThat(status.outstandingReadRequests(), equalTo(1));
|
||||
assertThat(status.outstandingWriteRequests(), equalTo(1));
|
||||
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
||||
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
|
||||
assertThat(status.followerGlobalCheckpoint(), equalTo(-1L));
|
||||
}
|
||||
|
||||
public void testAliasUpdateRetryableError() {
|
||||
final ShardFollowTaskParams params = new ShardFollowTaskParams();
|
||||
params.maxReadRequestOperationCount = 64;
|
||||
params.maxOutstandingReadRequests = 1;
|
||||
params.maxOutstandingWriteRequests = 1;
|
||||
final ShardFollowNodeTask task = createShardFollowTask(params);
|
||||
startTask(task, 63, -1);
|
||||
|
||||
int max = randomIntBetween(1, 30);
|
||||
for (int i = 0; i < max; i++) {
|
||||
aliasesUpdateFailures.add(new ConnectException());
|
||||
}
|
||||
aliasesVersions.add(1L);
|
||||
task.coordinateReads();
|
||||
final ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
||||
task.handleReadResponse(0L, 63L, response);
|
||||
|
||||
assertThat(aliasesUpdateFailures.size(), equalTo(0));
|
||||
assertThat(bulkShardOperationRequests.size(), equalTo(1));
|
||||
assertThat(task.isStopped(), equalTo(false));
|
||||
final ShardFollowNodeTaskStatus status = task.getStatus();
|
||||
assertThat(status.followerMappingVersion(), equalTo(0L));
|
||||
assertThat(status.followerSettingsVersion(), equalTo(0L));
|
||||
assertThat(status.followerAliasesVersion(), equalTo(1L));
|
||||
assertThat(status.outstandingReadRequests(), equalTo(1));
|
||||
assertThat(status.outstandingWriteRequests(), equalTo(1));
|
||||
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
||||
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
|
||||
}
|
||||
|
||||
public void testAliasUpdateNonRetryableError() {
|
||||
final ShardFollowTaskParams params = new ShardFollowTaskParams();
|
||||
params.maxReadRequestOperationCount = 64;
|
||||
params.maxOutstandingReadRequests = 1;
|
||||
params.maxOutstandingWriteRequests = 1;
|
||||
final ShardFollowNodeTask task = createShardFollowTask(params);
|
||||
startTask(task, 63, -1);
|
||||
|
||||
aliasesUpdateFailures.add(new RuntimeException());
|
||||
task.coordinateReads();
|
||||
final ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 0L, 1L, 64L);
|
||||
task.handleReadResponse(0L, 64L, response);
|
||||
|
||||
assertThat(bulkShardOperationRequests.size(), equalTo(0));
|
||||
assertThat(task.isStopped(), equalTo(true));
|
||||
final ShardFollowNodeTaskStatus status = task.getStatus();
|
||||
assertThat(status.followerMappingVersion(), equalTo(0L));
|
||||
assertThat(status.followerSettingsVersion(), equalTo(0L));
|
||||
assertThat(status.followerAliasesVersion(), equalTo(0L));
|
||||
assertThat(status.outstandingReadRequests(), equalTo(1));
|
||||
assertThat(status.outstandingWriteRequests(), equalTo(0));
|
||||
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
||||
@ -752,7 +839,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
|
||||
assertThat(shardChangesRequests.get(0)[1], equalTo(128L));
|
||||
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
||||
// Also invokes coordinatesWrites()
|
||||
task.innerHandleReadResponse(0L, 63L, response);
|
||||
|
||||
@ -772,7 +859,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
params.maxWriteRequestOperationCount = 64;
|
||||
params.maxOutstandingWriteRequests = 2;
|
||||
ShardFollowNodeTask task = createShardFollowTask(params);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 256L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 1L, 256L);
|
||||
// Also invokes coordinatesWrites()
|
||||
task.innerHandleReadResponse(0L, 64L, response);
|
||||
|
||||
@ -785,7 +872,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
|
||||
params.maxOutstandingWriteRequests = 4; // change to 4 outstanding writers
|
||||
task = createShardFollowTask(params);
|
||||
response = generateShardChangesResponse(0, 256, 0L, 0L, 256L);
|
||||
response = generateShardChangesResponse(0, 256, 0L, 0L, 1L, 256L);
|
||||
// Also invokes coordinatesWrites()
|
||||
task.innerHandleReadResponse(0L, 64L, response);
|
||||
|
||||
@ -804,7 +891,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
params.maxWriteRequestOperationCount = 8;
|
||||
params.maxOutstandingWriteRequests = 32;
|
||||
ShardFollowNodeTask task = createShardFollowTask(params);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 256L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 1L, 256L);
|
||||
// Also invokes coordinatesWrites()
|
||||
task.innerHandleReadResponse(0L, 64L, response);
|
||||
|
||||
@ -835,7 +922,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
for (int i = 0; i < max; i++) {
|
||||
writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
|
||||
}
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
||||
// Also invokes coordinatesWrites()
|
||||
task.innerHandleReadResponse(0L, 63L, response);
|
||||
|
||||
@ -864,7 +951,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
|
||||
|
||||
writeFailures.add(new RuntimeException());
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
||||
// Also invokes coordinatesWrites()
|
||||
task.innerHandleReadResponse(0L, 63L, response);
|
||||
|
||||
@ -891,7 +978,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
|
||||
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
|
||||
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 64L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 64L);
|
||||
// Also invokes coordinatesWrites()
|
||||
task.innerHandleReadResponse(0L, 64L, response);
|
||||
|
||||
@ -914,7 +1001,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
|
||||
shardChangesRequests.clear();
|
||||
followerGlobalCheckpoints.add(63L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
|
||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
||||
// Also invokes coordinatesWrites()
|
||||
task.innerHandleReadResponse(0L, 63L, response);
|
||||
|
||||
@ -1013,6 +1100,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
mappingVersions = new LinkedList<>();
|
||||
settingsUpdateFailures = new LinkedList<>();
|
||||
settingsVersions = new LinkedList<>();
|
||||
aliasesUpdateFailures = new LinkedList<>();
|
||||
aliasesVersions = new LinkedList<>();
|
||||
leaderGlobalCheckpoints = new LinkedList<>();
|
||||
followerGlobalCheckpoints = new LinkedList<>();
|
||||
maxSeqNos = new LinkedList<>();
|
||||
@ -1048,6 +1137,20 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void innerUpdateAliases(final LongConsumer handler, final Consumer<Exception> errorHandler) {
|
||||
final Exception failure = aliasesUpdateFailures.poll();
|
||||
if (failure != null) {
|
||||
errorHandler.accept(failure);
|
||||
return;
|
||||
}
|
||||
|
||||
final Long aliasesVersion = aliasesVersions.poll();
|
||||
if (aliasesVersion != null) {
|
||||
handler.accept(aliasesVersion);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void innerSendBulkShardOperationsRequest(
|
||||
String followerHistoryUUID, final List<Translog.Operation> operations,
|
||||
@ -1086,6 +1189,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
final ShardChangesAction.Response response = new ShardChangesAction.Response(
|
||||
mappingVersions.poll(),
|
||||
0L,
|
||||
0L,
|
||||
leaderGlobalCheckpoints.poll(),
|
||||
maxSeqNos.poll(),
|
||||
randomNonNegativeLong(),
|
||||
@ -1153,6 +1257,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
long toSeqNo,
|
||||
long mappingVersion,
|
||||
long settingsVersion,
|
||||
long aliasesVersion,
|
||||
long leaderGlobalCheckPoint) {
|
||||
List<Translog.Operation> ops = new ArrayList<>();
|
||||
for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) {
|
||||
@ -1163,6 +1268,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||
return new ShardChangesAction.Response(
|
||||
mappingVersion,
|
||||
settingsVersion,
|
||||
aliasesVersion,
|
||||
leaderGlobalCheckPoint,
|
||||
leaderGlobalCheckPoint,
|
||||
randomNonNegativeLong(),
|
||||
|
@ -514,6 +514,12 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
|
||||
handler.accept(1L);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void innerUpdateAliases(LongConsumer handler, Consumer<Exception> errorHandler) {
|
||||
// no-op as alias updates are not tested here
|
||||
handler.accept(1L);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void innerSendBulkShardOperationsRequest(
|
||||
final String followerHistoryUUID,
|
||||
@ -544,14 +550,21 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
|
||||
final SeqNoStats seqNoStats = indexShard.seqNoStats();
|
||||
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
|
||||
if (from > seqNoStats.getGlobalCheckpoint()) {
|
||||
handler.accept(ShardChangesAction.getResponse(1L, 1L, seqNoStats,
|
||||
maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY, 1L));
|
||||
handler.accept(ShardChangesAction.getResponse(
|
||||
1L,
|
||||
1L,
|
||||
1L,
|
||||
seqNoStats,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
ShardChangesAction.EMPTY_OPERATIONS_ARRAY,
|
||||
1L));
|
||||
return;
|
||||
}
|
||||
Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from,
|
||||
maxOperationCount, recordedLeaderIndexHistoryUUID, params.getMaxReadRequestSize());
|
||||
// hard code mapping version; this is ok, as mapping updates are not tested here
|
||||
final ShardChangesAction.Response response = new ShardChangesAction.Response(
|
||||
1L,
|
||||
1L,
|
||||
1L,
|
||||
seqNoStats.getGlobalCheckpoint(),
|
||||
|
@ -57,6 +57,7 @@ public class StatsResponsesTests extends AbstractWireSerializingTestCase<FollowS
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
Collections.emptyNavigableMap(),
|
||||
randomLong(),
|
||||
randomBoolean() ? new ElasticsearchException("fatal error") : null);
|
||||
|
@ -95,6 +95,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
|
||||
final long writeBufferSizeInBytes = randomNonNegativeLong();
|
||||
final long followerMappingVersion = randomNonNegativeLong();
|
||||
final long followerSettingsVersion = randomNonNegativeLong();
|
||||
final long followerAliasesVersion = randomNonNegativeLong();
|
||||
final long totalReadTimeMillis = randomLongBetween(0, 4096);
|
||||
final long totalReadRemoteExecTimeMillis = randomLongBetween(0, 4096);
|
||||
final long successfulReadRequests = randomNonNegativeLong();
|
||||
@ -126,6 +127,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
|
||||
writeBufferSizeInBytes,
|
||||
followerMappingVersion,
|
||||
followerSettingsVersion,
|
||||
followerAliasesVersion,
|
||||
totalReadTimeMillis,
|
||||
totalReadRemoteExecTimeMillis,
|
||||
successfulReadRequests,
|
||||
@ -173,6 +175,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
|
||||
+ "\"write_buffer_size_in_bytes\":" + writeBufferSizeInBytes + ","
|
||||
+ "\"follower_mapping_version\":" + followerMappingVersion + ","
|
||||
+ "\"follower_settings_version\":" + followerSettingsVersion + ","
|
||||
+ "\"follower_aliases_version\":" + followerAliasesVersion + ","
|
||||
+ "\"total_read_time_millis\":" + totalReadTimeMillis + ","
|
||||
+ "\"total_read_remote_exec_time_millis\":" + totalReadRemoteExecTimeMillis + ","
|
||||
+ "\"successful_read_requests\":" + successfulReadRequests + ","
|
||||
@ -218,6 +221,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
|
||||
1,
|
||||
1,
|
||||
1,
|
||||
1,
|
||||
100,
|
||||
50,
|
||||
10,
|
||||
|
@ -7,6 +7,7 @@
|
||||
package org.elasticsearch.xpack.core.ccr;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
@ -49,6 +50,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||
private static final ParseField WRITE_BUFFER_SIZE_IN_BYTES_FIELD = new ParseField("write_buffer_size_in_bytes");
|
||||
private static final ParseField FOLLOWER_MAPPING_VERSION_FIELD = new ParseField("follower_mapping_version");
|
||||
private static final ParseField FOLLOWER_SETTINGS_VERSION_FIELD = new ParseField("follower_settings_version");
|
||||
private static final ParseField FOLLOWER_ALIASES_VERSION_FIELD = new ParseField("follower_aliases_version");
|
||||
private static final ParseField TOTAL_READ_TIME_MILLIS_FIELD = new ParseField("total_read_time_millis");
|
||||
private static final ParseField TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD = new ParseField("total_read_remote_exec_time_millis");
|
||||
private static final ParseField SUCCESSFUL_READ_REQUESTS_FIELD = new ParseField("successful_read_requests");
|
||||
@ -93,12 +95,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||
(long) args[22],
|
||||
(long) args[23],
|
||||
(long) args[24],
|
||||
(long) args[25],
|
||||
new TreeMap<>(
|
||||
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[25])
|
||||
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[26])
|
||||
.stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
|
||||
(long) args[26],
|
||||
(ElasticsearchException) args[27]));
|
||||
(long) args[27],
|
||||
(ElasticsearchException) args[28]));
|
||||
|
||||
public static final String READ_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-read-exceptions-entry";
|
||||
|
||||
@ -123,6 +126,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), WRITE_BUFFER_SIZE_IN_BYTES_FIELD);
|
||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAPPING_VERSION_FIELD);
|
||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_SETTINGS_VERSION_FIELD);
|
||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_ALIASES_VERSION_FIELD);
|
||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_TIME_MILLIS_FIELD);
|
||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD);
|
||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), SUCCESSFUL_READ_REQUESTS_FIELD);
|
||||
@ -243,6 +247,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||
return followerSettingsVersion;
|
||||
}
|
||||
|
||||
private final long followerAliasesVersion;
|
||||
|
||||
public long followerAliasesVersion() {
|
||||
return followerAliasesVersion;
|
||||
}
|
||||
|
||||
private final long totalReadTimeMillis;
|
||||
|
||||
public long totalReadTimeMillis() {
|
||||
@ -337,6 +347,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||
final long writeBufferSizeInBytes,
|
||||
final long followerMappingVersion,
|
||||
final long followerSettingsVersion,
|
||||
final long followerAliasesVersion,
|
||||
final long totalReadTimeMillis,
|
||||
final long totalReadRemoteExecTimeMillis,
|
||||
final long successfulReadRequests,
|
||||
@ -365,6 +376,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||
this.writeBufferSizeInBytes = writeBufferSizeInBytes;
|
||||
this.followerMappingVersion = followerMappingVersion;
|
||||
this.followerSettingsVersion = followerSettingsVersion;
|
||||
this.followerAliasesVersion = followerAliasesVersion;
|
||||
this.totalReadTimeMillis = totalReadTimeMillis;
|
||||
this.totalReadRemoteExecTimeMillis = totalReadRemoteExecTimeMillis;
|
||||
this.successfulReadRequests = successfulReadRequests;
|
||||
@ -396,6 +408,11 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||
this.writeBufferSizeInBytes = in.readVLong();
|
||||
this.followerMappingVersion = in.readVLong();
|
||||
this.followerSettingsVersion = in.readVLong();
|
||||
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
|
||||
this.followerAliasesVersion = in.readVLong();
|
||||
} else {
|
||||
this.followerAliasesVersion = 0L;
|
||||
}
|
||||
this.totalReadTimeMillis = in.readVLong();
|
||||
this.totalReadRemoteExecTimeMillis = in.readVLong();
|
||||
this.successfulReadRequests = in.readVLong();
|
||||
@ -434,6 +451,9 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||
out.writeVLong(writeBufferSizeInBytes);
|
||||
out.writeVLong(followerMappingVersion);
|
||||
out.writeVLong(followerSettingsVersion);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
|
||||
out.writeVLong(followerAliasesVersion);
|
||||
}
|
||||
out.writeVLong(totalReadTimeMillis);
|
||||
out.writeVLong(totalReadRemoteExecTimeMillis);
|
||||
out.writeVLong(successfulReadRequests);
|
||||
@ -484,6 +504,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||
new ByteSizeValue(writeBufferSizeInBytes));
|
||||
builder.field(FOLLOWER_MAPPING_VERSION_FIELD.getPreferredName(), followerMappingVersion);
|
||||
builder.field(FOLLOWER_SETTINGS_VERSION_FIELD.getPreferredName(), followerSettingsVersion);
|
||||
builder.field(FOLLOWER_ALIASES_VERSION_FIELD.getPreferredName(), followerAliasesVersion);
|
||||
builder.humanReadableField(
|
||||
TOTAL_READ_TIME_MILLIS_FIELD.getPreferredName(),
|
||||
"total_read_time",
|
||||
@ -564,7 +585,8 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||
writeBufferOperationCount == that.writeBufferOperationCount &&
|
||||
writeBufferSizeInBytes == that.writeBufferSizeInBytes &&
|
||||
followerMappingVersion == that.followerMappingVersion &&
|
||||
followerSettingsVersion== that.followerSettingsVersion &&
|
||||
followerSettingsVersion == that.followerSettingsVersion &&
|
||||
followerAliasesVersion == that.followerAliasesVersion &&
|
||||
totalReadTimeMillis == that.totalReadTimeMillis &&
|
||||
totalReadRemoteExecTimeMillis == that.totalReadRemoteExecTimeMillis &&
|
||||
successfulReadRequests == that.successfulReadRequests &&
|
||||
@ -604,6 +626,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||
writeBufferSizeInBytes,
|
||||
followerMappingVersion,
|
||||
followerSettingsVersion,
|
||||
followerAliasesVersion,
|
||||
totalReadTimeMillis,
|
||||
totalReadRemoteExecTimeMillis,
|
||||
successfulReadRequests,
|
||||
|
@ -974,6 +974,9 @@
|
||||
"follower_settings_version": {
|
||||
"type": "long"
|
||||
},
|
||||
"follower_aliases_version": {
|
||||
"type": "long"
|
||||
},
|
||||
"total_read_time_millis": {
|
||||
"type": "long"
|
||||
},
|
||||
|
@ -188,6 +188,7 @@ public class WaitForFollowShardTasksStepTests extends AbstractStepTestCase<WaitF
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
Collections.emptyNavigableMap(),
|
||||
0,
|
||||
null
|
||||
|
@ -66,8 +66,8 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
||||
putILMPolicy(policyName, "50GB", null, TimeValue.timeValueHours(7*24));
|
||||
followIndex(indexName, indexName);
|
||||
ensureGreen(indexName);
|
||||
// Aliases are not copied from leader index, so we need to add that for the rollover action in follower cluster:
|
||||
client().performRequest(new Request("PUT", "/" + indexName + "/_alias/logs"));
|
||||
|
||||
assertBusy(() -> assertOK(client().performRequest(new Request("HEAD", "/" + indexName + "/_alias/logs"))));
|
||||
|
||||
try (RestClient leaderClient = buildLeaderClient()) {
|
||||
index(leaderClient, indexName, "1");
|
||||
@ -226,8 +226,8 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
||||
// Check that it got replicated to the follower
|
||||
assertBusy(() -> assertTrue(indexExists(indexName)));
|
||||
|
||||
// Aliases are not copied from leader index, so we need to add that for the rollover action in follower cluster:
|
||||
client().performRequest(new Request("PUT", "/" + indexName + "/_alias/" + alias));
|
||||
// check that the alias was replicated
|
||||
assertBusy(() -> assertOK(client().performRequest(new Request("HEAD", "/" + indexName + "/_alias/" + alias))));
|
||||
|
||||
index(leaderClient, indexName, "1");
|
||||
assertDocumentExists(leaderClient, indexName, "1");
|
||||
@ -252,7 +252,6 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
||||
// And the old index should have a write block and indexing complete set
|
||||
assertThat(getIndexSetting(leaderClient, indexName, "index.blocks.write"), equalTo("true"));
|
||||
assertThat(getIndexSetting(leaderClient, indexName, "index.lifecycle.indexing_complete"), equalTo("true"));
|
||||
|
||||
});
|
||||
|
||||
assertBusy(() -> {
|
||||
@ -266,6 +265,8 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
||||
assertThat(getIndexSetting(client(), indexName, "index.xpack.ccr.following_index"), nullValue());
|
||||
// The next index should have been created on the follower as well
|
||||
indexExists(nextIndexName);
|
||||
// and the alias should be on the next index
|
||||
assertOK(client().performRequest(new Request("HEAD", "/" + nextIndexName + "/_alias/" + alias)));
|
||||
});
|
||||
|
||||
assertBusy(() -> {
|
||||
@ -281,6 +282,74 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testAliasReplicatedOnShrink() throws Exception {
|
||||
final String indexName = "shrink-alias-test";
|
||||
final String shrunkenIndexName = "shrink-" + indexName;
|
||||
final String policyName = "shrink-test-policy";
|
||||
|
||||
final int numberOfAliases = randomIntBetween(0, 4);
|
||||
|
||||
if ("leader".equals(targetCluster)) {
|
||||
Settings indexSettings = Settings.builder()
|
||||
.put("index.soft_deletes.enabled", true)
|
||||
.put("index.number_of_shards", 3)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.lifecycle.name", policyName) // this policy won't exist on the leader, that's fine
|
||||
.build();
|
||||
final StringBuilder aliases = new StringBuilder();
|
||||
boolean first = true;
|
||||
for (int i = 0; i < numberOfAliases; i++) {
|
||||
if (first == false) {
|
||||
aliases.append(",");
|
||||
}
|
||||
final Boolean isWriteIndex = randomFrom(new Boolean[] { null, false, true });
|
||||
if (isWriteIndex == null) {
|
||||
aliases.append("\"alias_").append(i).append("\":{}");
|
||||
} else {
|
||||
aliases.append("\"alias_").append(i).append("\":{\"is_write_index\":").append(isWriteIndex).append("}");
|
||||
}
|
||||
first = false;
|
||||
}
|
||||
createIndex(indexName, indexSettings, "", aliases.toString());
|
||||
ensureGreen(indexName);
|
||||
} else if ("follow".equals(targetCluster)) {
|
||||
// Create a policy with just a Shrink action on the follower
|
||||
putShrinkOnlyPolicy(client(), policyName);
|
||||
|
||||
// Follow the index
|
||||
followIndex(indexName, indexName);
|
||||
// Make sure it actually took
|
||||
assertBusy(() -> assertTrue(indexExists(indexName)));
|
||||
// This should now be in the "warm" phase waiting for the index to be ready to unfollow
|
||||
assertBusy(() -> assertILMPolicy(client(), indexName, policyName, "warm", "unfollow", "wait-for-indexing-complete"));
|
||||
|
||||
// Set the indexing_complete flag on the leader so the index will actually unfollow
|
||||
try (RestClient leaderClient = buildLeaderClient()) {
|
||||
updateIndexSettings(leaderClient, indexName, Settings.builder()
|
||||
.put("index.lifecycle.indexing_complete", true)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
// Wait for the setting to get replicated
|
||||
assertBusy(() -> assertThat(getIndexSetting(client(), indexName, "index.lifecycle.indexing_complete"), equalTo("true")));
|
||||
|
||||
// Wait for the index to continue with its lifecycle and be shrunk
|
||||
assertBusy(() -> assertTrue(indexExists(shrunkenIndexName)));
|
||||
|
||||
// assert the aliases were replicated
|
||||
assertBusy(() -> {
|
||||
for (int i = 0; i < numberOfAliases; i++) {
|
||||
assertOK(client().performRequest(new Request("HEAD", "/" + shrunkenIndexName + "/_alias/alias_" + i)));
|
||||
}
|
||||
});
|
||||
assertBusy(() -> assertOK(client().performRequest(new Request("HEAD", "/" + shrunkenIndexName + "/_alias/" + indexName))));
|
||||
|
||||
// Wait for the index to complete its policy
|
||||
assertBusy(() -> assertILMPolicy(client(), shrunkenIndexName, policyName, "completed", "completed", "completed"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testUnfollowInjectedBeforeShrink() throws Exception {
|
||||
final String indexName = "shrink-test";
|
||||
final String shrunkenIndexName = "shrink-" + indexName;
|
||||
|
Loading…
x
Reference in New Issue
Block a user