[CCR] Added history uuid validation (#33546)

For correctness we need to verify whether the history uuid of the leader
index shards never changes while that index is being followed.

* The history UUIDs are recorded as custom index metadata in the follow index.
* The follow api validates whether the current history UUIDs of the leader
  index shards are the same as the recorded history UUIDs.
  If not the follow api fails.
* While a follow index is following a leader index; shard follow tasks
  on each shard changes api call verify whether their current history uuid
  is the same as the recorded history uuid.

Relates to #30086 

Co-authored-by: Nhat Nguyen <nhat.nguyen@elastic.co>
This commit is contained in:
Martijn van Groningen 2018-09-12 19:42:00 +02:00 committed by GitHub
parent c023f67c5d
commit 5fa81310cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 442 additions and 122 deletions

View File

@ -443,6 +443,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
return primary;
}
public synchronized void reinitPrimaryShard() throws IOException {
primary = reinitShard(primary);
}
public void syncGlobalCheckpoint() {
PlainActionFuture<ReplicationResponse> listener = new PlainActionFuture<>();
try {

View File

@ -113,7 +113,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
e = expectThrows(ResponseException.class,
() -> followIndex("leader_cluster:" + unallowedIndex, unallowedIndex));
assertThat(e.getMessage(), containsString("follow index [" + unallowedIndex + "] does not exist"));
assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]"));
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
}

View File

