diff --git a/x-pack/plugin/ccr/build.gradle b/x-pack/plugin/ccr/build.gradle new file mode 100644 index 00000000000..2f6f152c50e --- /dev/null +++ b/x-pack/plugin/ccr/build.gradle @@ -0,0 +1,52 @@ +import com.carrotsearch.gradle.junit4.RandomizedTestingTask +import org.elasticsearch.gradle.BuildPlugin + +evaluationDependsOn(':x-pack-elasticsearch:plugin:core') + +apply plugin: 'elasticsearch.esplugin' +esplugin { + name 'x-pack-ccr' + description 'Elasticsearch Expanded Pack Plugin - CCR' + classname 'org.elasticsearch.xpack.ccr.Ccr' + hasNativeController false + requiresKeystore true + extendedPlugins = ['x-pack-core'] + licenseFile project(':x-pack-elasticsearch').file('LICENSE.txt') + noticeFile project(':x-pack-elasticsearch').file('NOTICE.txt') +} +archivesBaseName = 'x-pack-ccr' + +integTest.enabled = false + +// Instead we create a separate task to run the +// tests based on ESIntegTestCase +task internalClusterTest(type: RandomizedTestingTask, + group: JavaBasePlugin.VERIFICATION_GROUP, + description: 'Java fantasy integration tests', + dependsOn: test.dependsOn) { + configure(BuildPlugin.commonTestConfig(project)) + classpath = project.test.classpath + testClassesDir = project.test.testClassesDir + include '**/*IT.class' + systemProperty 'es.set.netty.runtime.available.processors', 'false' +} + +check { + dependsOn = [internalClusterTest, 'qa:multi-cluster:followClusterTest'] +} + +internalClusterTest.mustRunAfter test + +dependencies { + compileOnly "org.elasticsearch:elasticsearch:${version}" + compileOnly "org.elasticsearch.plugin:x-pack-core:${version}" + testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') +} + +dependencyLicenses { + ignoreSha 'x-pack-core' +} + +run { + plugin ':x-pack-elasticsearch:plugin:core' +} diff --git a/x-pack/plugin/ccr/qa/build.gradle b/x-pack/plugin/ccr/qa/build.gradle new file mode 100644 index 00000000000..4a007422f38 --- /dev/null +++ b/x-pack/plugin/ccr/qa/build.gradle @@ -0,0 +1,12 @@ + +/* Remove assemble on all qa projects because we don't need to publish + * artifacts for them. 
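+ * The assemble task is also removed from each project's build task dependencies, so the normal build lifecycle never triggers it.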
*/ +gradle.projectsEvaluated { + subprojects { + Task assemble = project.tasks.findByName('assemble') + if (assemble) { + project.tasks.remove(assemble) + project.build.dependsOn.remove('assemble') + } + } +} diff --git a/x-pack/plugin/ccr/qa/multi-cluster/build.gradle b/x-pack/plugin/ccr/qa/multi-cluster/build.gradle new file mode 100644 index 00000000000..444575a1538 --- /dev/null +++ b/x-pack/plugin/ccr/qa/multi-cluster/build.gradle @@ -0,0 +1,40 @@ +import org.elasticsearch.gradle.test.RestIntegTestTask + +apply plugin: 'elasticsearch.standalone-test' + +dependencies { + testCompile project(path: xpackModule('core'), configuration: 'runtime') + testCompile project(path: xpackModule('ccr'), configuration: 'runtime') +} + +task leaderClusterTest(type: RestIntegTestTask) { + mustRunAfter(precommit) +} + +leaderClusterTestCluster { + numNodes = 1 + clusterName = 'leader-cluster' + plugin xpackProject('plugin').path +} + +leaderClusterTestRunner { + systemProperty 'tests.is_leader_cluster', 'true' +} + +task followClusterTest(type: RestIntegTestTask) {} + +followClusterTestCluster { + dependsOn leaderClusterTestRunner + numNodes = 1 + clusterName = 'follow-cluster' + plugin xpackProject('plugin').path + setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\"" +} + +followClusterTestRunner { + systemProperty 'tests.is_leader_cluster', 'false' + systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" + finalizedBy 'leaderClusterTestCluster#stop' +} + +test.enabled = false // no unit tests for multi-cluster-search, only the rest integration test diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java new file mode 100644 index 00000000000..09556bf7487 --- /dev/null +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr; + +import org.apache.http.HttpHost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.test.rest.ESRestTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; + +public class FollowIndexIT extends ESRestTestCase { + + private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster")); + + @Override + protected boolean preserveClusterUponCompletion() { + return true; + } + + public void testFollowIndex() throws Exception { + final int numDocs = 128; + final String leaderIndexName = "test_index1"; + if (runningAgainstLeaderCluster) { + logger.info("Running against leader cluster"); + for (int i = 0; i < numDocs; i++) { + logger.info("Indexing doc [{}]", i); + index(client(), leaderIndexName, Integer.toString(i), "field", i); + } + refresh(leaderIndexName); + verifyDocuments(leaderIndexName, numDocs); + } else { + logger.info("Running against follow cluster"); + final String followIndexName = "test_index2"; + Settings indexSettings = Settings.builder() + .put("index.xpack.ccr.following_index", true) + .build(); + // TODO: remove mapping here when ccr syncs mappings too + createIndex(followIndexName, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"integer\" }}}"); + ensureYellow(followIndexName); + + followIndex("leader_cluster:" + leaderIndexName, followIndexName); + assertBusy(() -> verifyDocuments(followIndexName, numDocs)); + + try (RestClient leaderClient = buildLeaderClient()) { + int id = numDocs; + index(leaderClient, leaderIndexName, Integer.toString(id), "field", id); + index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1); + index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2); + } + + assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3)); + } + } + + private static void index(RestClient client, String index, String id, Object... 
fields) throws IOException { + XContentBuilder document = jsonBuilder().startObject(); + for (int i = 0; i < fields.length; i += 2) { + document.field((String) fields[i], fields[i + 1]); + } + document.endObject(); + assertOK(client.performRequest("POST", "/" + index + "/doc/" + id, emptyMap(), + new StringEntity(Strings.toString(document), ContentType.APPLICATION_JSON))); + } + + private static void refresh(String index) throws IOException { + assertOK(client().performRequest("POST", "/" + index + "/_refresh")); + } + + private static void followIndex(String leaderIndex, String followIndex) throws IOException { + Map params = Collections.singletonMap("leader_index", leaderIndex); + assertOK(client().performRequest("POST", "/_xpack/ccr/" + followIndex + "/_follow", params)); + } + + private static void verifyDocuments(String index, int expectedNumDocs) throws IOException { + Map params = new HashMap<>(); + params.put("size", Integer.toString(expectedNumDocs)); + params.put("sort", "field:asc"); + Map response = toMap(client().performRequest("GET", "/" + index + "/_search", params)); + + int numDocs = (int) XContentMapValues.extractValue("hits.total", response); + assertThat(numDocs, equalTo(expectedNumDocs)); + + List hits = (List) XContentMapValues.extractValue("hits.hits", response); + assertThat(hits.size(), equalTo(expectedNumDocs)); + for (int i = 0; i < expectedNumDocs; i++) { + int value = (int) XContentMapValues.extractValue("_source.field", (Map) hits.get(i)); + assertThat(i, equalTo(value)); + } + } + + private static Map toMap(Response response) throws IOException { + return toMap(EntityUtils.toString(response.getEntity())); + } + + private static Map toMap(String response) { + return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); + } + + private static void ensureYellow(String index) throws IOException { + Map params = new HashMap<>(); + params.put("wait_for_status", "yellow"); + params.put("wait_for_no_relocating_shards", "true"); + params.put("timeout", "30s"); + params.put("level", "shards"); + assertOK(client().performRequest("GET", "_cluster/health/" + index, params)); + } + + private RestClient buildLeaderClient() throws IOException { + assert runningAgainstLeaderCluster == false; + String leaderUrl = System.getProperty("tests.leader_host"); + int portSeparator = leaderUrl.lastIndexOf(':'); + HttpHost httpHost = new HttpHost(leaderUrl.substring(0, portSeparator), + Integer.parseInt(leaderUrl.substring(portSeparator + 1)), getProtocol()); + return buildClient(Settings.EMPTY, new HttpHost[]{httpHost}); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java new file mode 100644 index 00000000000..c5dd6f861fc --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -0,0 +1,169 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.PersistentTaskPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ExecutorBuilder; +import org.elasticsearch.threadpool.FixedExecutorBuilder; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction; +import org.elasticsearch.xpack.ccr.action.ShardChangesAction; +import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask; +import org.elasticsearch.xpack.ccr.action.ShardFollowTask; +import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor; +import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; +import org.elasticsearch.xpack.ccr.rest.RestFollowExistingIndexAction; +import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction; +import org.elasticsearch.xpack.core.XPackPlugin; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + +import static java.util.Collections.emptyList; +import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_ENABLED_SETTING; +import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTING; + +/** + * Container class for CCR functionality. + */ +public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin { + + public static final String CCR_THREAD_POOL_NAME = "ccr"; + + private final boolean enabled; + private final Settings settings; + + /** + * Construct an instance of the CCR container with the specified settings. 
+ * + * @param settings the settings + */ + public Ccr(final Settings settings) { + this.settings = settings; + this.enabled = CCR_ENABLED_SETTING.get(settings); + } + + @Override + public List> getPersistentTasksExecutor(ClusterService clusterService, + ThreadPool threadPool, Client client) { + return Collections.singletonList(new ShardFollowTasksExecutor(settings, client, threadPool)); + } + + public List> getActions() { + if (enabled == false) { + return emptyList(); + } + + return Arrays.asList( + new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class), + new ActionHandler<>(FollowExistingIndexAction.INSTANCE, FollowExistingIndexAction.TransportAction.class), + new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class), + new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class) + ); + } + + public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster) { + return Arrays.asList( + new RestUnfollowIndexAction(settings, restController), + new RestFollowExistingIndexAction(settings, restController) + ); + } + + public List getNamedWriteables() { + return Arrays.asList( + // Persistent action requests + new NamedWriteableRegistry.Entry(PersistentTaskParams.class, ShardFollowTask.NAME, + ShardFollowTask::new), + + // Task statuses + new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.NAME, + ShardFollowNodeTask.Status::new) + ); + } + + public List getNamedXContent() { + return Arrays.asList( + // Persistent action requests + new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(ShardFollowTask.NAME), + ShardFollowTask::fromXContent), + + // Task statuses + new NamedXContentRegistry.Entry(ShardFollowNodeTask.Status.class, new ParseField(ShardFollowNodeTask.Status.NAME), + ShardFollowNodeTask.Status::fromXContent) + ); + } + + /** + * The settings defined by CCR. + * + * @return the settings + */ + public List> getSettings() { + return CcrSettings.getSettings(); + } + + /** + * The optional engine factory for CCR. This method inspects the index settings for the {@link CcrSettings#CCR_FOLLOWING_INDEX_SETTING} + * setting to determine whether or not the engine implementation should be a following engine. 
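+ * If the setting is not enabled on the index, an empty {@link Optional} is returned and the index falls back to the default engine.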
+ * + * @return the optional engine factory + */ + public Optional getEngineFactory(final IndexSettings indexSettings) { + if (CCR_FOLLOWING_INDEX_SETTING.get(indexSettings.getSettings())) { + return Optional.of(new FollowingEngineFactory()); + } else { + return Optional.empty(); + } + } + + public List> getExecutorBuilders(Settings settings) { + if (enabled == false) { + return Collections.emptyList(); + } + + FixedExecutorBuilder ccrTp = new FixedExecutorBuilder(settings, CCR_THREAD_POOL_NAME, + 32, 100, "xpack.ccr.ccr_thread_pool"); + + return Collections.singletonList(ccrTp); + } + + protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java new file mode 100644 index 00000000000..f6859de7756 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Property; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Container class for CCR settings. + */ +public final class CcrSettings { + + // prevent construction + private CcrSettings() { + + } + + /** + * Setting for controlling whether or not CCR is enabled. + */ + static final Setting CCR_ENABLED_SETTING = Setting.boolSetting("xpack.ccr.enabled", true, Property.NodeScope); + + /** + * Index setting for a following index. + */ + public static final Setting CCR_FOLLOWING_INDEX_SETTING = + Setting.boolSetting("index.xpack.ccr.following_index", false, Setting.Property.IndexScope); + + /** + * The settings defined by CCR. + * + * @return the settings + */ + static List> getSettings() { + return Arrays.asList( + CCR_ENABLED_SETTING, + CCR_FOLLOWING_INDEX_SETTING); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java new file mode 100644 index 00000000000..44628f1da5b --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java @@ -0,0 +1,277 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; + +public class FollowExistingIndexAction extends Action { + + public static final FollowExistingIndexAction INSTANCE = new FollowExistingIndexAction(); + public static final String NAME = "cluster:admin/xpack/ccr/follow_existing_index"; + + private FollowExistingIndexAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends ActionRequest { + + private String leaderIndex; + private String followIndex; + private long batchSize = ShardFollowTasksExecutor.DEFAULT_BATCH_SIZE; + private int concurrentProcessors = ShardFollowTasksExecutor.DEFAULT_CONCURRENT_PROCESSORS; + private long processorMaxTranslogBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES; + + public String getLeaderIndex() { + return leaderIndex; + } + + public void setLeaderIndex(String leaderIndex) { + this.leaderIndex = leaderIndex; + } + + public String getFollowIndex() { + return followIndex; + } + + public void setFollowIndex(String followIndex) { + this.followIndex = followIndex; + } + + public long getBatchSize() { + return batchSize; + } + + public void setBatchSize(long batchSize) { + if (batchSize < 1) { + throw new IllegalArgumentException("Illegal batch_size [" + batchSize + "]"); + } + + this.batchSize = batchSize; + } + + public void setConcurrentProcessors(int concurrentProcessors) { + if (concurrentProcessors < 1) { + throw new IllegalArgumentException("concurrent_processors must be larger than 0"); + } + this.concurrentProcessors = concurrentProcessors; + } + + public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) { + if (processorMaxTranslogBytes <= 0) { + throw new 
IllegalArgumentException("processor_max_translog_bytes must be larger than 0"); + } + this.processorMaxTranslogBytes = processorMaxTranslogBytes; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + leaderIndex = in.readString(); + followIndex = in.readString(); + batchSize = in.readVLong(); + processorMaxTranslogBytes = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(leaderIndex); + out.writeString(followIndex); + out.writeVLong(batchSize); + out.writeVLong(processorMaxTranslogBytes); + } + } + + public static class RequestBuilder extends ActionRequestBuilder { + + RequestBuilder(ElasticsearchClient client, Action action) { + super(client, action, new Request()); + } + } + + public static class Response extends ActionResponse { + + } + + public static class TransportAction extends HandledTransportAction { + + private final Client client; + private final ClusterService clusterService; + private final RemoteClusterService remoteClusterService; + private final PersistentTasksService persistentTasksService; + + @Inject + public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Client client, ClusterService clusterService, + PersistentTasksService persistentTasksService) { + super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); + this.client = client; + this.clusterService = clusterService; + this.remoteClusterService = transportService.getRemoteClusterService(); + this.persistentTasksService = persistentTasksService; + } + + @Override + protected void doExecute(Request request, ActionListener listener) { + ClusterState localClusterState = clusterService.state(); + IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followIndex); + + String[] indices = new String[]{request.getLeaderIndex()}; + Map> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false); + if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + // Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData: + IndexMetaData leaderIndexMetadata = localClusterState.getMetaData().index(request.leaderIndex); + start(request, null, leaderIndexMetadata, followIndexMetadata, listener); + } else { + // Following an index in remote cluster, so use remote client to fetch leader IndexMetaData: + assert remoteClusterIndices.size() == 1; + Map.Entry> entry = remoteClusterIndices.entrySet().iterator().next(); + assert entry.getValue().size() == 1; + String clusterNameAlias = entry.getKey(); + String leaderIndex = entry.getValue().get(0); + + Client remoteClient = client.getRemoteClusterClient(clusterNameAlias); + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(leaderIndex); + remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(r -> { + ClusterState remoteClusterState = r.getState(); + IndexMetaData leaderIndexMetadata = remoteClusterState.getMetaData().index(leaderIndex); + start(request, clusterNameAlias, leaderIndexMetadata, followIndexMetadata, listener); + }, 
listener::onFailure)); + } + } + + /** + * Performs validation on the provided leader and follow {@link IndexMetaData} instances and then + * creates a persistent task for each leader primary shard. These persistent tasks track changes in the leader + * shard and replicate these changes to a follower shard. + * + * Currently the following validation is performed: + * <ul> + *     <li>The leader index and follow index need to have the same number of primary shards</li> + * </ul>
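+ * + * In addition, both the leader index and the follow index must exist; if either is missing the listener is notified of a failure and no persistent tasks are started.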
+ */ + void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata, + ActionListener handler) { + if (leaderIndexMetadata == null) { + handler.onFailure(new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist")); + return; + } + + if (followIndexMetadata == null) { + handler.onFailure(new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist")); + return; + } + + if (leaderIndexMetadata.getNumberOfShards() != followIndexMetadata.getNumberOfShards()) { + handler.onFailure(new IllegalArgumentException("leader index primary shards [" + + leaderIndexMetadata.getNumberOfShards() + "] does not match with the number of " + + "shards of the follow index [" + followIndexMetadata.getNumberOfShards() + "]")); + // TODO: other validation checks + } else { + final int numShards = followIndexMetadata.getNumberOfShards(); + final AtomicInteger counter = new AtomicInteger(numShards); + final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); + for (int i = 0; i < numShards; i++) { + final int shardId = i; + String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; + ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, + new ShardId(followIndexMetadata.getIndex(), shardId), + new ShardId(leaderIndexMetadata.getIndex(), shardId), + request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes); + persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask, + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + responses.set(shardId, task); + finalizeResponse(); + } + + @Override + public void onFailure(Exception e) { + responses.set(shardId, e); + finalizeResponse(); + } + + void finalizeResponse() { + Exception error = null; + if (counter.decrementAndGet() == 0) { + for (int j = 0; j < responses.length(); j++) { + Object response = responses.get(j); + if (response instanceof Exception) { + if (error == null) { + error = (Exception) response; + } else { + error.addSuppressed((Throwable) response); + } + } + } + + if (error == null) { + // include task ids? + handler.onResponse(new Response()); + } else { + // TODO: cancel all started tasks + handler.onFailure(error); + } + } + } + } + ); + } + } + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java new file mode 100644 index 00000000000..d5c774f53a0 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -0,0 +1,297 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.single.shard.SingleShardOperationRequestBuilder; +import org.elasticsearch.action.support.single.shard.SingleShardRequest; +import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardNotStartedException; +import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.Queue; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class ShardChangesAction extends Action { + + public static final ShardChangesAction INSTANCE = new ShardChangesAction(); + public static final String NAME = "cluster:admin/xpack/ccr/shard_changes"; + + private ShardChangesAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends SingleShardRequest { + + private long minSeqNo; + private long maxSeqNo; + private ShardId shardId; + private long maxTranslogsBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES; + + public Request(ShardId shardId) { + super(shardId.getIndexName()); + this.shardId = shardId; + } + + Request() { + } + + public ShardId getShard() { + return shardId; + } + + public long getMinSeqNo() { + return minSeqNo; + } + + public void setMinSeqNo(long minSeqNo) { + this.minSeqNo = minSeqNo; + } + + public long getMaxSeqNo() { + return maxSeqNo; + } + + public void setMaxSeqNo(long maxSeqNo) { + this.maxSeqNo = maxSeqNo; + } + + public long getMaxTranslogsBytes() { + return maxTranslogsBytes; + } + + public void setMaxTranslogsBytes(long maxTranslogsBytes) { + this.maxTranslogsBytes = maxTranslogsBytes; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (minSeqNo < 0) { + validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be lower than 0", validationException); + } + if (maxSeqNo < minSeqNo) { + validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be larger than maxSeqNo [" + + maxSeqNo + "]", 
validationException); + } + if (maxTranslogsBytes <= 0) { + validationException = addValidationError("maxTranslogsBytes [" + maxTranslogsBytes + "] must be larger than 0", + validationException); + } + return validationException; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + minSeqNo = in.readVLong(); + maxSeqNo = in.readVLong(); + shardId = ShardId.readShardId(in); + maxTranslogsBytes = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVLong(minSeqNo); + out.writeVLong(maxSeqNo); + shardId.writeTo(out); + out.writeVLong(maxTranslogsBytes); + } + + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final Request request = (Request) o; + return minSeqNo == request.minSeqNo && + maxSeqNo == request.maxSeqNo && + Objects.equals(shardId, request.shardId) && + maxTranslogsBytes == request.maxTranslogsBytes; + } + + @Override + public int hashCode() { + return Objects.hash(minSeqNo, maxSeqNo, shardId, maxTranslogsBytes); + } + } + + public static final class Response extends ActionResponse { + + private Translog.Operation[] operations; + + Response() { + } + + Response(final Translog.Operation[] operations) { + this.operations = operations; + } + + public Translog.Operation[] getOperations() { + return operations; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeArray(Translog.Operation::writeOperation, operations); + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final Response response = (Response) o; + return Arrays.equals(operations, response.operations); + } + + @Override + public int hashCode() { + return Arrays.hashCode(operations); + } + } + + static class RequestBuilder extends SingleShardOperationRequestBuilder { + + RequestBuilder(ElasticsearchClient client, Action action) { + super(client, action, new Request()); + } + } + + public static class TransportAction extends TransportSingleShardAction { + + private final IndicesService indicesService; + + @Inject + public TransportAction(Settings settings, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + IndicesService indicesService) { + super(settings, NAME, threadPool, clusterService, transportService, actionFilters, + indexNameExpressionResolver, Request::new, ThreadPool.Names.GET); + this.indicesService = indicesService; + } + + @Override + protected Response shardOperation(Request request, ShardId shardId) throws IOException { + IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); + IndexShard indexShard = indexService.getShard(request.getShard().id()); + + return getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes); + } + + @Override + protected boolean resolveIndex(Request request) { + return true; + } + + @Override + protected ShardsIterator shards(ClusterState state, InternalRequest request) { + return state.routingTable() + 
.index(request.concreteIndex()) + .shard(request.request().getShard().id()) + .activeInitializingShardsRandomIt(); + } + + @Override + protected Response newResponse() { + return new Response(); + } + + } + + private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; + + static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit) throws IOException { + if (indexShard.state() != IndexShardState.STARTED) { + throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state()); + } + + long seenBytes = 0; + long nextExpectedSeqNo = minSeqNo; + final Queue orderedOps = new PriorityQueue<>(Comparator.comparingLong(Translog.Operation::seqNo)); + + final List operations = new ArrayList<>(); + try (Translog.Snapshot snapshot = indexShard.newTranslogSnapshotBetween(minSeqNo, maxSeqNo)) { + for (Translog.Operation unorderedOp = snapshot.next(); unorderedOp != null; unorderedOp = snapshot.next()) { + if (unorderedOp.seqNo() < minSeqNo || unorderedOp.seqNo() > maxSeqNo) { + continue; + } + + orderedOps.add(unorderedOp); + while (orderedOps.peek() != null && orderedOps.peek().seqNo() == nextExpectedSeqNo) { + Translog.Operation orderedOp = orderedOps.poll(); + if (seenBytes < byteLimit) { + nextExpectedSeqNo++; + seenBytes += orderedOp.estimateSize(); + operations.add(orderedOp); + if (nextExpectedSeqNo > maxSeqNo) { + return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY)); + } + } else { + return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY)); + } + } + } + } + + if (nextExpectedSeqNo >= maxSeqNo) { + return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY)); + } else { + String message = "Not all operations between min_seq_no [" + minSeqNo + "] and max_seq_no [" + maxSeqNo + + "] found, tracker checkpoint [" + nextExpectedSeqNo + "]"; + throw new IllegalStateException(message); + } + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java new file mode 100644 index 00000000000..9d1fe5d67df --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + +public class ShardFollowNodeTask extends AllocatedPersistentTask { + + private final AtomicLong processedGlobalCheckpoint = new AtomicLong(); + + ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers) { + super(id, type, action, description, parentTask, headers); + } + + void updateProcessedGlobalCheckpoint(long processedGlobalCheckpoint) { + this.processedGlobalCheckpoint.set(processedGlobalCheckpoint); + } + + @Override + public Task.Status getStatus() { + return new Status(processedGlobalCheckpoint.get()); + } + + public static class Status implements Task.Status { + + public static final String NAME = "shard-follow-node-task-status"; + + static final ParseField PROCESSED_GLOBAL_CHECKPOINT_FIELD = new ParseField("processed_global_checkpoint"); + + static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>(NAME, args -> new Status((Long) args[0])); + + static { + PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSED_GLOBAL_CHECKPOINT_FIELD); + } + + private final long processedGlobalCheckpoint; + + Status(long processedGlobalCheckpoint) { + this.processedGlobalCheckpoint = processedGlobalCheckpoint; + } + + public Status(StreamInput in) throws IOException { + this.processedGlobalCheckpoint = in.readVLong(); + } + + public long getProcessedGlobalCheckpoint() { + return processedGlobalCheckpoint; + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(processedGlobalCheckpoint); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field(PROCESSED_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), processedGlobalCheckpoint); + } + builder.endObject(); + return builder; + } + + public static Status fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status status = (Status) o; + return processedGlobalCheckpoint == status.processedGlobalCheckpoint; + } + + @Override + public int hashCode() { + return Objects.hash(processedGlobalCheckpoint); + } + + public String toString() { + return Strings.toString(this); + } + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java new file mode 100644 index 00000000000..09202c925ba --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -0,0 +1,165 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.persistent.PersistentTaskParams; + +import java.io.IOException; +import java.util.Objects; + +public class ShardFollowTask implements PersistentTaskParams { + + public static final String NAME = "shard_follow"; + + static final ParseField LEADER_CLUSTER_ALIAS_FIELD = new ParseField("leader_cluster_alias"); + static final ParseField FOLLOW_SHARD_INDEX_FIELD = new ParseField("follow_shard_index"); + static final ParseField FOLLOW_SHARD_INDEX_UUID_FIELD = new ParseField("follow_shard_index_uuid"); + static final ParseField FOLLOW_SHARD_SHARDID_FIELD = new ParseField("follow_shard_shard"); + static final ParseField LEADER_SHARD_INDEX_FIELD = new ParseField("leader_shard_index"); + static final ParseField LEADER_SHARD_INDEX_UUID_FIELD = new ParseField("leader_shard_index_uuid"); + static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard"); + public static final ParseField MAX_CHUNK_SIZE = new ParseField("max_chunk_size"); + public static final ParseField NUM_CONCURRENT_CHUNKS = new ParseField("max_concurrent_chunks"); + public static final ParseField PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST = new ParseField("processor_max_translog_bytes"); + + public static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + (a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]), + new ShardId((String) a[4], (String) a[5], (int) a[6]), (long) a[7], (int) a[8], (long) a[9])); + + static { + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_UUID_FIELD); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_SHARDID_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_UUID_FIELD); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), LEADER_SHARD_SHARDID_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAX_CHUNK_SIZE); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUM_CONCURRENT_CHUNKS); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST); + } + + private final String leaderClusterAlias; + private final ShardId followShardId; + private final ShardId leaderShardId; + private final long maxChunkSize; + private final int numConcurrentChunks; + private final long processorMaxTranslogBytes; + + ShardFollowTask(String leaderClusterAlias, ShardId 
followShardId, ShardId leaderShardId, long maxChunkSize, + int numConcurrentChunks, long processorMaxTranslogBytes) { + this.leaderClusterAlias = leaderClusterAlias; + this.followShardId = followShardId; + this.leaderShardId = leaderShardId; + this.maxChunkSize = maxChunkSize; + this.numConcurrentChunks = numConcurrentChunks; + this.processorMaxTranslogBytes = processorMaxTranslogBytes; + } + + public ShardFollowTask(StreamInput in) throws IOException { + this.leaderClusterAlias = in.readOptionalString(); + this.followShardId = ShardId.readShardId(in); + this.leaderShardId = ShardId.readShardId(in); + this.maxChunkSize = in.readVLong(); + this.numConcurrentChunks = in.readVInt(); + this.processorMaxTranslogBytes = in.readVLong(); + } + + public String getLeaderClusterAlias() { + return leaderClusterAlias; + } + + public ShardId getFollowShardId() { + return followShardId; + } + + public ShardId getLeaderShardId() { + return leaderShardId; + } + + public long getMaxChunkSize() { + return maxChunkSize; + } + + public int getNumConcurrentChunks() { + return numConcurrentChunks; + } + + public long getProcessorMaxTranslogBytes() { + return processorMaxTranslogBytes; + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(leaderClusterAlias); + followShardId.writeTo(out); + leaderShardId.writeTo(out); + out.writeVLong(maxChunkSize); + out.writeVInt(numConcurrentChunks); + out.writeVLong(processorMaxTranslogBytes); + } + + public static ShardFollowTask fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (leaderClusterAlias != null) { + builder.field(LEADER_CLUSTER_ALIAS_FIELD.getPreferredName(), leaderClusterAlias); + } + builder.field(FOLLOW_SHARD_INDEX_FIELD.getPreferredName(), followShardId.getIndex().getName()); + builder.field(FOLLOW_SHARD_INDEX_UUID_FIELD.getPreferredName(), followShardId.getIndex().getUUID()); + builder.field(FOLLOW_SHARD_SHARDID_FIELD.getPreferredName(), followShardId.id()); + builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName()); + builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID()); + builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id()); + builder.field(MAX_CHUNK_SIZE.getPreferredName(), maxChunkSize); + builder.field(NUM_CONCURRENT_CHUNKS.getPreferredName(), numConcurrentChunks); + builder.field(PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName(), processorMaxTranslogBytes); + return builder.endObject(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ShardFollowTask that = (ShardFollowTask) o; + return Objects.equals(leaderClusterAlias, that.leaderClusterAlias) && + Objects.equals(followShardId, that.followShardId) && + Objects.equals(leaderShardId, that.leaderShardId) && + maxChunkSize == that.maxChunkSize && + numConcurrentChunks == that.numConcurrentChunks && + processorMaxTranslogBytes == that.processorMaxTranslogBytes; + } + + @Override + public int hashCode() { + return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxChunkSize, numConcurrentChunks, + processorMaxTranslogBytes); + } + + public String toString() { + return 
Strings.toString(this); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java new file mode 100644 index 00000000000..9f62b014a18 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -0,0 +1,358 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.NetworkExceptionHelper; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTasksExecutor; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.LongConsumer; + +public class ShardFollowTasksExecutor extends PersistentTasksExecutor { + + static final long DEFAULT_BATCH_SIZE = 1024; + static final int PROCESSOR_RETRY_LIMIT = 16; + static final int DEFAULT_CONCURRENT_PROCESSORS = 1; + static final long DEFAULT_MAX_TRANSLOG_BYTES= Long.MAX_VALUE; + private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500); + + private final Client client; + private final ThreadPool threadPool; + + public ShardFollowTasksExecutor(Settings settings, Client client, ThreadPool threadPool) { + super(settings, ShardFollowTask.NAME, Ccr.CCR_THREAD_POOL_NAME); + this.client = client; + this.threadPool = threadPool; + } + + @Override + public void validate(ShardFollowTask params, ClusterState clusterState) { + if (params.getLeaderClusterAlias() == null) { + // We can only validate IndexRoutingTable in local cluster, + 
// for remote cluster we would need to make a remote call and we cannot do this here. + IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getLeaderShardId().getIndex()); + if (routingTable.shard(params.getLeaderShardId().id()).primaryShard().started() == false) { + throw new IllegalArgumentException("Not all copies of leader shard are started"); + } + } + + IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex()); + if (routingTable.shard(params.getFollowShardId().id()).primaryShard().started() == false) { + throw new IllegalArgumentException("Not all copies of follow shard are started"); + } + } + + @Override + protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, + PersistentTasksCustomMetaData.PersistentTask taskInProgress, + Map headers) { + return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers); + } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask params, Task.Status status) { + ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task; + Client leaderClient = params.getLeaderClusterAlias() != null ? + this.client.getRemoteClusterClient(params.getLeaderClusterAlias()) : this.client; + logger.info("Starting shard following [{}]", params); + fetchGlobalCheckpoint(client, params.getFollowShardId(), + followGlobalCheckPoint -> prepare(leaderClient, shardFollowNodeTask, params, followGlobalCheckPoint), task::markAsFailed); + } + + void prepare(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) { + if (task.getState() != AllocatedPersistentTask.State.STARTED) { + // TODO: need better cancellation control + return; + } + + final ShardId leaderShard = params.getLeaderShardId(); + final ShardId followerShard = params.getFollowShardId(); + fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> { + // TODO: check if both indices have the same history uuid + if (leaderGlobalCheckPoint == followGlobalCheckPoint) { + retry(leaderClient, task, params, followGlobalCheckPoint); + } else { + assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint + + "] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]"; + Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME); + Consumer handler = e -> { + if (e == null) { + task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint); + prepare(leaderClient, task, params, leaderGlobalCheckPoint); + } else { + task.markAsFailed(e); + } + }; + ChunksCoordinator coordinator = new ChunksCoordinator(client, leaderClient, ccrExecutor, params.getMaxChunkSize(), + params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler); + coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint); + coordinator.start(); + } + }, task::markAsFailed); + } + + private void retry(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) { + threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + task.markAsFailed(e); + } + + @Override + protected void doRun() throws Exception { + prepare(leaderClient, task, params, followGlobalCheckPoint); + } + }); + } + + private void fetchGlobalCheckpoint(Client client, ShardId shardId, 
LongConsumer handler, Consumer errorHandler) { + client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> { + IndexStats indexStats = r.getIndex(shardId.getIndexName()); + Optional filteredShardStats = Arrays.stream(indexStats.getShards()) + .filter(shardStats -> shardStats.getShardRouting().shardId().equals(shardId)) + .filter(shardStats -> shardStats.getShardRouting().primary()) + .findAny(); + + if (filteredShardStats.isPresent()) { + // Treat -1 as 0. If no indexing has happened in leader shard then global checkpoint is -1. + final long globalCheckPoint = Math.max(0, filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint()); + handler.accept(globalCheckPoint); + } else { + errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId)); + } + }, errorHandler)); + } + + static class ChunksCoordinator { + + private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class); + + private final Client followerClient; + private final Client leaderClient; + private final Executor ccrExecutor; + + private final long batchSize; + private final int concurrentProcessors; + private final long processorMaxTranslogBytes; + private final ShardId leaderShard; + private final ShardId followerShard; + private final Consumer handler; + + private final CountDown countDown; + private final Queue chunks = new ConcurrentLinkedQueue<>(); + private final AtomicReference failureHolder = new AtomicReference<>(); + + ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, long batchSize, int concurrentProcessors, + long processorMaxTranslogBytes, ShardId leaderShard, ShardId followerShard, Consumer handler) { + this.followerClient = followerClient; + this.leaderClient = leaderClient; + this.ccrExecutor = ccrExecutor; + this.batchSize = batchSize; + this.concurrentProcessors = concurrentProcessors; + this.processorMaxTranslogBytes = processorMaxTranslogBytes; + this.leaderShard = leaderShard; + this.followerShard = followerShard; + this.handler = handler; + this.countDown = new CountDown(concurrentProcessors); + } + + void createChucks(long from, long to) { + LOGGER.debug("{} Creating chunks for operation range [{}] to [{}]", leaderShard, from, to); + for (long i = from; i < to; i += batchSize) { + long v2 = i + batchSize < to ? i + batchSize : to; + chunks.add(new long[]{i == from ? 
i : i + 1, v2}); + } + } + + void start() { + LOGGER.debug("{} Start coordination of [{}] chunks with [{}] concurrent processors", + leaderShard, chunks.size(), concurrentProcessors); + for (int i = 0; i < concurrentProcessors; i++) { + ccrExecutor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + assert e != null; + LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", leaderShard), e); + postProcessChuck(e); + } + + @Override + protected void doRun() throws Exception { + processNextChunk(); + } + }); + } + } + + void processNextChunk() { + long[] chunk = chunks.poll(); + if (chunk == null) { + postProcessChuck(null); + return; + } + LOGGER.debug("{} Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]); + Consumer processorHandler = e -> { + if (e == null) { + LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]); + processNextChunk(); + } else { + LOGGER.error(() -> new ParameterizedMessage("{} Failure processing chunk [{}/{}]", + leaderShard, chunk[0], chunk[1]), e); + postProcessChuck(e); + } + }; + ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, leaderShard, + followerShard, processorHandler); + processor.start(chunk[0], chunk[1], processorMaxTranslogBytes); + } + + void postProcessChuck(Exception e) { + if (failureHolder.compareAndSet(null, e) == false) { + Exception firstFailure = failureHolder.get(); + firstFailure.addSuppressed(e); + } + if (countDown.countDown()) { + handler.accept(failureHolder.get()); + } + } + + Queue getChunks() { + return chunks; + } + + } + + static class ChunkProcessor { + + private final Client leaderClient; + private final Client followerClient; + private final Queue chunks; + private final Executor ccrExecutor; + + private final ShardId leaderShard; + private final ShardId followerShard; + private final Consumer handler; + final AtomicInteger retryCounter = new AtomicInteger(0); + + ChunkProcessor(Client leaderClient, Client followerClient, Queue chunks, Executor ccrExecutor, ShardId leaderShard, + ShardId followerShard, Consumer handler) { + this.leaderClient = leaderClient; + this.followerClient = followerClient; + this.chunks = chunks; + this.ccrExecutor = ccrExecutor; + this.leaderShard = leaderShard; + this.followerShard = followerShard; + this.handler = handler; + } + + void start(final long from, final long to, final long maxTranslogsBytes) { + ShardChangesAction.Request request = new ShardChangesAction.Request(leaderShard); + request.setMinSeqNo(from); + request.setMaxSeqNo(to); + request.setMaxTranslogsBytes(maxTranslogsBytes); + leaderClient.execute(ShardChangesAction.INSTANCE, request, new ActionListener() { + @Override + public void onResponse(ShardChangesAction.Response response) { + handleResponse(to, response); + } + + @Override + public void onFailure(Exception e) { + assert e != null; + if (shouldRetry(e)) { + if (retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) { + start(from, to, maxTranslogsBytes); + } else { + handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + + "] times, aborting...", e)); + } + } else { + handler.accept(e); + } + } + }); + } + + void handleResponse(final long to, final ShardChangesAction.Response response) { + if (response.getOperations().length != 0) { + Translog.Operation lastOp = response.getOperations()[response.getOperations().length - 1]; + boolean maxByteLimitReached = lastOp.seqNo() < to; + if (maxByteLimitReached) { 
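// For example: if this chunk covered the range [0, 20] but the byte limit cut the response
// off at seqNo 10, then [11, 20] is re-queued below and picked up by the next free processor.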
+ // add a new entry to the queue for the operations that couldn't be fetched in the current shard changes api call: + chunks.add(new long[]{lastOp.seqNo() + 1, to}); + } + } + ccrExecutor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + assert e != null; + handler.accept(e); + } + + @Override + protected void doRun() throws Exception { + final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations()); + followerClient.execute(BulkShardOperationsAction.INSTANCE, request, new ActionListener() { + @Override + public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) { + handler.accept(null); + } + + @Override + public void onFailure(final Exception e) { + // No retry mechanism here, because if a failure is being redirected to this place it is considered + // non recoverable. + assert e != null; + handler.accept(e); + } + }); + } + }); + } + + boolean shouldRetry(Exception e) { + // TODO: What other exceptions should be retried? + return NetworkExceptionHelper.isConnectException(e) || + NetworkExceptionHelper.isCloseConnectionException(e); + } + + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/UnfollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/UnfollowIndexAction.java new file mode 100644 index 00000000000..d212deefa61 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/UnfollowIndexAction.java @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksService; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; + +public class UnfollowIndexAction extends Action { + + public static final UnfollowIndexAction INSTANCE = new UnfollowIndexAction(); + public static final String NAME = "cluster:admin/xpack/ccr/unfollow_index"; + + private UnfollowIndexAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends ActionRequest { + + private String followIndex; + + public String getFollowIndex() { + return followIndex; + } + + public void setFollowIndex(String followIndex) { + this.followIndex = followIndex; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + followIndex = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(followIndex); + } + } + + public static class Response extends ActionResponse { + + } + + public static class RequestBuilder extends ActionRequestBuilder { + + public RequestBuilder(ElasticsearchClient client) { + super(client, INSTANCE, new Request()); + } + } + + + public static class TransportAction extends HandledTransportAction { + + private final Client client; + private final PersistentTasksService persistentTasksService; + + @Inject + public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Client client, + PersistentTasksService persistentTasksService) { + super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); + this.client = client; + this.persistentTasksService = persistentTasksService; + } + + @Override + protected void doExecute(Request request, ActionListener listener) { + client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> { + IndexMetaData followIndexMetadata = 
r.getState().getMetaData().index(request.followIndex); + final int numShards = followIndexMetadata.getNumberOfShards(); + final AtomicInteger counter = new AtomicInteger(numShards); + final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); + for (int i = 0; i < numShards; i++) { + final int shardId = i; + String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; + persistentTasksService.cancelPersistentTask(taskId, + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + responses.set(shardId, task); + finalizeResponse(); + } + + @Override + public void onFailure(Exception e) { + responses.set(shardId, e); + finalizeResponse(); + } + + void finalizeResponse() { + Exception error = null; + if (counter.decrementAndGet() == 0) { + for (int j = 0; j < responses.length(); j++) { + Object response = responses.get(j); + if (response instanceof Exception) { + if (error == null) { + error = (Exception) response; + } else { + error.addSuppressed((Throwable) response); + } + } + } + + if (error == null) { + // include task ids? + listener.onResponse(new Response()); + } else { + // TODO: cancel all started tasks + listener.onFailure(error); + } + } + } + }); + } + }, listener::onFailure)); + } + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsAction.java new file mode 100644 index 00000000000..d14bb6345a4 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsAction.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action.bulk; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +public class BulkShardOperationsAction + extends Action { + + public static final BulkShardOperationsAction INSTANCE = new BulkShardOperationsAction(); + public static final String NAME = "indices:data/write/bulk_shard_operations[s]"; + + private BulkShardOperationsAction() { + super(NAME); + } + + @Override + public BulkShardOperationsRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new BulkShardOperationsRequestBuilder(client); + } + + @Override + public BulkShardOperationsResponse newResponse() { + return new BulkShardOperationsResponse(); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java new file mode 100644 index 00000000000..ef9d27ef919 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr.action.bulk; + +import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; + +import java.io.IOException; + +public final class BulkShardOperationsRequest extends ReplicatedWriteRequest { + + private Translog.Operation[] operations; + + public BulkShardOperationsRequest() { + + } + + public BulkShardOperationsRequest(final ShardId shardId, final Translog.Operation[] operations) { + super(shardId); + setRefreshPolicy(RefreshPolicy.NONE); + this.operations = operations; + } + + public Translog.Operation[] getOperations() { + return operations; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeArray(Translog.Operation::writeOperation, operations); + } + + @Override + public String toString() { + return "BulkShardOperationsRequest{" + + "operations=" + operations.length+ + ", shardId=" + shardId + + ", timeout=" + timeout + + ", index='" + index + '\'' + + ", waitForActiveShards=" + waitForActiveShards + + '}'; + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequestBuilder.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequestBuilder.java new file mode 100644 index 00000000000..38b13dca9c0 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequestBuilder.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action.bulk; + +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +public class BulkShardOperationsRequestBuilder + extends ActionRequestBuilder { + + public BulkShardOperationsRequestBuilder(final ElasticsearchClient client) { + super(client, BulkShardOperationsAction.INSTANCE, new BulkShardOperationsRequest()); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java new file mode 100644 index 00000000000..62612e4bb4b --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr.action.bulk; + +import org.elasticsearch.action.support.WriteResponse; +import org.elasticsearch.action.support.replication.ReplicationResponse; + +public final class BulkShardOperationsResponse extends ReplicationResponse implements WriteResponse { + + @Override + public void setForcedRefresh(final boolean forcedRefresh) { + + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java new file mode 100644 index 00000000000..1930f91cbb1 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action.bulk; + +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.MapperException; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; + +public class TransportBulkShardOperationsAction + extends TransportWriteAction { + + @Inject + public TransportBulkShardOperationsAction( + final Settings settings, + final TransportService transportService, + final ClusterService clusterService, + final IndicesService indicesService, + final ThreadPool threadPool, + final ShardStateAction shardStateAction, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver) { + super( + settings, + BulkShardOperationsAction.NAME, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + indexNameExpressionResolver, + BulkShardOperationsRequest::new, + BulkShardOperationsRequest::new, + ThreadPool.Names.WRITE); + } + + @Override + protected WritePrimaryResult shardOperationOnPrimary( + final BulkShardOperationsRequest request, final IndexShard primary) throws Exception { + final Translog.Location location = applyTranslogOperations(request, primary, Engine.Operation.Origin.PRIMARY); + return new WritePrimaryResult<>(request, new BulkShardOperationsResponse(), location, null, primary, logger); + } + + @Override + protected WriteReplicaResult shardOperationOnReplica( + final BulkShardOperationsRequest request, final IndexShard replica) throws Exception { + final Translog.Location location = applyTranslogOperations(request, replica, Engine.Operation.Origin.REPLICA); + return new WriteReplicaResult<>(request, location, null, replica, logger); + } + + private Translog.Location applyTranslogOperations( + final BulkShardOperationsRequest request, final IndexShard 
shard, final Engine.Operation.Origin origin) throws IOException { + Translog.Location location = null; + for (final Translog.Operation operation : request.getOperations()) { + final Engine.Result result = shard.applyTranslogOperation(operation, origin, m -> { + // TODO: Figure out how to deal best with dynamic mapping updates from the leader side: + throw new MapperException("dynamic mapping updates are not allowed in follow shards [" + operation + "]"); + }); + assert result.getSeqNo() == operation.seqNo(); + assert result.hasFailure() == false; + location = locationToSync(location, result.getTranslogLocation()); + } + assert request.getOperations().length == 0 || location != null; + return location; + } + + @Override + protected BulkShardOperationsResponse newResponseInstance() { + return new BulkShardOperationsResponse(); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java new file mode 100644 index 00000000000..ae15f76c4ae --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.index.engine; + +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.xpack.ccr.CcrSettings; + +import java.io.IOException; + +/** + * An engine implementation for following shards. + */ +public final class FollowingEngine extends InternalEngine { + + /** + * Construct a new following engine with the specified engine configuration. + * + * @param engineConfig the engine configuration + */ + FollowingEngine(final EngineConfig engineConfig) { + super(validateEngineConfig(engineConfig)); + } + + private static EngineConfig validateEngineConfig(final EngineConfig engineConfig) { + if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(engineConfig.getIndexSettings().getSettings()) == false) { + throw new IllegalArgumentException("a following engine can not be constructed for a non-following index"); + } + return engineConfig; + } + + private void preFlight(final Operation operation) { + /* + * We assert here so that this goes uncaught in unit tests and fails nodes in standalone tests (we want a harsh failure so that we + * do not have a situation where a shard fails and is recovered elsewhere and a test subsequently passes). We throw an exception so + * that we also prevent issues in production code. 
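 * In other words, every operation routed to a following engine must already carry the
 * sequence number assigned on the leader; a following shard never generates its own
 * sequence numbers, otherwise its history would diverge from the leader shard it follows.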
+ */ + assert operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO; + if (operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) { + throw new IllegalStateException("a following engine does not accept operations without an assigned sequence number"); + } + } + + @Override + protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { + preFlight(index); + return planIndexingAsNonPrimary(index); + } + + @Override + protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException { + preFlight(delete); + return planDeletionAsNonPrimary(delete); + } + + @Override + protected boolean assertPrimaryIncomingSequenceNumber(final Operation.Origin origin, final long seqNo) { + // sequence number should be set when operation origin is primary + assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO : "primary operations on a following index must have an assigned sequence number"; + return true; + } + + @Override + protected boolean assertNonPrimaryOrigin(final Operation operation) { + return true; + } + + @Override + protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) { + assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL + : "version [" + index.version() + "], type [" + index.versionType() + "]"; + return true; + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineFactory.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineFactory.java new file mode 100644 index 00000000000..4d20fbe522d --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineFactory.java @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.index.engine; + +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngine; + +/** + * An engine factory for following engines. + */ +public final class FollowingEngineFactory implements EngineFactory { + + @Override + public Engine newReadWriteEngine(final EngineConfig config) { + return new FollowingEngine(config); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowExistingIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowExistingIndexAction.java new file mode 100644 index 00000000000..b35f4943942 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowExistingIndexAction.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestBuilderListener; +import org.elasticsearch.xpack.ccr.action.ShardFollowTask; + +import java.io.IOException; + +import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.INSTANCE; +import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Request; +import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Response; + +// TODO: change to confirm with API design +public class RestFollowExistingIndexAction extends BaseRestHandler { + + public RestFollowExistingIndexAction(Settings settings, RestController controller) { + super(settings); + // TODO: figure out why: '/{follow_index}/_xpack/ccr/_follow' path clashes with create index api. + controller.registerHandler(RestRequest.Method.POST, "/_xpack/ccr/{follow_index}/_follow", this); + } + + @Override + public String getName() { + return "xpack_ccr_follow_index_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Request request = new Request(); + request.setLeaderIndex(restRequest.param("leader_index")); + request.setFollowIndex(restRequest.param("follow_index")); + if (restRequest.hasParam(ShardFollowTask.MAX_CHUNK_SIZE.getPreferredName())) { + request.setBatchSize(Long.valueOf(restRequest.param(ShardFollowTask.MAX_CHUNK_SIZE.getPreferredName()))); + } + if (restRequest.hasParam(ShardFollowTask.NUM_CONCURRENT_CHUNKS.getPreferredName())) { + request.setConcurrentProcessors(Integer.valueOf(restRequest.param(ShardFollowTask.NUM_CONCURRENT_CHUNKS.getPreferredName()))); + } + if (restRequest.hasParam(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())) { + long value = Long.valueOf(restRequest.param(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())); + request.setProcessorMaxTranslogBytes(value); + } + return channel -> client.execute(INSTANCE, request, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception { + return new BytesRestResponse(RestStatus.OK, builder.startObject() + .endObject()); + + } + }); + } +} \ No newline at end of file diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowIndexAction.java new file mode 100644 index 00000000000..1233097c46e --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowIndexAction.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestBuilderListener; + +import java.io.IOException; + +import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.INSTANCE; +import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.Request; +import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.Response; + +// TODO: change to confirm with API design +public class RestUnfollowIndexAction extends BaseRestHandler { + + public RestUnfollowIndexAction(Settings settings, RestController controller) { + super(settings); + // TODO: figure out why: '/{follow_index}/_xpack/ccr/_unfollow' path clashes with create index api. + controller.registerHandler(RestRequest.Method.POST, "/_xpack/ccr/{follow_index}/_unfollow", this); + } + + @Override + public String getName() { + return "xpack_ccr_unfollow_index_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Request request = new Request(); + request.setFollowIndex(restRequest.param("follow_index")); + return channel -> client.execute(INSTANCE, request, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception { + return new BytesRestResponse(RestStatus.OK, builder.startObject() + .endObject()); + + } + }); + } +} diff --git a/x-pack/plugin/ccr/src/main/plugin-metadata/plugin-security.policy b/x-pack/plugin/ccr/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 00000000000..45d92fd2b8a --- /dev/null +++ b/x-pack/plugin/ccr/src/main/plugin-metadata/plugin-security.policy @@ -0,0 +1,50 @@ +grant { + // needed because of problems in unbound LDAP library + permission java.util.PropertyPermission "*", "read,write"; + + // required to configure the custom mailcap for watcher + permission java.lang.RuntimePermission "setFactory"; + + // needed when sending emails for javax.activation + // otherwise a classnotfound exception is thrown due to trying + // to load the class with the application class loader + permission java.lang.RuntimePermission "setContextClassLoader"; + permission java.lang.RuntimePermission "getClassLoader"; + // TODO: remove use of this jar as soon as possible!!!! 
+ permission java.lang.RuntimePermission "accessClassInPackage.com.sun.activation.registries"; + + // bouncy castle + permission java.security.SecurityPermission "putProviderProperty.BC"; + + // needed for x-pack security extension + permission java.security.SecurityPermission "createPolicy.JavaPolicy"; + permission java.security.SecurityPermission "getPolicy"; + permission java.security.SecurityPermission "setPolicy"; + + // needed for multiple server implementations used in tests + permission java.net.SocketPermission "*", "accept,connect"; + + // needed for Windows named pipes in machine learning + permission java.io.FilePermission "\\\\.\\pipe\\*", "read,write"; +}; + +grant codeBase "${codebase.netty-common}" { + // for reading the system-wide configuration for the backlog of established sockets + permission java.io.FilePermission "/proc/sys/net/core/somaxconn", "read"; +}; + +grant codeBase "${codebase.netty-transport}" { + // Netty NioEventLoop wants to change this, because of https://bugs.openjdk.java.net/browse/JDK-6427854 + // the bug says it only happened rarely, and that its fixed, but apparently it still happens rarely! + permission java.util.PropertyPermission "sun.nio.ch.bugLevel", "write"; +}; + +grant codeBase "${codebase.elasticsearch-rest-client}" { + // rest client uses system properties which gets the default proxy + permission java.net.NetPermission "getProxySelector"; +}; + +grant codeBase "${codebase.httpasyncclient}" { + // rest client uses system properties which gets the default proxy + permission java.net.NetPermission "getProxySelector"; +}; \ No newline at end of file diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrTests.java new file mode 100644 index 00000000000..c9a86295456 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrTests.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; + +import java.io.IOException; +import java.util.Optional; + +import static org.hamcrest.Matchers.instanceOf; + +public class CcrTests extends ESTestCase { + + public void testGetEngineFactory() throws IOException { + final Boolean[] values = new Boolean[] { true, false, null }; + for (final Boolean value : values) { + final String indexName = "following-" + value; + final Index index = new Index(indexName, UUIDs.randomBase64UUID()); + final Settings.Builder builder = Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID()); + if (value != null) { + builder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), value); + } + + final IndexMetaData indexMetaData = new IndexMetaData.Builder(index.getName()) + .settings(builder.build()) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + final Ccr ccr = new Ccr(Settings.EMPTY); + final Optional engineFactory = + ccr.getEngineFactory(new IndexSettings(indexMetaData, Settings.EMPTY)); + if (value != null && value) { + assertTrue(engineFactory.isPresent()); + assertThat(engineFactory.get(), instanceOf(FollowingEngineFactory.class)); + } else { + assertFalse(engineFactory.isPresent()); + } + } + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalStateCcr.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalStateCcr.java new file mode 100644 index 00000000000..b705ce53e7e --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalStateCcr.java @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; + +import java.nio.file.Path; + +public class LocalStateCcr extends LocalStateCompositeXPackPlugin { + + public LocalStateCcr(final Settings settings, final Path configPath) throws Exception { + super(settings, configPath); + LocalStateCcr thisVar = this; + + plugins.add(new Ccr(settings){ + @Override + protected XPackLicenseState getLicenseState() { + return thisVar.getLicenseState(); + } + }); + } +} + diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java new file mode 100644 index 00000000000..3e781187301 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -0,0 +1,327 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +/* + +/* + * ELASTICSEARCH CONFIDENTIAL + * __________________ + * + * [2017] Elasticsearch Incorporated. All Rights Reserved. + * + * NOTICE: All information contained herein is, and remains + * the property of Elasticsearch Incorporated and its suppliers, + * if any. The intellectual and technical concepts contained + * herein are proprietary to Elasticsearch Incorporated + * and its suppliers and may be covered by U.S. and Foreign Patents, + * patents in process, and are protected by trade secret or copyright law. + * Dissemination of this information or reproduction of this material + * is strictly forbidden unless prior written permission is obtained + * from Elasticsearch Incorporated. + */ +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.analysis.common.CommonAnalysisPlugin; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.discovery.TestZenDiscovery; +import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction; +import org.elasticsearch.xpack.ccr.action.ShardChangesAction; +import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask; +import org.elasticsearch.xpack.ccr.action.ShardFollowTask; +import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction; +import org.elasticsearch.xpack.core.XPackSettings; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, transportClientRatio = 0) +public class ShardChangesIT extends ESIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder newSettings = Settings.builder(); + newSettings.put(super.nodeSettings(nodeOrdinal)); + newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); + newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + newSettings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); + newSettings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); + return newSettings.build(); + } + + @Override + protected Collection> getMockPlugins() { + return Arrays.asList(TestSeedPlugin.class, 
TestZenDiscovery.TestPlugin.class); + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(LocalStateCcr.class, CommonAnalysisPlugin.class); + } + + @Override + protected boolean ignoreExternalCluster() { + return true; + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + // this emulates what the CCR persistent task will do for pulling + public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { + client().admin().indices().prepareCreate("index") + .setSettings(Settings.builder().put("index.number_of_shards", 1)) + .get(); + + client().prepareIndex("index", "doc", "1").setSource("{}", XContentType.JSON).get(); + client().prepareIndex("index", "doc", "2").setSource("{}", XContentType.JSON).get(); + client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get(); + + ShardStats shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0]; + long globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint(); + assertThat(globalCheckPoint, equalTo(2L)); + + ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId()); + request.setMinSeqNo(0L); + request.setMaxSeqNo(globalCheckPoint); + ShardChangesAction.Response response = client().execute(ShardChangesAction.INSTANCE, request).get(); + assertThat(response.getOperations().length, equalTo(3)); + Translog.Index operation = (Translog.Index) response.getOperations()[0]; + assertThat(operation.seqNo(), equalTo(0L)); + assertThat(operation.id(), equalTo("1")); + + operation = (Translog.Index) response.getOperations()[1]; + assertThat(operation.seqNo(), equalTo(1L)); + assertThat(operation.id(), equalTo("2")); + + operation = (Translog.Index) response.getOperations()[2]; + assertThat(operation.seqNo(), equalTo(2L)); + assertThat(operation.id(), equalTo("3")); + + client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get(); + client().prepareIndex("index", "doc", "4").setSource("{}", XContentType.JSON).get(); + client().prepareIndex("index", "doc", "5").setSource("{}", XContentType.JSON).get(); + + shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0]; + globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint(); + assertThat(globalCheckPoint, equalTo(5L)); + + request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId()); + request.setMinSeqNo(3L); + request.setMaxSeqNo(globalCheckPoint); + response = client().execute(ShardChangesAction.INSTANCE, request).get(); + assertThat(response.getOperations().length, equalTo(3)); + operation = (Translog.Index) response.getOperations()[0]; + assertThat(operation.seqNo(), equalTo(3L)); + assertThat(operation.id(), equalTo("3")); + + operation = (Translog.Index) response.getOperations()[1]; + assertThat(operation.seqNo(), equalTo(4L)); + assertThat(operation.id(), equalTo("4")); + + operation = (Translog.Index) response.getOperations()[2]; + assertThat(operation.seqNo(), equalTo(5L)); + assertThat(operation.id(), equalTo("5")); + } + + public void testFollowIndex() throws Exception { + final int numberOfPrimaryShards = randomIntBetween(1, 3); + + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, Collections.emptyMap()); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + + final String followerIndexSettings = + 
getIndexSettings(numberOfPrimaryShards, Collections.singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); + + ensureGreen("index1", "index2"); + + final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request(); + followRequest.setLeaderIndex("index1"); + followRequest.setFollowIndex("index2"); + client().execute(FollowExistingIndexAction.INSTANCE, followRequest).get(); + + final int firstBatchNumDocs = randomIntBetween(2, 64); + for (int i = 0; i < firstBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); + + for (int i = 0; i < firstBatchNumDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } + + final int secondBatchNumDocs = randomIntBetween(2, 64); + for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + final Map secondBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] secondBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : secondBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + final long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard)); + + for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } + + final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); + unfollowRequest.setFollowIndex("index2"); + client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); + + assertBusy(() -> { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasks.tasks().size(), equalTo(0)); + }); + } + + public void testFollowNonExistentIndex() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test-leader").get()); + assertAcked(client().admin().indices().prepareCreate("test-follower").get()); + final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request(); + // Leader index does not exist. 
+ followRequest.setLeaderIndex("non-existent-leader"); + followRequest.setFollowIndex("test-follower"); + expectThrows(IllegalArgumentException.class, () -> client().execute(FollowExistingIndexAction.INSTANCE, followRequest).actionGet()); + // Follower index does not exist. + followRequest.setLeaderIndex("test-leader"); + followRequest.setFollowIndex("non-existent-follower"); + expectThrows(IllegalArgumentException.class, () -> client().execute(FollowExistingIndexAction.INSTANCE, followRequest).actionGet()); + // Both indices do not exist. + followRequest.setLeaderIndex("non-existent-leader"); + followRequest.setFollowIndex("non-existent-follower"); + expectThrows(IllegalArgumentException.class, () -> client().execute(FollowExistingIndexAction.INSTANCE, followRequest).actionGet()); + } + + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { + return () -> { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards)); + + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setDetailed(true); + listTasksRequest.setActions(ShardFollowTask.NAME + "[c]"); + ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).actionGet(); + assertThat(listTasksResponse.getNodeFailures().size(), equalTo(0)); + assertThat(listTasksResponse.getTaskFailures().size(), equalTo(0)); + + List taskInfos = listTasksResponse.getTasks(); + assertThat(taskInfos.size(), equalTo(numberOfPrimaryShards)); + for (PersistentTasksCustomMetaData.PersistentTask task : tasks.tasks()) { + final ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams(); + + TaskInfo taskInfo = null; + String expectedId = "id=" + task.getId(); + for (TaskInfo info : taskInfos) { + if (expectedId.equals(info.getDescription())) { + taskInfo = info; + break; + } + } + assertThat(taskInfo, notNullValue()); + ShardFollowNodeTask.Status status = (ShardFollowNodeTask.Status) taskInfo.getStatus(); + assertThat(status, notNullValue()); + assertThat( + status.getProcessedGlobalCheckpoint(), + equalTo(numDocsPerShard.get(shardFollowTask.getLeaderShardId()))); + } + }; + } + + private CheckedRunnable assertExpectedDocumentRunnable(final int value) { + return () -> { + final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get(); + assertTrue(getResponse.isExists()); + assertTrue((getResponse.getSource().containsKey("f"))); + assertThat(getResponse.getSource().get("f"), equalTo(value)); + }; + } + + private String getIndexSettings(final int numberOfPrimaryShards, final Map additionalIndexSettings) throws IOException { + final String settings; + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("settings"); + { + builder.field("index.number_of_shards", numberOfPrimaryShards); + for (final Map.Entry additionalSetting : additionalIndexSettings.entrySet()) { + builder.field(additionalSetting.getKey(), additionalSetting.getValue()); + } + } + builder.endObject(); + builder.startObject("mappings"); + { + builder.startObject("doc"); + { + builder.startObject("properties"); + { + builder.startObject("f"); + { + builder.field("type", "integer"); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); 
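// For a single-shard index with no additional settings, the builder above produces roughly:
// {"settings":{"index.number_of_shards":1},"mappings":{"doc":{"properties":{"f":{"type":"integer"}}}}}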
+ } + builder.endObject(); + settings = BytesReference.bytes(builder).utf8ToString(); + } + return settings; + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java new file mode 100644 index 00000000000..12edf2f68d0 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java @@ -0,0 +1,367 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunkProcessor; +import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunksCoordinator; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; + +import java.net.ConnectException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class ChunksCoordinatorTests extends ESTestCase { + + public void testCreateChunks() { + Client client = mock(Client.class); + Executor ccrExecutor = Runnable::run; + ShardId leaderShardId = new ShardId("index1", "index1", 0); + ShardId followShardId = new ShardId("index2", "index1", 0); + + ChunksCoordinator coordinator = + new ChunksCoordinator(client, client, ccrExecutor, 1024, 1, Long.MAX_VALUE, leaderShardId, followShardId, e -> {}); + coordinator.createChucks(0, 1024); + List result = new ArrayList<>(coordinator.getChunks()); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0)[0], equalTo(0L)); + assertThat(result.get(0)[1], equalTo(1024L)); + + coordinator.getChunks().clear(); + coordinator.createChucks(0, 2048); + result = new ArrayList<>(coordinator.getChunks()); + assertThat(result.size(), equalTo(2)); + assertThat(result.get(0)[0], equalTo(0L)); + assertThat(result.get(0)[1], equalTo(1024L)); + assertThat(result.get(1)[0], equalTo(1025L)); + assertThat(result.get(1)[1], equalTo(2048L)); + + coordinator.getChunks().clear(); + coordinator.createChucks(0, 4096); + result = new ArrayList<>(coordinator.getChunks()); + assertThat(result.size(), 
equalTo(4)); + assertThat(result.get(0)[0], equalTo(0L)); + assertThat(result.get(0)[1], equalTo(1024L)); + assertThat(result.get(1)[0], equalTo(1025L)); + assertThat(result.get(1)[1], equalTo(2048L)); + assertThat(result.get(2)[0], equalTo(2049L)); + assertThat(result.get(2)[1], equalTo(3072L)); + assertThat(result.get(3)[0], equalTo(3073L)); + assertThat(result.get(3)[1], equalTo(4096L)); + + coordinator.getChunks().clear(); + coordinator.createChucks(4096, 8196); + result = new ArrayList<>(coordinator.getChunks()); + assertThat(result.size(), equalTo(5)); + assertThat(result.get(0)[0], equalTo(4096L)); + assertThat(result.get(0)[1], equalTo(5120L)); + assertThat(result.get(1)[0], equalTo(5121L)); + assertThat(result.get(1)[1], equalTo(6144L)); + assertThat(result.get(2)[0], equalTo(6145L)); + assertThat(result.get(2)[1], equalTo(7168L)); + assertThat(result.get(3)[0], equalTo(7169L)); + assertThat(result.get(3)[1], equalTo(8192L)); + assertThat(result.get(4)[0], equalTo(8193L)); + assertThat(result.get(4)[1], equalTo(8196L)); + } + + public void testCoordinator() throws Exception { + Client client = mock(Client.class); + mockShardChangesApiCall(client); + mockBulkShardOperationsApiCall(client); + Executor ccrExecutor = Runnable::run; + ShardId leaderShardId = new ShardId("index1", "index1", 0); + ShardId followShardId = new ShardId("index2", "index1", 0); + + Consumer handler = e -> assertThat(e, nullValue()); + int concurrentProcessors = randomIntBetween(1, 4); + int batchSize = randomIntBetween(1, 1000); + ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, batchSize, concurrentProcessors, + Long.MAX_VALUE, leaderShardId, followShardId, handler); + + int numberOfOps = randomIntBetween(batchSize, batchSize * 20); + long from = randomInt(1000); + long to = from + numberOfOps; + coordinator.createChucks(from, to); + int expectedNumberOfChunks = numberOfOps / batchSize; + if (numberOfOps % batchSize > 0) { + expectedNumberOfChunks++; + } + assertThat(coordinator.getChunks().size(), equalTo(expectedNumberOfChunks)); + + coordinator.start(); + assertThat(coordinator.getChunks().size(), equalTo(0)); + verify(client, times(expectedNumberOfChunks)).execute(same(ShardChangesAction.INSTANCE), + any(ShardChangesAction.Request.class), any()); + verify(client, times(expectedNumberOfChunks)).execute(same(BulkShardOperationsAction.INSTANCE), + any(BulkShardOperationsRequest.class), any()); + } + + public void testCoordinator_failure() throws Exception { + Exception expectedException = new RuntimeException("throw me"); + Client client = mock(Client.class); + boolean shardChangesActionApiCallFailed; + if (randomBoolean()) { + shardChangesActionApiCallFailed = true; + doThrow(expectedException).when(client).execute(same(ShardChangesAction.INSTANCE), + any(ShardChangesAction.Request.class), any()); + } else { + shardChangesActionApiCallFailed = false; + mockShardChangesApiCall(client); + doThrow(expectedException).when(client).execute(same(BulkShardOperationsAction.INSTANCE), + any(BulkShardOperationsRequest.class), any()); + } + Executor ccrExecutor = Runnable::run; + ShardId leaderShardId = new ShardId("index1", "index1", 0); + ShardId followShardId = new ShardId("index2", "index1", 0); + + Consumer handler = e -> { + assertThat(e, notNullValue()); + assertThat(e, sameInstance(expectedException)); + }; + ChunksCoordinator coordinator = + new ChunksCoordinator(client, client, ccrExecutor, 10, 1, Long.MAX_VALUE, leaderShardId, followShardId, handler); + 
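// With a batch size of 10 and the range [0, 20], two chunks are queued; the stubbed client
// above fails the first chunk (either at the ShardChangesAction call or at the follower bulk
// write), so exactly one chunk is expected to remain on the queue after start().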
+        coordinator.createChucks(0, 20);
+        assertThat(coordinator.getChunks().size(), equalTo(2));
+
+        coordinator.start();
+        assertThat(coordinator.getChunks().size(), equalTo(1));
+        verify(client, times(1)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class),
+                any());
+        verify(client, times(shardChangesActionApiCallFailed ? 0 : 1)).execute(same(BulkShardOperationsAction.INSTANCE),
+                any(BulkShardOperationsRequest.class), any());
+    }
+
+    public void testCoordinator_concurrent() throws Exception {
+        Client client = mock(Client.class);
+        mockShardChangesApiCall(client);
+        mockBulkShardOperationsApiCall(client);
+        Executor ccrExecutor = command -> new Thread(command).start();
+        ShardId leaderShardId = new ShardId("index1", "index1", 0);
+        ShardId followShardId = new ShardId("index2", "index1", 0);
+
+        AtomicBoolean calledOnceChecker = new AtomicBoolean(false);
+        AtomicReference<Exception> failureHolder = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        Consumer<Exception> handler = e -> {
+            if (failureHolder.compareAndSet(null, e) == false) {
+                // This handler should only be called once, regardless of the number of concurrent processors
+                calledOnceChecker.set(true);
+            }
+            latch.countDown();
+        };
+        ChunksCoordinator coordinator =
+                new ChunksCoordinator(client, client, ccrExecutor, 1000, 4, Long.MAX_VALUE, leaderShardId, followShardId, handler);
+        coordinator.createChucks(0, 1000000);
+        assertThat(coordinator.getChunks().size(), equalTo(1000));
+
+        coordinator.start();
+        latch.await();
+        assertThat(coordinator.getChunks().size(), equalTo(0));
+        verify(client, times(1000)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any());
+        verify(client, times(1000)).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), any());
+        assertThat(calledOnceChecker.get(), is(false));
+    }
+
+    public void testChunkProcessor() {
+        Client client = mock(Client.class);
+        Queue<long[]> chunks = new LinkedList<>();
+        mockShardChangesApiCall(client);
+        mockBulkShardOperationsApiCall(client);
+        Executor ccrExecutor = Runnable::run;
+        ShardId leaderShardId = new ShardId("index1", "index1", 0);
+        ShardId followShardId = new ShardId("index2", "index1", 0);
+
+        boolean[] invoked = new boolean[1];
+        Exception[] exception = new Exception[1];
+        Consumer<Exception> handler = e -> { invoked[0] = true; exception[0] = e; };
+        ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
+        chunkProcessor.start(0, 10, Long.MAX_VALUE);
+        assertThat(invoked[0], is(true));
+        assertThat(exception[0], nullValue());
+    }
+
+    public void testChunkProcessorRetry() {
+        Client client = mock(Client.class);
+        Queue<long[]> chunks = new LinkedList<>();
+        mockBulkShardOperationsApiCall(client);
+        int testRetryLimit = randomIntBetween(1, ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT - 1);
+        mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception"));
+
+        Executor ccrExecutor = Runnable::run;
+        ShardId leaderShardId = new ShardId("index1", "index1", 0);
+        ShardId followShardId = new ShardId("index2", "index1", 0);
+
+        boolean[] invoked = new boolean[1];
+        Exception[] exception = new Exception[1];
+        Consumer<Exception> handler = e -> { invoked[0] = true; exception[0] = e; };
+        ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
+        chunkProcessor.start(0, 10, Long.MAX_VALUE);
+        assertThat(invoked[0], is(true));
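+        // testRetryLimit stays below PROCESSOR_RETRY_LIMIT, so the processor eventually gets a successful response and no failure is reported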
+        assertThat(exception[0], nullValue());
+        assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit + 1));
+    }
+
+    public void testChunkProcessorRetryTooManyTimes() {
+        Client client = mock(Client.class);
+        Queue<long[]> chunks = new LinkedList<>();
+        mockBulkShardOperationsApiCall(client);
+        int testRetryLimit = ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT + 1;
+        mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception"));
+
+        Executor ccrExecutor = Runnable::run;
+        ShardId leaderShardId = new ShardId("index1", "index1", 0);
+        ShardId followShardId = new ShardId("index2", "index1", 0);
+
+        boolean[] invoked = new boolean[1];
+        Exception[] exception = new Exception[1];
+        Consumer<Exception> handler = e -> { invoked[0] = true; exception[0] = e; };
+        ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
+        chunkProcessor.start(0, 10, Long.MAX_VALUE);
+        assertThat(invoked[0], is(true));
+        assertThat(exception[0], notNullValue());
+        assertThat(exception[0].getMessage(), equalTo("retrying failed [17] times, aborting..."));
+        assertThat(exception[0].getCause().getMessage(), equalTo("connection exception"));
+        assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit));
+    }
+
+    public void testChunkProcessorNoneRetryableError() {
+        Client client = mock(Client.class);
+        Queue<long[]> chunks = new LinkedList<>();
+        mockBulkShardOperationsApiCall(client);
+        mockShardCangesApiCallWithRetry(client, 3, new RuntimeException("unexpected"));
+
+        Executor ccrExecutor = Runnable::run;
+        ShardId leaderShardId = new ShardId("index1", "index1", 0);
+        ShardId followShardId = new ShardId("index2", "index1", 0);
+
+        boolean[] invoked = new boolean[1];
+        Exception[] exception = new Exception[1];
+        Consumer<Exception> handler = e -> { invoked[0] = true; exception[0] = e; };
+        ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
+        chunkProcessor.start(0, 10, Long.MAX_VALUE);
+        assertThat(invoked[0], is(true));
+        assertThat(exception[0], notNullValue());
+        assertThat(exception[0].getMessage(), equalTo("unexpected"));
+        assertThat(chunkProcessor.retryCounter.get(), equalTo(0));
+    }
+
+    public void testChunkProcessorExceedMaxTranslogsBytes() {
+        long from = 0;
+        long to = 20;
+        long actualTo = 10;
+        Client client = mock(Client.class);
+        Queue<long[]> chunks = new LinkedList<>();
+        doAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+            assert args.length == 3;
+            @SuppressWarnings("unchecked")
+            ActionListener<ShardChangesAction.Response> listener = (ActionListener<ShardChangesAction.Response>) args[2];
+
+            List<Translog.Operation> operations = new ArrayList<>();
+            for (int i = 0; i <= actualTo; i++) {
+                operations.add(new Translog.NoOp(i, 1, "test"));
+            }
+            listener.onResponse(new ShardChangesAction.Response(operations.toArray(new Translog.Operation[0])));
+            return null;
+        }).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any());
+
+        mockBulkShardOperationsApiCall(client);
+        Executor ccrExecutor = Runnable::run;
+        ShardId leaderShardId = new ShardId("index1", "index1", 0);
+        ShardId followShardId = new ShardId("index2", "index1", 0);
+
+        boolean[] invoked = new boolean[1];
+        Exception[] exception = new Exception[1];
+        Consumer<Exception> handler = e -> { invoked[0] = true; exception[0] = e; };
+        ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
+        chunkProcessor.start(from, to, Long.MAX_VALUE);
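+        // the mocked leader shard only returns operations up to seq_no 10, so the remainder of the range, [11, 20], should be queued as a new chunk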
assertThat(invoked[0], is(true)); + assertThat(exception[0], nullValue()); + assertThat(chunks.size(), equalTo(1)); + assertThat(chunks.peek()[0], equalTo(11L)); + assertThat(chunks.peek()[1], equalTo(20L)); + } + + private void mockShardCangesApiCallWithRetry(Client client, int testRetryLimit, Exception e) { + int[] retryCounter = new int[1]; + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 3; + ShardChangesAction.Request request = (ShardChangesAction.Request) args[1]; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) args[2]; + if (retryCounter[0]++ <= testRetryLimit) { + listener.onFailure(e); + } else { + long delta = request.getMaxSeqNo() - request.getMinSeqNo(); + Translog.Operation[] operations = new Translog.Operation[(int) delta]; + for (int i = 0; i < operations.length; i++) { + operations[i] = new Translog.NoOp(request.getMinSeqNo() + i, 1, "test"); + } + ShardChangesAction.Response response = new ShardChangesAction.Response(operations); + listener.onResponse(response); + } + return null; + }).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); + } + + private void mockShardChangesApiCall(Client client) { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 3; + ShardChangesAction.Request request = (ShardChangesAction.Request) args[1]; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) args[2]; + + List operations = new ArrayList<>(); + for (long i = request.getMinSeqNo(); i <= request.getMaxSeqNo(); i++) { + operations.add(new Translog.NoOp(request.getMinSeqNo() + i, 1, "test")); + } + ShardChangesAction.Response response = new ShardChangesAction.Response(operations.toArray(new Translog.Operation[0])); + listener.onResponse(response); + return null; + }).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); + } + + private void mockBulkShardOperationsApiCall(Client client) { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 3; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) args[2]; + listener.onResponse(new BulkShardOperationsResponse()); + return null; + }).when(client).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), any()); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java new file mode 100644 index 00000000000..b91c43d74e7 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardNotStartedException; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class ShardChangesActionTests extends ESSingleNodeTestCase { + + public void testGetOperationsBetween() throws Exception { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.translog.generation_threshold_size", new ByteSizeValue(randomIntBetween(8, 64), ByteSizeUnit.KB)) + .build(); + final IndexService indexService = createIndex("index", settings); + + final int numWrites = randomIntBetween(2, 8192); + for (int i = 0; i < numWrites; i++) { + client().prepareIndex("index", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); + } + + // A number of times, get operations within a range that exists: + int iters = randomIntBetween(8, 32); + IndexShard indexShard = indexService.getShard(0); + for (int iter = 0; iter < iters; iter++) { + int min = randomIntBetween(0, numWrites - 1); + int max = randomIntBetween(min, numWrites - 1); + + final ShardChangesAction.Response r = ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE); + /* + * We are not guaranteed that operations are returned to us in order they are in the translog (if our read crosses multiple + * generations) so the best we can assert is that we see the expected operations. 
+ */ + final Set seenSeqNos = Arrays.stream(r.getOperations()).map(Translog.Operation::seqNo).collect(Collectors.toSet()); + final Set expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toSet()); + assertThat(seenSeqNos, equalTo(expectedSeqNos)); + } + + // get operations for a range no operations exists: + Exception e = expectThrows(IllegalStateException.class, + () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1, Long.MAX_VALUE)); + assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + numWrites + "] and max_seq_no [" + + (numWrites + 1) +"] found, tracker checkpoint [")); + + // get operations for a range some operations do not exist: + e = expectThrows(IllegalStateException.class, + () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, numWrites + 10, Long.MAX_VALUE)); + assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + (numWrites - 10) + "] and max_seq_no [" + + (numWrites + 10) +"] found, tracker checkpoint [")); + } + + public void testGetOperationsBetweenWhenShardNotStarted() throws Exception { + IndexShard indexShard = Mockito.mock(IndexShard.class); + + ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING); + Mockito.when(indexShard.routingEntry()).thenReturn(shardRouting); + expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1, Long.MAX_VALUE)); + } + + public void testGetOperationsBetweenExceedByteLimit() throws Exception { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .build(); + final IndexService indexService = createIndex("index", settings); + + final long numWrites = 32; + for (int i = 0; i < numWrites; i++) { + client().prepareIndex("index", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); + } + + final IndexShard indexShard = indexService.getShard(0); + final ShardChangesAction.Response r = ShardChangesAction.getOperationsBetween(indexShard, 0, numWrites - 1, 256); + assertThat(r.getOperations().length, equalTo(12)); + assertThat(r.getOperations()[0].seqNo(), equalTo(0L)); + assertThat(r.getOperations()[1].seqNo(), equalTo(1L)); + assertThat(r.getOperations()[2].seqNo(), equalTo(2L)); + assertThat(r.getOperations()[3].seqNo(), equalTo(3L)); + assertThat(r.getOperations()[4].seqNo(), equalTo(4L)); + assertThat(r.getOperations()[5].seqNo(), equalTo(5L)); + assertThat(r.getOperations()[6].seqNo(), equalTo(6L)); + assertThat(r.getOperations()[7].seqNo(), equalTo(7L)); + assertThat(r.getOperations()[8].seqNo(), equalTo(8L)); + assertThat(r.getOperations()[9].seqNo(), equalTo(9L)); + assertThat(r.getOperations()[10].seqNo(), equalTo(10L)); + assertThat(r.getOperations()[11].seqNo(), equalTo(11L)); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java new file mode 100644 index 00000000000..a0b68c3699b --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ccr.action;
+
+import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.AbstractStreamableTestCase;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.nullValue;
+
+public class ShardChangesRequestTests extends AbstractStreamableTestCase<ShardChangesAction.Request> {
+
+    @Override
+    protected ShardChangesAction.Request createTestInstance() {
+        ShardChangesAction.Request request = new ShardChangesAction.Request(new ShardId("_index", "_indexUUID", 0));
+        request.setMaxSeqNo(randomNonNegativeLong());
+        request.setMinSeqNo(randomNonNegativeLong());
+        return request;
+    }
+
+    @Override
+    protected ShardChangesAction.Request createBlankInstance() {
+        return new ShardChangesAction.Request();
+    }
+
+    public void testValidate() {
+        ShardChangesAction.Request request = new ShardChangesAction.Request(new ShardId("_index", "_indexUUID", 0));
+        request.setMinSeqNo(-1);
+        assertThat(request.validate().getMessage(), containsString("minSeqNo [-1] cannot be lower than 0"));
+
+        request.setMinSeqNo(4);
+        assertThat(request.validate().getMessage(), containsString("minSeqNo [4] cannot be larger than maxSeqNo [0]"));
+
+        request.setMaxSeqNo(8);
+        assertThat(request.validate(), nullValue());
+    }
+}
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java
new file mode 100644
index 00000000000..d169be667cd
--- /dev/null
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ccr.action;
+
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.test.AbstractStreamableTestCase;
+
+public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardChangesAction.Response> {
+
+    @Override
+    protected ShardChangesAction.Response createTestInstance() {
+        final int numOps = randomInt(8);
+        final Translog.Operation[] operations = new Translog.Operation[numOps];
+        for (int i = 0; i < numOps; i++) {
+            operations[i] = new Translog.NoOp(i, 0, "test");
+        }
+        return new ShardChangesAction.Response(operations);
+    }
+
+    @Override
+    protected ShardChangesAction.Response createBlankInstance() {
+        return new ShardChangesAction.Response();
+    }
+
+}
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java
new file mode 100644
index 00000000000..69787998fbb
--- /dev/null
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ccr.action;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+
+import java.io.IOException;
+
+public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<ShardFollowNodeTask.Status> {
+
+    @Override
+    protected ShardFollowNodeTask.Status doParseInstance(XContentParser parser) throws IOException {
+        return ShardFollowNodeTask.Status.fromXContent(parser);
+    }
+
+    @Override
+    protected ShardFollowNodeTask.Status createTestInstance() {
+        return new ShardFollowNodeTask.Status(randomNonNegativeLong());
+    }
+
+    @Override
+    protected Writeable.Reader<ShardFollowNodeTask.Status> instanceReader() {
+        return ShardFollowNodeTask.Status::new;
+    }
+}
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java
new file mode 100644
index 00000000000..148f3acd209
--- /dev/null
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ccr.action;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+
+import java.io.IOException;
+
+public class ShardFollowTaskTests extends AbstractSerializingTestCase<ShardFollowTask> {
+
+    @Override
+    protected ShardFollowTask doParseInstance(XContentParser parser) throws IOException {
+        return ShardFollowTask.fromXContent(parser);
+    }
+
+    @Override
+    protected ShardFollowTask createTestInstance() {
+        return new ShardFollowTask(
+                randomAlphaOfLength(4),
+                new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)),
+                new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)),
+                randomIntBetween(1, Integer.MAX_VALUE), randomIntBetween(1, Integer.MAX_VALUE),
+                randomIntBetween(1, Integer.MAX_VALUE));
+    }
+
+    @Override
+    protected Writeable.Reader<ShardFollowTask> instanceReader() {
+        return ShardFollowTask::new;
+    }
+}
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
new file mode 100644
index 00000000000..56ee10a98d1
--- /dev/null
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
@@ -0,0 +1,310 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */ +package org.elasticsearch.xpack.ccr.index.engine; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.Directory; +import org.elasticsearch.Version; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineTestCase; +import org.elasticsearch.index.engine.TranslogHandler; +import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.ParseContext; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.DirectoryService; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.DummyShardLock; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; + +public class FollowingEngineTests extends ESTestCase { + + private ThreadPool threadPool; + private Index index; + private ShardId shardId; + private AtomicLong primaryTerm = new AtomicLong(); + + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("following-engine-tests"); + index = new Index("index", "uuid"); + shardId = new ShardId(index, 0); + primaryTerm.set(randomLongBetween(1, Long.MAX_VALUE)); + } + + public void tearDown() throws Exception { + terminate(threadPool); + super.tearDown(); + } + + public void testFollowingEngineRejectsNonFollowingIndex() throws IOException { + final Settings.Builder builder = + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT); + if (randomBoolean()) { + builder.put("index.xpack.ccr.following_index", false); + } + final Settings settings = builder.build(); + final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); + 
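+        // the following_index setting is either absent or explicitly false here, so constructing a FollowingEngine for this index must be rejected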
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + try (Store store = createStore(shardId, indexSettings, newDirectory())) { + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); + final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new FollowingEngine(engineConfig)); + assertThat(e, hasToString(containsString("a following engine can not be constructed for a non-following index"))); + } + } + + public void testIndexSeqNoIsMaintained() throws IOException { + final long seqNo = randomIntBetween(0, Integer.MAX_VALUE); + runIndexTest( + seqNo, + Engine.Operation.Origin.PRIMARY, + (followingEngine, index) -> { + final Engine.IndexResult result = followingEngine.index(index); + assertThat(result.getSeqNo(), equalTo(seqNo)); + }); + } + + /* + * A following engine (whether or not it is an engine for a primary or replica shard) needs to maintain ordering semantics as the + * operations presented to it can arrive out of order (while a leader engine that is for a primary shard dictates the order). This test + * ensures that these semantics are maintained. + */ + public void testOutOfOrderDocuments() throws IOException { + final Settings settings = + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT) + .put("index.xpack.ccr.following_index", true) + .build(); + final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + try (Store store = createStore(shardId, indexSettings, newDirectory())) { + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); + try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { + final VersionType versionType = + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE); + final List ops = EngineTestCase.generateSingleDocHistory(true, versionType, false, 2, 2, 20); + EngineTestCase.assertOpsOnReplica(ops, followingEngine, true, logger); + } + } + } + + public void runIndexTest( + final long seqNo, + final Engine.Operation.Origin origin, + final CheckedBiConsumer consumer) throws IOException { + final Settings settings = + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT) + .put("index.xpack.ccr.following_index", true) + .build(); + final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + try (Store store = createStore(shardId, indexSettings, newDirectory())) { + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); + try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { + final String id = "id"; + final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE); + final String type = "type"; + final Field versionField = new NumericDocValuesField("_version", 0); + final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); + final ParseContext.Document document = new ParseContext.Document(); + document.add(uidField); + document.add(versionField); + 
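+                // attach the seq_no, seq_no doc-values and primary-term fields that make up the document's sequence ID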
document.add(seqID.seqNo); + document.add(seqID.seqNoDocValue); + document.add(seqID.primaryTerm); + final BytesReference source = new BytesArray(new byte[]{1}); + final ParsedDocument parsedDocument = new ParsedDocument( + versionField, + seqID, + id, + type, + "routing", + Collections.singletonList(document), + source, + XContentType.JSON, + null); + + final long version; + final long autoGeneratedIdTimestamp; + if (randomBoolean()) { + version = 1; + autoGeneratedIdTimestamp = System.currentTimeMillis(); + } else { + version = randomNonNegativeLong(); + autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + } + final Engine.Index index = new Engine.Index( + new Term("_id", parsedDocument.id()), + parsedDocument, + seqNo, + primaryTerm.get(), + version, + VersionType.EXTERNAL, + origin, + System.currentTimeMillis(), + autoGeneratedIdTimestamp, + randomBoolean()); + + consumer.accept(followingEngine, index); + } + } + } + + public void testDeleteSeqNoIsMaintained() throws IOException { + final long seqNo = randomIntBetween(0, Integer.MAX_VALUE); + runDeleteTest( + seqNo, + Engine.Operation.Origin.PRIMARY, + (followingEngine, delete) -> { + final Engine.DeleteResult result = followingEngine.delete(delete); + assertThat(result.getSeqNo(), equalTo(seqNo)); + }); + } + + public void runDeleteTest( + final long seqNo, + final Engine.Operation.Origin origin, + final CheckedBiConsumer consumer) throws IOException { + final Settings settings = + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT) + .put("index.xpack.ccr.following_index", true) + .build(); + final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + try (Store store = createStore(shardId, indexSettings, newDirectory())) { + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); + try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { + final String id = "id"; + final Engine.Delete delete = new Engine.Delete( + "type", + id, + new Term("_id", id), + seqNo, + primaryTerm.get(), + randomNonNegativeLong(), + VersionType.EXTERNAL, + origin, + System.currentTimeMillis()); + + consumer.accept(followingEngine, delete); + } + } + } + + private EngineConfig engineConfig( + final ShardId shardId, + final IndexSettings indexSettings, + final ThreadPool threadPool, + final Store store, + final Logger logger, + final NamedXContentRegistry xContentRegistry) throws IOException { + final IndexWriterConfig indexWriterConfig = newIndexWriterConfig(); + final Path translogPath = createTempDir("translog"); + final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); + return new EngineConfig( + shardId, + "allocation-id", + threadPool, + indexSettings, + null, + store, + newMergePolicy(), + indexWriterConfig.getAnalyzer(), + indexWriterConfig.getSimilarity(), + new CodecService(null, logger), + new Engine.EventListener() { + @Override + public void onFailedEngine(String reason, Exception e) { + + } + }, + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + translogConfig, + TimeValue.timeValueMinutes(5), + Collections.emptyList(), + Collections.emptyList(), + null, + new TranslogHandler( + xContentRegistry, 
IndexSettingsModule.newIndexSettings(shardId.getIndexName(), indexSettings.getSettings())), + new NoneCircuitBreakerService(), + () -> SequenceNumbers.NO_OPS_PERFORMED, + () -> primaryTerm.get(), + EngineTestCase::createTombstoneDoc + ); + } + + private static Store createStore( + final ShardId shardId, final IndexSettings indexSettings, final Directory directory) throws IOException { + final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { + @Override + public Directory newDirectory() throws IOException { + return directory; + } + }; + return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); + } + + private FollowingEngine createEngine(Store store, EngineConfig config) throws IOException { + store.createEmpty(); + final String translogUuid = Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), + SequenceNumbers.NO_OPS_PERFORMED, shardId, 1L); + store.associateIndexWithNewTranslog(translogUuid); + FollowingEngine followingEngine = new FollowingEngine(config); + followingEngine.recoverFromTranslog(); + return followingEngine; + } + +}
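For reference, the chunk boundaries asserted in ChunksCoordinatorTests.testCreateChunks follow a simple pattern: the coordinator walks the inclusive seq_no range in steps of the batch size, starts every chunk after the first one past the previous chunk's end, and clamps the last chunk to the requested upper bound. The sketch below is not taken from ChunksCoordinator itself; it is a minimal standalone reconstruction of that arithmetic (class and method names are illustrative only) that reproduces the boundaries the assertions expect, e.g. (0, 2048) with a batch size of 1024 yields [0, 1024] and [1025, 2048], and (4096, 8196) yields five chunks ending in [8193, 8196].

import java.util.ArrayList;
import java.util.List;

final class ChunkingSketch {

    // Split the inclusive seq_no range [from, to] into batch-sized chunks.
    // Each chunk is an inclusive [start, end] pair: the first chunk starts at 'from',
    // every later chunk starts one past the previous chunk's end, and the last chunk
    // is clamped to 'to'.
    static List<long[]> createChunks(long batchSize, long from, long to) {
        List<long[]> chunks = new ArrayList<>();
        for (long i = from; i < to; i += batchSize) {
            long end = Math.min(i + batchSize, to);
            chunks.add(new long[]{i == from ? i : i + 1, end});
        }
        return chunks;
    }
}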