@ -85,6 +85,8 @@ import static org.elasticsearch.xpack.core.XPackSettings.CCR_ENABLED_SETTING;
public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin {
public static final String CCR_THREAD_POOL_NAME = "ccr";
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids";
private final boolean enabled;
private final Settings settings;

View File

@ -10,9 +10,18 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
@ -21,6 +30,7 @@ import org.elasticsearch.xpack.core.XPackPlugin;
import java.util.Collections;
import java.util.Locale;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
@ -58,23 +68,24 @@ public final class CcrLicenseChecker {
}
/**
* Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for
* license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is is invoked.
* Otherwise, the specified consumer is invoked with the leader index metadata fetched from the remote cluster.
* Fetches the leader index metadata and history UUIDs for leader index shards from the remote cluster.
* Before fetching the index metadata, the remote cluster is checked for license compatibility with CCR.
* If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is is invoked. Otherwise,
* the specified consumer is invoked with the leader index metadata fetched from the remote cluster.
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param leaderIndex the name of the leader index
* @param onFailure the failure consumer
* @param leaderIndexMetadataConsumer the leader index metadata consumer
* @param <T> the type of response the listener is waiting for
* @param client the client
* @param clusterAlias the remote cluster alias
* @param leaderIndex the name of the leader index
* @param onFailure the failure consumer
* @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards
* @param <T> the type of response the listener is waiting for
*/
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
final Client client,
final String clusterAlias,
final String leaderIndex,
final Consumer<Exception> onFailure,
final Consumer<IndexMetaData> leaderIndexMetadataConsumer) {
final BiConsumer<String[], IndexMetaData> consumer) {
final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
@ -85,7 +96,13 @@ public final class CcrLicenseChecker {
clusterAlias,
request,
onFailure,
leaderClusterState -> leaderIndexMetadataConsumer.accept(leaderClusterState.getMetaData().index(leaderIndex)),
leaderClusterState -> {
IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex);
final Client leaderClient = client.getRemoteClusterClient(clusterAlias);
fetchLeaderHistoryUUIDs(leaderClient, leaderIndexMetaData, onFailure, historyUUIDs -> {
consumer.accept(historyUUIDs, leaderIndexMetaData);
});
},
licenseCheck -> indexMetadataNonCompliantRemoteLicense(leaderIndex, licenseCheck),
e -> indexMetadataUnknownRemoteLicense(leaderIndex, clusterAlias, e));
}
@ -168,6 +185,58 @@ public final class CcrLicenseChecker {
});
}
/**
* Fetches the history UUIDs for leader index on per shard basis using the specified leaderClient.
*
* @param leaderClient the leader client
* @param leaderIndexMetaData the leader index metadata
* @param onFailure the failure consumer
* @param historyUUIDConsumer the leader index history uuid and consumer
*/
// NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs
// in case of following a local or a remote cluster.
public void fetchLeaderHistoryUUIDs(
final Client leaderClient,
final IndexMetaData leaderIndexMetaData,
final Consumer<Exception> onFailure,
final Consumer<String[]> historyUUIDConsumer) {
String leaderIndex = leaderIndexMetaData.getIndex().getName();
CheckedConsumer<IndicesStatsResponse, Exception> indicesStatsHandler = indicesStatsResponse -> {
IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex);
String[] historyUUIDs = new String[leaderIndexMetaData.getNumberOfShards()];
for (IndexShardStats indexShardStats : indexStats) {
for (ShardStats shardStats : indexShardStats) {
// Ignore replica shards as they may not have yet started and
// we just end up overwriting slots in historyUUIDs
if (shardStats.getShardRouting().primary() == false) {
continue;
}
CommitStats commitStats = shardStats.getCommitStats();
if (commitStats == null) {
onFailure.accept(new IllegalArgumentException("leader index's commit stats are missing"));
return;
}
String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY);
ShardId shardId = shardStats.getShardRouting().shardId();
historyUUIDs[shardId.id()] = historyUUID;
}
}
for (int i = 0; i < historyUUIDs.length; i++) {
if (historyUUIDs[i] == null) {
onFailure.accept(new IllegalArgumentException("no history uuid for [" + leaderIndex + "][" + i + "]"));
return;
}
}
historyUUIDConsumer.accept(historyUUIDs);
};
IndicesStatsRequest request = new IndicesStatsRequest();
request.clear();
request.indices(leaderIndex);
leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
}
private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense(
final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias();

View File

@ -58,11 +58,13 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
private long fromSeqNo;
private int maxOperationCount;
private ShardId shardId;
private String expectedHistoryUUID;
private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
public Request(ShardId shardId) {
public Request(ShardId shardId, String expectedHistoryUUID) {
super(shardId.getIndexName());
this.shardId = shardId;
this.expectedHistoryUUID = expectedHistoryUUID;
}
Request() {
@ -96,6 +98,10 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
this.maxOperationSizeInBytes = maxOperationSizeInBytes;
}
public String getExpectedHistoryUUID() {
return expectedHistoryUUID;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@ -119,6 +125,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
fromSeqNo = in.readVLong();
maxOperationCount = in.readVInt();
shardId = ShardId.readShardId(in);
expectedHistoryUUID = in.readString();
maxOperationSizeInBytes = in.readVLong();
}
@ -128,6 +135,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
out.writeVLong(fromSeqNo);
out.writeVInt(maxOperationCount);
shardId.writeTo(out);
out.writeString(expectedHistoryUUID);
out.writeVLong(maxOperationSizeInBytes);
}
@ -140,12 +148,13 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
return fromSeqNo == request.fromSeqNo &&
maxOperationCount == request.maxOperationCount &&
Objects.equals(shardId, request.shardId) &&
Objects.equals(expectedHistoryUUID, request.expectedHistoryUUID) &&
maxOperationSizeInBytes == request.maxOperationSizeInBytes;
}
@Override
public int hashCode() {
return Objects.hash(fromSeqNo, maxOperationCount, shardId, maxOperationSizeInBytes);
return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, maxOperationSizeInBytes);
}
@Override
@ -154,6 +163,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
"fromSeqNo=" + fromSeqNo +
", maxOperationCount=" + maxOperationCount +
", shardId=" + shardId +
", expectedHistoryUUID=" + expectedHistoryUUID +
", maxOperationSizeInBytes=" + maxOperationSizeInBytes +
'}';
}
@ -189,7 +199,12 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
Response() {
}
Response(final long mappingVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) {
Response(
final long mappingVersion,
final long globalCheckpoint,
final long maxSeqNo,
final Translog.Operation[] operations) {
this.mappingVersion = mappingVersion;
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNo = maxSeqNo;
@ -260,6 +275,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
seqNoStats.getGlobalCheckpoint(),
request.fromSeqNo,
request.maxOperationCount,
request.expectedHistoryUUID,
request.maxOperationSizeInBytes);
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
}
@ -293,11 +309,20 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
* Also if the sum of collected operations' size is above the specified maxOperationSizeInBytes then this method
* stops collecting more operations and returns what has been collected so far.
*/
static Translog.Operation[] getOperations(IndexShard indexShard, long globalCheckpoint, long fromSeqNo, int maxOperationCount,
static Translog.Operation[] getOperations(IndexShard indexShard,
long globalCheckpoint,
long fromSeqNo,
int maxOperationCount,
String expectedHistoryUUID,
long maxOperationSizeInBytes) throws IOException {
if (indexShard.state() != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
}
final String historyUUID = indexShard.getHistoryUUID();
if (historyUUID.equals(expectedHistoryUUID) == false) {
throw new IllegalStateException("unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" +
historyUUID + "]");
}
if (fromSeqNo > globalCheckpoint) {
return EMPTY_OPERATIONS_ARRAY;
}

View File

@ -50,12 +50,13 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");
public static final ParseField RECORDED_HISTORY_UUID = new ParseField("recorded_history_uuid");
@SuppressWarnings("unchecked")
private static ConstructingObjectParser<ShardFollowTask, Void> PARSER = new ConstructingObjectParser<>(NAME,
(a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]),
new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (long) a[9],
(int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map<String, String>) a[14]));
(int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (String) a[14], (Map<String, String>) a[15]));
static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD);
@ -76,6 +77,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()),
IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
PARSER.declareString(ConstructingObjectParser.constructorArg(), RECORDED_HISTORY_UUID);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS);
}
@ -89,11 +91,22 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
private final int maxWriteBufferSize;
private final TimeValue maxRetryDelay;
private final TimeValue idleShardRetryDelay;
private final String recordedLeaderIndexHistoryUUID;
private final Map<String, String> headers;
ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxBatchOperationCount,
int maxConcurrentReadBatches, long maxBatchSizeInBytes, int maxConcurrentWriteBatches,
int maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue idleShardRetryDelay, Map<String, String> headers) {
ShardFollowTask(
String leaderClusterAlias,
ShardId followShardId,
ShardId leaderShardId,
int maxBatchOperationCount,
int maxConcurrentReadBatches,
long maxBatchSizeInBytes,
int maxConcurrentWriteBatches,
int maxWriteBufferSize,
TimeValue maxRetryDelay,
TimeValue idleShardRetryDelay,
String recordedLeaderIndexHistoryUUID,
Map<String, String> headers) {
this.leaderClusterAlias = leaderClusterAlias;
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
@ -104,6 +117,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
this.maxWriteBufferSize = maxWriteBufferSize;
this.maxRetryDelay = maxRetryDelay;
this.idleShardRetryDelay = idleShardRetryDelay;
this.recordedLeaderIndexHistoryUUID = recordedLeaderIndexHistoryUUID;
this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
}
@ -118,6 +132,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
this.maxWriteBufferSize = in.readVInt();
this.maxRetryDelay = in.readTimeValue();
this.idleShardRetryDelay = in.readTimeValue();
this.recordedLeaderIndexHistoryUUID = in.readString();
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
}
@ -165,6 +180,10 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
return followShardId.getIndex().getUUID() + "-" + followShardId.getId();
}
public String getRecordedLeaderIndexHistoryUUID() {
return recordedLeaderIndexHistoryUUID;
}
public Map<String, String> getHeaders() {
return headers;
}
@ -186,6 +205,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
out.writeVInt(maxWriteBufferSize);
out.writeTimeValue(maxRetryDelay);
out.writeTimeValue(idleShardRetryDelay);
out.writeString(recordedLeaderIndexHistoryUUID);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}
@ -212,6 +232,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
builder.field(RECORDED_HISTORY_UUID.getPreferredName(), recordedLeaderIndexHistoryUUID);
builder.field(HEADERS.getPreferredName(), headers);
return builder.endObject();
}
@ -231,13 +252,26 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
maxWriteBufferSize == that.maxWriteBufferSize &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) &&
Objects.equals(recordedLeaderIndexHistoryUUID, that.recordedLeaderIndexHistoryUUID) &&
Objects.equals(headers, that.headers);
}
@Override
public int hashCode() {
return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxBatchOperationCount, maxConcurrentReadBatches,
maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, maxRetryDelay, idleShardRetryDelay, headers);
return Objects.hash(
leaderClusterAlias,
followShardId,
leaderShardId,
maxBatchOperationCount,
maxConcurrentReadBatches,
maxConcurrentWriteBatches,
maxBatchSizeInBytes,
maxWriteBufferSize,
maxRetryDelay,
idleShardRetryDelay,
recordedLeaderIndexHistoryUUID,
headers
);
}
public String toString() {

View File

@ -133,7 +133,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
Consumer<Exception> errorHandler) {
ShardChangesAction.Request request = new ShardChangesAction.Request(params.getLeaderShardId());
ShardChangesAction.Request request =
new ShardChangesAction.Request(params.getLeaderShardId(), params.getRecordedLeaderIndexHistoryUUID());
request.setFromSeqNo(from);
request.setMaxOperationCount(maxOperationCount);
request.setMaxOperationSizeInBytes(params.getMaxBatchSizeInBytes());

View File

@ -33,14 +33,17 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
public final class TransportCreateAndFollowIndexAction
extends TransportMasterNodeAction<CreateAndFollowIndexAction.Request, CreateAndFollowIndexAction.Response> {
@ -116,8 +119,12 @@ public final class TransportCreateAndFollowIndexAction
final ClusterState state,
final ActionListener<CreateAndFollowIndexAction.Response> listener) {
// following an index in local cluster, so use local cluster state to fetch leader index metadata
final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getFollowRequest().getLeaderIndex());
createFollowerIndex(leaderIndexMetadata, request, listener);
final String leaderIndex = request.getFollowRequest().getLeaderIndex();
final IndexMetaData leaderIndexMetadata = state.getMetaData().index(leaderIndex);
Consumer<String[]> handler = historyUUIDs -> {
createFollowerIndex(leaderIndexMetadata, historyUUIDs, request, listener);
};
ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, handler);
}
private void createFollowerIndexAndFollowRemoteIndex(
@ -125,16 +132,17 @@ public final class TransportCreateAndFollowIndexAction
final String clusterAlias,
final String leaderIndex,
final ActionListener<CreateAndFollowIndexAction.Response> listener) {
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client,
clusterAlias,
leaderIndex,
listener::onFailure,
leaderIndexMetaData -> createFollowerIndex(leaderIndexMetaData, request, listener));
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
}
private void createFollowerIndex(
final IndexMetaData leaderIndexMetaData,
final String[] historyUUIDs,
final CreateAndFollowIndexAction.Request request,
final ActionListener<CreateAndFollowIndexAction.Response> listener) {
if (leaderIndexMetaData == null) {
@ -172,6 +180,11 @@ public final class TransportCreateAndFollowIndexAction
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(followIndex);
// Adding the leader index uuid for each shard as custom metadata:
Map<String, String> metadata = new HashMap<>();
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs));
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);
// Copy all settings, but overwrite a few settings.
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(leaderIndexMetaData.getSettings());

View File

@ -19,6 +19,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingSlowLog;
import org.elasticsearch.index.SearchSlowLog;
@ -35,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
@ -110,11 +112,16 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex());
// following an index in local cluster, so use local cluster state to fetch leader index metadata
final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getLeaderIndex());
try {
start(request, null, leaderIndexMetadata, followerIndexMetadata, listener);
} catch (final IOException e) {
listener.onFailure(e);
if (leaderIndexMetadata == null) {
throw new IndexNotFoundException(request.getFollowerIndex());
}
ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, historyUUIDs -> {
try {
start(request, null, leaderIndexMetadata, followerIndexMetadata, historyUUIDs, listener);
} catch (final IOException e) {
listener.onFailure(e);
}
});
}
private void followRemoteIndex(
@ -124,14 +131,14 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
final ActionListener<AcknowledgedResponse> listener) {
final ClusterState state = clusterService.state();
final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex());
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client,
clusterAlias,
leaderIndex,
listener::onFailure,
leaderIndexMetadata -> {
(leaderHistoryUUID, leaderIndexMetadata) -> {
try {
start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, listener);
start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener);
} catch (final IOException e) {
listener.onFailure(e);
}
@ -153,18 +160,23 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
String clusterNameAlias,
IndexMetaData leaderIndexMetadata,
IndexMetaData followIndexMetadata,
String[] leaderIndexHistoryUUIDs,
ActionListener<AcknowledgedResponse> handler) throws IOException {
MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null;
validate(request, leaderIndexMetadata, followIndexMetadata, mapperService);
validate(request, leaderIndexMetadata, followIndexMetadata, leaderIndexHistoryUUIDs, mapperService);
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
String[] recordedLeaderShardHistoryUUIDs = extractIndexShardHistoryUUIDs(followIndexMetadata);
String recordedLeaderShardHistoryUUID = recordedLeaderShardHistoryUUIDs[shardId];
ShardFollowTask shardFollowTask = new ShardFollowTask(
clusterNameAlias,
@ -177,6 +189,7 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
request.getMaxWriteBufferSize(),
request.getMaxRetryDelay(),
request.getIdleShardRetryDelay(),
recordedLeaderShardHistoryUUID,
filteredHeaders);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@ -224,6 +237,7 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
final FollowIndexAction.Request request,
final IndexMetaData leaderIndex,
final IndexMetaData followIndex,
final String[] leaderIndexHistoryUUID,
final MapperService followerMapperService) {
if (leaderIndex == null) {
throw new IllegalArgumentException("leader index [" + request.getLeaderIndex() + "] does not exist");
@ -231,6 +245,19 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
if (followIndex == null) {
throw new IllegalArgumentException("follow index [" + request.getFollowerIndex() + "] does not exist");
}
String[] recordedHistoryUUIDs = extractIndexShardHistoryUUIDs(followIndex);
assert recordedHistoryUUIDs.length == leaderIndexHistoryUUID.length;
for (int i = 0; i < leaderIndexHistoryUUID.length; i++) {
String recordedLeaderIndexHistoryUUID = recordedHistoryUUIDs[i];
String actualLeaderIndexHistoryUUID = leaderIndexHistoryUUID[i];
if (recordedLeaderIndexHistoryUUID.equals(actualLeaderIndexHistoryUUID) == false) {
throw new IllegalArgumentException("leader shard [" + request.getFollowerIndex() + "][" + i + "] should reference [" +
recordedLeaderIndexHistoryUUID + "] as history uuid but instead reference [" + actualLeaderIndexHistoryUUID +
"] as history uuid");
}
}
if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
throw new IllegalArgumentException("leader index [" + request.getLeaderIndex() + "] does not have soft deletes enabled");
}
@ -261,6 +288,12 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY);
}
private static String[] extractIndexShardHistoryUUIDs(IndexMetaData followIndexMetadata) {
String historyUUIDs = followIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY)
.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS);
return historyUUIDs.split(",");
}
private static final Set<Setting<?>> WHITE_LISTED_SETTINGS;
static {

View File

@ -27,7 +27,9 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@ -116,7 +118,8 @@ public class ShardChangesIT extends ESIntegTestCase {
long globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint();
assertThat(globalCheckPoint, equalTo(2L));
ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId());
String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY);
ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId(), historyUUID);
request.setFromSeqNo(0L);
request.setMaxOperationCount(3);
ShardChangesAction.Response response = client().execute(ShardChangesAction.INSTANCE, request).get();
@ -141,7 +144,7 @@ public class ShardChangesIT extends ESIntegTestCase {
globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint();
assertThat(globalCheckPoint, equalTo(5L));
request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId());
request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId(), historyUUID);
request.setFromSeqNo(3L);
request.setMaxOperationCount(3);
response = client().execute(ShardChangesAction.INSTANCE, request).get();
@ -357,16 +360,11 @@ public class ShardChangesIT extends ESIntegTestCase {
final String leaderIndexSettings =
getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
final String followerIndexSettings =
getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
internalCluster().ensureAtLeastNumDataNodes(2);
ensureGreen("index1", "index2");
ensureGreen("index1");
final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
client().execute(FollowIndexAction.INSTANCE, followRequest).get();
client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get();
final int numDocs = randomIntBetween(2, 64);
for (int i = 0; i < numDocs; i++) {
@ -409,13 +407,13 @@ public class ShardChangesIT extends ESIntegTestCase {
assertAcked(client().admin().indices().prepareCreate("test-follower").get());
// Leader index does not exist.
FollowIndexAction.Request followRequest1 = createFollowRequest("non-existent-leader", "test-follower");
expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest1).actionGet());
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest1).actionGet());
// Follower index does not exist.
FollowIndexAction.Request followRequest2 = createFollowRequest("non-test-leader", "non-existent-follower");
expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest2).actionGet());
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest2).actionGet());
// Both indices do not exist.
FollowIndexAction.Request followRequest3 = createFollowRequest("non-existent-leader", "non-existent-follower");
expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet());
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet());
}
@TestLogging("_root:DEBUG")

View File

@ -59,22 +59,27 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
int max = randomIntBetween(min, numWrites - 1);
int size = max - min + 1;
final Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard,
indexShard.getGlobalCheckpoint(), min, size, Long.MAX_VALUE);
indexShard.getGlobalCheckpoint(), min, size, indexShard.getHistoryUUID(), Long.MAX_VALUE);
final List<Long> seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toList());
final List<Long> expectedSeqNos = LongStream.rangeClosed(min, max).boxed().collect(Collectors.toList());
assertThat(seenSeqNos, equalTo(expectedSeqNos));
}
// get operations for a range no operations exists:
Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(),
numWrites, numWrites + 1, Long.MAX_VALUE);
numWrites, numWrites + 1, indexShard.getHistoryUUID(), Long.MAX_VALUE);
assertThat(operations.length, equalTo(0));
// get operations for a range some operations do not exist:
operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(),
numWrites - 10, numWrites + 10, Long.MAX_VALUE);
numWrites - 10, numWrites + 10, indexShard.getHistoryUUID(), Long.MAX_VALUE);
assertThat(operations.length, equalTo(10));
// Unexpected history UUID:
Exception e = expectThrows(IllegalStateException.class, () -> ShardChangesAction.getOperations(indexShard,
indexShard.getGlobalCheckpoint(), 0, 10, "different-history-uuid", Long.MAX_VALUE));
assertThat(e.getMessage(), equalTo("unexpected history uuid, expected [different-history-uuid], actual [" +
indexShard.getHistoryUUID() + "]"));
}
public void testGetOperationsWhenShardNotStarted() throws Exception {
@ -83,7 +88,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING);
Mockito.when(indexShard.routingEntry()).thenReturn(shardRouting);
expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperations(indexShard,
indexShard.getGlobalCheckpoint(), 0, 1, Long.MAX_VALUE));
indexShard.getGlobalCheckpoint(), 0, 1, indexShard.getHistoryUUID(), Long.MAX_VALUE));
}
public void testGetOperationsExceedByteLimit() throws Exception {
@ -100,7 +105,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
final IndexShard indexShard = indexService.getShard(0);
final Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(),
0, 12, 256);
0, 12, indexShard.getHistoryUUID(), 256);
assertThat(operations.length, equalTo(12));
assertThat(operations[0].seqNo(), equalTo(0L));
assertThat(operations[1].seqNo(), equalTo(1L));
@ -127,7 +132,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
final IndexShard indexShard = indexService.getShard(0);
final Translog.Operation[] operations =
ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), 0, 1, 0);
ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), 0, 1, indexShard.getHistoryUUID(), 0);
assertThat(operations.length, equalTo(1));
assertThat(operations[0].seqNo(), equalTo(0L));
}
@ -137,7 +142,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
final AtomicReference<Exception> reference = new AtomicReference<>();
final ShardChangesAction.TransportAction transportAction = node().injector().getInstance(ShardChangesAction.TransportAction.class);
transportAction.execute(
new ShardChangesAction.Request(new ShardId(new Index("non-existent", "uuid"), 0)),
new ShardChangesAction.Request(new ShardId(new Index("non-existent", "uuid"), 0), "uuid"),
new ActionListener<ShardChangesAction.Response>() {
@Override
public void onResponse(final ShardChangesAction.Response response) {
@ -162,7 +167,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
final AtomicReference<Exception> reference = new AtomicReference<>();
final ShardChangesAction.TransportAction transportAction = node().injector().getInstance(ShardChangesAction.TransportAction.class);
transportAction.execute(
new ShardChangesAction.Request(new ShardId(indexService.getMetaData().getIndex(), numberOfShards)),
new ShardChangesAction.Request(new ShardId(indexService.getMetaData().getIndex(), numberOfShards), "uuid"),
new ActionListener<ShardChangesAction.Response>() {
@Override
public void onResponse(final ShardChangesAction.Response response) {

View File

@ -15,7 +15,8 @@ public class ShardChangesRequestTests extends AbstractStreamableTestCase<ShardCh
@Override
protected ShardChangesAction.Request createTestInstance() {
ShardChangesAction.Request request = new ShardChangesAction.Request(new ShardId("_index", "_indexUUID", 0));
ShardChangesAction.Request request =
new ShardChangesAction.Request(new ShardId("_index", "_indexUUID", 0), randomAlphaOfLength(4));
request.setMaxOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
request.setFromSeqNo(randomNonNegativeLong());
return request;
@ -27,7 +28,7 @@ public class ShardChangesRequestTests extends AbstractStreamableTestCase<ShardCh
}
public void testValidate() {
ShardChangesAction.Request request = new ShardChangesAction.Request(new ShardId("_index", "_indexUUID", 0));
ShardChangesAction.Request request = new ShardChangesAction.Request(new ShardId("_index", "_indexUUID", 0), "uuid");
request.setFromSeqNo(-1);
assertThat(request.validate().getMessage(), containsString("fromSeqNo [-1] cannot be lower than 0"));

View File

@ -20,7 +20,12 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
for (int i = 0; i < numOps; i++) {
operations[i] = new Translog.NoOp(i, 0, "test");
}
return new ShardChangesAction.Response(mappingVersion, leaderGlobalCheckpoint, leaderMaxSeqNo, operations);
return new ShardChangesAction.Response(
mappingVersion,
leaderGlobalCheckpoint,
leaderMaxSeqNo,
operations
);
}
@Override

View File

@ -75,10 +75,20 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
private ShardFollowNodeTask createShardFollowTask(int concurrency, TestRun testRun) {
AtomicBoolean stopped = new AtomicBoolean(false);
ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0),
new ShardId("leader_index", "", 0), testRun.maxOperationCount, concurrency,
FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, concurrency, 10240,
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap());
ShardFollowTask params = new ShardFollowTask(
null,
new ShardId("follow_index", "", 0),
new ShardId("leader_index", "", 0),
testRun.maxOperationCount,
concurrency,
FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
concurrency,
10240,
TimeValue.timeValueMillis(10),
TimeValue.timeValueMillis(10),
"uuid",
Collections.emptyMap()
);
ThreadPool threadPool = new TestThreadPool(getClass().getSimpleName());
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> {
@ -215,8 +225,16 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
ops.add(new Translog.Index("doc", id, seqNo, 0, source));
}
item.add(new TestResponse(null, mappingVersion,
new ShardChangesAction.Response(mappingVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, ops.toArray(EMPTY))));
item.add(new TestResponse(
null,
mappingVersion,
new ShardChangesAction.Response(
mappingVersion,
nextGlobalCheckPoint,
nextGlobalCheckPoint,
ops.toArray(EMPTY))
)
);
responses.put(prevGlobalCheckpoint, item);
} else {
// Simulates a leader shard copy not having all the operations the shard follow task thinks it has by
@ -232,8 +250,12 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
}
// Sometimes add an empty shard changes response to also simulate a leader shard lagging behind
if (sometimes()) {
ShardChangesAction.Response response =
new ShardChangesAction.Response(mappingVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, EMPTY);
ShardChangesAction.Response response = new ShardChangesAction.Response(
mappingVersion,
prevGlobalCheckpoint,
prevGlobalCheckpoint,
EMPTY
);
item.add(new TestResponse(null, mappingVersion, response));
}
List<Translog.Operation> ops = new ArrayList<>();
@ -244,8 +266,12 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
}
// Report toSeqNo to simulate maxBatchSizeInBytes limit being met or last op to simulate a shard lagging behind:
long localLeaderGCP = randomBoolean() ? ops.get(ops.size() - 1).seqNo() : toSeqNo;
ShardChangesAction.Response response =
new ShardChangesAction.Response(mappingVersion, localLeaderGCP, localLeaderGCP, ops.toArray(EMPTY));
ShardChangesAction.Response response = new ShardChangesAction.Response(
mappingVersion,
localLeaderGCP,
localLeaderGCP,
ops.toArray(EMPTY)
);
item.add(new TestResponse(null, mappingVersion, response));
responses.put(fromSeqNo, Collections.unmodifiableList(item));
}

View File

@ -627,9 +627,20 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
int bufferWriteLimit,
long maxBatchSizeInBytes) {
AtomicBoolean stopped = new AtomicBoolean(false);
ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0),
new ShardId("leader_index", "", 0), maxBatchOperationCount, maxConcurrentReadBatches, maxBatchSizeInBytes,
maxConcurrentWriteBatches, bufferWriteLimit, TimeValue.ZERO, TimeValue.ZERO, Collections.emptyMap());
ShardFollowTask params = new ShardFollowTask(
null,
new ShardId("follow_index", "", 0),
new ShardId("leader_index", "", 0),
maxBatchOperationCount,
maxConcurrentReadBatches,
maxBatchSizeInBytes,
maxConcurrentWriteBatches,
bufferWriteLimit,
TimeValue.ZERO,
TimeValue.ZERO,
"uuid",
Collections.emptyMap()
);
shardChangesRequests = new ArrayList<>();
bulkShardOperationRequests = new ArrayList<>();
@ -690,12 +701,12 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
for (int i = 0; i < requestBatchSize; i++) {
operations[i] = new Translog.NoOp(from + i, 0, "test");
}
final ShardChangesAction.Response response =
new ShardChangesAction.Response(
mappingVersions.poll(),
leaderGlobalCheckpoints.poll(),
maxSeqNos.poll(),
operations);
final ShardChangesAction.Response response = new ShardChangesAction.Response(
mappingVersions.poll(),
leaderGlobalCheckpoints.poll(),
maxSeqNos.poll(),
operations
);
handler.accept(response);
}
}
@ -727,7 +738,11 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
ops.add(new Translog.Index("doc", id, seqNo, 0, source));
}
return new ShardChangesAction.Response(
mappingVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0]));
mappingVersion,
leaderGlobalCheckPoint,
leaderGlobalCheckPoint,
ops.toArray(new Translog.Operation[0])
);
}
void startTask(ShardFollowNodeTask task, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) {

View File

@ -38,11 +38,13 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase {
@ -129,6 +131,43 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
}
}
public void testChangeHistoryUUID() throws Exception {
try (ReplicationGroup leaderGroup = createGroup(0);
ReplicationGroup followerGroup = createFollowGroup(0)) {
leaderGroup.startAll();
int docCount = leaderGroup.appendDocs(randomInt(64));
leaderGroup.assertAllEqual(docCount);
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(
leaderSeqNoStats.getGlobalCheckpoint(),
leaderSeqNoStats.getMaxSeqNo(),
followerSeqNoStats.getGlobalCheckpoint(),
followerSeqNoStats.getMaxSeqNo());
leaderGroup.syncGlobalCheckpoint();
leaderGroup.assertAllEqual(docCount);
Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(indexedDocIds.size());
});
String oldHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
leaderGroup.reinitPrimaryShard();
leaderGroup.getPrimary().store().bootstrapNewHistory();
recoverShardFromStore(leaderGroup.getPrimary());
String newHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
assertBusy(() -> {
assertThat(shardFollowTask.isStopped(), is(true));
assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
"], actual [" + newHistoryUUID + "]"));
});
}
}
@Override
protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException {
Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
@ -159,12 +198,23 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
}
private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) {
ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0),
new ShardId("leader_index", "", 0), between(1, 64), between(1, 8), Long.MAX_VALUE, between(1, 4), 10240,
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap());
ShardFollowTask params = new ShardFollowTask(
null,
new ShardId("follow_index", "", 0),
new ShardId("leader_index", "", 0),
between(1, 64),
between(1, 8),
Long.MAX_VALUE,
between(1, 4), 10240,
TimeValue.timeValueMillis(10),
TimeValue.timeValueMillis(10),
leaderGroup.getPrimary().getHistoryUUID(),
Collections.emptyMap()
);
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
AtomicBoolean stopped = new AtomicBoolean(false);
AtomicReference<Exception> failureHolder = new AtomicReference<>();
LongSet fetchOperations = new LongHashSet();
return new ShardFollowNodeTask(
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
@ -210,10 +260,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
try {
final SeqNoStats seqNoStats = indexShard.seqNoStats();
Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from,
maxOperationCount, params.getMaxBatchSizeInBytes());
maxOperationCount, params.getRecordedLeaderIndexHistoryUUID(), params.getMaxBatchSizeInBytes());
// hard code mapping version; this is ok, as mapping updates are not tested here
final ShardChangesAction.Response response =
new ShardChangesAction.Response(1L, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), ops);
final ShardChangesAction.Response response = new ShardChangesAction.Response(
1L,
seqNoStats.getGlobalCheckpoint(),
seqNoStats.getMaxSeqNo(),
ops
);
handler.accept(response);
return;
} catch (Exception e) {
@ -238,9 +292,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
@Override
public void markAsFailed(Exception e) {
failureHolder.set(e);
stopped.set(true);
}
@Override
public Exception getFailure() {
return failureHolder.get();
}
};
}

View File

@ -34,7 +34,9 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase<ShardFollo
randomIntBetween(1, Integer.MAX_VALUE),
TimeValue.parseTimeValue(randomTimeValue(), ""),
TimeValue.parseTimeValue(randomTimeValue(), ""),
randomBoolean() ? null : Collections.singletonMap("key", "value"));
randomAlphaOfLength(4),
randomBoolean() ? null : Collections.singletonMap("key", "value")
);
}
@Override

View File

@ -14,64 +14,84 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MapperTestUtils;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.ShardChangesIT;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import java.io.IOException;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction.validate;
import static org.hamcrest.Matchers.equalTo;
public class TransportFollowIndexActionTests extends ESTestCase {
private static final Map<String, String> CUSTOM_METADATA =
singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, "uuid");
public void testValidation() throws IOException {
FollowIndexAction.Request request = ShardChangesIT.createFollowRequest("index1", "index2");
String[] UUIDs = new String[]{"uuid"};
{
// should fail, because leader index does not exist
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, null, null, null));
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, null, null, null, null));
assertThat(e.getMessage(), equalTo("leader index [index1] does not exist"));
}
{
// should fail, because follow index does not exist
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, null, null));
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap());
Exception e = expectThrows(IllegalArgumentException.class,
() -> validate(request, leaderIMD, null, null, null));
assertThat(e.getMessage(), equalTo("follow index [index2] does not exist"));
}
{
// should fail because the recorded leader index history uuid is not equal to the leader actual index history uuid:
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap());
Map<String, String> customMetaData =
singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, "another-uuid");
IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, customMetaData);
Exception e = expectThrows(IllegalArgumentException.class,
() -> validate(request, leaderIMD, followIMD, UUIDs, null));
assertThat(e.getMessage(), equalTo("leader shard [index2][0] should reference [another-uuid] as history uuid but " +
"instead reference [uuid] as history uuid"));
}
{
// should fail because leader index does not have soft deletes enabled
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY);
IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, null));
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap());
IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, CUSTOM_METADATA);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null));
assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled"));
}
{
// should fail because the number of primary shards between leader and follow index are not equal
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build());
IndexMetaData followIMD = createIMD("index2", 4, Settings.EMPTY);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, null));
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap());
IndexMetaData followIMD = createIMD("index2", 4, Settings.EMPTY, CUSTOM_METADATA);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null));
assertThat(e.getMessage(),
equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]"));
}
{
// should fail, because leader index is closed
IndexMetaData leaderIMD = createIMD("index1", State.CLOSE, "{}", 5, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build());
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap());
IndexMetaData followIMD = createIMD("index2", State.OPEN, "{}", 5, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build());
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, null));
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), CUSTOM_METADATA);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null));
assertThat(e.getMessage(), equalTo("leader and follow index must be open"));
}
{
// should fail, because leader has a field with the same name mapped as keyword and follower as text
IndexMetaData leaderIMD = createIMD("index1", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"keyword\"}}}", 5,
Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build());
Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap());
IndexMetaData followIMD = createIMD("index2", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"text\"}}}", 5,
Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build());
Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(), CUSTOM_METADATA);
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2");
mapperService.updateMapping(null, followIMD);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, mapperService));
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, mapperService));
assertThat(e.getMessage(), equalTo("mapper [field] of different type, current_type [text], merged_type [keyword]"));
}
{
@ -80,38 +100,38 @@ public class TransportFollowIndexActionTests extends ESTestCase {
IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")
.put("index.analysis.analyzer.my_analyzer.type", "custom")
.put("index.analysis.analyzer.my_analyzer.tokenizer", "whitespace").build());
.put("index.analysis.analyzer.my_analyzer.tokenizer", "whitespace").build(), emptyMap());
IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder()
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put("index.analysis.analyzer.my_analyzer.type", "custom")
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build());
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, null));
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), CUSTOM_METADATA);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null));
assertThat(e.getMessage(), equalTo("the leader and follower index settings must be identical"));
}
{
// should fail because the following index does not have the following_index settings
IndexMetaData leaderIMD = createIMD("index1", 5,
Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build());
Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap());
Settings followingIndexSettings = randomBoolean() ? Settings.EMPTY :
Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), false).build();
IndexMetaData followIMD = createIMD("index2", 5, followingIndexSettings);
IndexMetaData followIMD = createIMD("index2", 5, followingIndexSettings, CUSTOM_METADATA);
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(),
followingIndexSettings, "index2");
mapperService.updateMapping(null, followIMD);
IllegalArgumentException error =
expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, mapperService));
expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, mapperService));
assertThat(error.getMessage(), equalTo("the following index [index2] is not ready to follow; " +
"the setting [index.xpack.ccr.following_index] must be enabled."));
}
{
// should succeed
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build());
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap());
IndexMetaData followIMD = createIMD("index2", 5, Settings.builder()
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build());
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(), CUSTOM_METADATA);
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2");
mapperService.updateMapping(null, followIMD);
validate(request, leaderIMD, followIMD, mapperService);
validate(request, leaderIMD, followIMD, UUIDs, mapperService);
}
{
// should succeed, index settings are identical
@ -119,15 +139,15 @@ public class TransportFollowIndexActionTests extends ESTestCase {
IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")
.put("index.analysis.analyzer.my_analyzer.type", "custom")
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build());
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), emptyMap());
IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder()
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put("index.analysis.analyzer.my_analyzer.type", "custom")
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build());
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), CUSTOM_METADATA);
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(),
followIMD.getSettings(), "index2");
mapperService.updateMapping(null, followIMD);
validate(request, leaderIMD, followIMD, mapperService);
validate(request, leaderIMD, followIMD, UUIDs, mapperService);
}
{
// should succeed despite whitelisted settings being different
@ -136,25 +156,32 @@ public class TransportFollowIndexActionTests extends ESTestCase {
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s")
.put("index.analysis.analyzer.my_analyzer.type", "custom")
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build());
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), emptyMap());
IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder()
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "10s")
.put("index.analysis.analyzer.my_analyzer.type", "custom")
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build());
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), CUSTOM_METADATA);
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(),
followIMD.getSettings(), "index2");
mapperService.updateMapping(null, followIMD);
validate(request, leaderIMD, followIMD, mapperService);
validate(request, leaderIMD, followIMD, UUIDs, mapperService);
}
}
private static IndexMetaData createIMD(String index, int numberOfShards, Settings settings) throws IOException {
return createIMD(index, State.OPEN, "{\"properties\": {}}", numberOfShards, settings);
private static IndexMetaData createIMD(String index,
int numberOfShards,
Settings settings,
Map<String, String> custom) throws IOException {
return createIMD(index, State.OPEN, "{\"properties\": {}}", numberOfShards, settings, custom);
}
private static IndexMetaData createIMD(String index, State state, String mapping, int numberOfShards,
Settings settings) throws IOException {
private static IndexMetaData createIMD(String index,
State state,
String mapping,
int numberOfShards,
Settings settings,
Map<String, String> custom) throws IOException {
return IndexMetaData.builder(index)
.settings(settings(Version.CURRENT).put(settings))
.numberOfShards(numberOfShards)
@ -162,6 +189,7 @@ public class TransportFollowIndexActionTests extends ESTestCase {
.numberOfReplicas(0)
.setRoutingNumShards(numberOfShards)
.putMapping("_doc", mapping)
.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, custom)
.build();
}