Opened x-pack ccr code

This commit is contained in:
Martijn van Groningen 2018-04-25 08:03:28 +02:00
commit cbd38983f4
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
32 changed files with 3644 additions and 0 deletions

View File

@ -0,0 +1,52 @@
import com.carrotsearch.gradle.junit4.RandomizedTestingTask
import org.elasticsearch.gradle.BuildPlugin
evaluationDependsOn(':x-pack-elasticsearch:plugin:core')
apply plugin: 'elasticsearch.esplugin'
esplugin {
name 'x-pack-ccr'
description 'Elasticsearch Expanded Pack Plugin - CCR'
classname 'org.elasticsearch.xpack.ccr.Ccr'
hasNativeController false
requiresKeystore true
extendedPlugins = ['x-pack-core']
licenseFile project(':x-pack-elasticsearch').file('LICENSE.txt')
noticeFile project(':x-pack-elasticsearch').file('NOTICE.txt')
}
archivesBaseName = 'x-pack-ccr'
integTest.enabled = false
// Instead we create a separate task to run the
// tests based on ESIntegTestCase
task internalClusterTest(type: RandomizedTestingTask,
group: JavaBasePlugin.VERIFICATION_GROUP,
description: 'Java fantasy integration tests',
dependsOn: test.dependsOn) {
configure(BuildPlugin.commonTestConfig(project))
classpath = project.test.classpath
testClassesDir = project.test.testClassesDir
include '**/*IT.class'
systemProperty 'es.set.netty.runtime.available.processors', 'false'
}
check {
dependsOn = [internalClusterTest, 'qa:multi-cluster:followClusterTest']
}
internalClusterTest.mustRunAfter test
dependencies {
compileOnly "org.elasticsearch:elasticsearch:${version}"
compileOnly "org.elasticsearch.plugin:x-pack-core:${version}"
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
}
dependencyLicenses {
ignoreSha 'x-pack-core'
}
run {
plugin ':x-pack-elasticsearch:plugin:core'
}

View File

@ -0,0 +1,12 @@
/* Remove assemble on all qa projects because we don't need to publish
* artifacts for them. */
gradle.projectsEvaluated {
subprojects {
Task assemble = project.tasks.findByName('assemble')
if (assemble) {
project.tasks.remove(assemble)
project.build.dependsOn.remove('assemble')
}
}
}

View File

@ -0,0 +1,40 @@
import org.elasticsearch.gradle.test.RestIntegTestTask
apply plugin: 'elasticsearch.standalone-test'
dependencies {
testCompile project(path: xpackModule('core'), configuration: 'runtime')
testCompile project(path: xpackModule('ccr'), configuration: 'runtime')
}
task leaderClusterTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
}
leaderClusterTestCluster {
numNodes = 1
clusterName = 'leader-cluster'
plugin xpackProject('plugin').path
}
leaderClusterTestRunner {
systemProperty 'tests.is_leader_cluster', 'true'
}
task followClusterTest(type: RestIntegTestTask) {}
followClusterTestCluster {
dependsOn leaderClusterTestRunner
numNodes = 1
clusterName = 'follow-cluster'
plugin xpackProject('plugin').path
setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
}
followClusterTestRunner {
systemProperty 'tests.is_leader_cluster', 'false'
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
finalizedBy 'leaderClusterTestCluster#stop'
}
test.enabled = false // no unit tests for multi-cluster-search, only the rest integration test

View File

@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.rest.ESRestTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
public class FollowIndexIT extends ESRestTestCase {
private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster"));
@Override
protected boolean preserveClusterUponCompletion() {
return true;
}
public void testFollowIndex() throws Exception {
final int numDocs = 128;
final String leaderIndexName = "test_index1";
if (runningAgainstLeaderCluster) {
logger.info("Running against leader cluster");
for (int i = 0; i < numDocs; i++) {
logger.info("Indexing doc [{}]", i);
index(client(), leaderIndexName, Integer.toString(i), "field", i);
}
refresh(leaderIndexName);
verifyDocuments(leaderIndexName, numDocs);
} else {
logger.info("Running against follow cluster");
final String followIndexName = "test_index2";
Settings indexSettings = Settings.builder()
.put("index.xpack.ccr.following_index", true)
.build();
// TODO: remove mapping here when ccr syncs mappings too
createIndex(followIndexName, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"integer\" }}}");
ensureYellow(followIndexName);
followIndex("leader_cluster:" + leaderIndexName, followIndexName);
assertBusy(() -> verifyDocuments(followIndexName, numDocs));
try (RestClient leaderClient = buildLeaderClient()) {
int id = numDocs;
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id);
index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1);
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2);
}
assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3));
}
}
private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
XContentBuilder document = jsonBuilder().startObject();
for (int i = 0; i < fields.length; i += 2) {
document.field((String) fields[i], fields[i + 1]);
}
document.endObject();
assertOK(client.performRequest("POST", "/" + index + "/doc/" + id, emptyMap(),
new StringEntity(Strings.toString(document), ContentType.APPLICATION_JSON)));
}
private static void refresh(String index) throws IOException {
assertOK(client().performRequest("POST", "/" + index + "/_refresh"));
}
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
Map<String, String> params = Collections.singletonMap("leader_index", leaderIndex);
assertOK(client().performRequest("POST", "/_xpack/ccr/" + followIndex + "/_follow", params));
}
private static void verifyDocuments(String index, int expectedNumDocs) throws IOException {
Map<String, String> params = new HashMap<>();
params.put("size", Integer.toString(expectedNumDocs));
params.put("sort", "field:asc");
Map<String, ?> response = toMap(client().performRequest("GET", "/" + index + "/_search", params));
int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
assertThat(numDocs, equalTo(expectedNumDocs));
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
assertThat(hits.size(), equalTo(expectedNumDocs));
for (int i = 0; i < expectedNumDocs; i++) {
int value = (int) XContentMapValues.extractValue("_source.field", (Map<?, ?>) hits.get(i));
assertThat(i, equalTo(value));
}
}
private static Map<String, Object> toMap(Response response) throws IOException {
return toMap(EntityUtils.toString(response.getEntity()));
}
private static Map<String, Object> toMap(String response) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
}
private static void ensureYellow(String index) throws IOException {
Map<String, String> params = new HashMap<>();
params.put("wait_for_status", "yellow");
params.put("wait_for_no_relocating_shards", "true");
params.put("timeout", "30s");
params.put("level", "shards");
assertOK(client().performRequest("GET", "_cluster/health/" + index, params));
}
private RestClient buildLeaderClient() throws IOException {
assert runningAgainstLeaderCluster == false;
String leaderUrl = System.getProperty("tests.leader_host");
int portSeparator = leaderUrl.lastIndexOf(':');
HttpHost httpHost = new HttpHost(leaderUrl.substring(0, portSeparator),
Integer.parseInt(leaderUrl.substring(portSeparator + 1)), getProtocol());
return buildClient(Settings.EMPTY, new HttpHost[]{httpHost});
}
}

View File

@ -0,0 +1,169 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import org.elasticsearch.xpack.ccr.rest.RestFollowExistingIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
import org.elasticsearch.xpack.core.XPackPlugin;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_ENABLED_SETTING;
import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTING;
/**
* Container class for CCR functionality.
*/
public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin {
public static final String CCR_THREAD_POOL_NAME = "ccr";
private final boolean enabled;
private final Settings settings;
/**
* Construct an instance of the CCR container with the specified settings.
*
* @param settings the settings
*/
public Ccr(final Settings settings) {
this.settings = settings;
this.enabled = CCR_ENABLED_SETTING.get(settings);
}
@Override
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
ThreadPool threadPool, Client client) {
return Collections.singletonList(new ShardFollowTasksExecutor(settings, client, threadPool));
}
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
if (enabled == false) {
return emptyList();
}
return Arrays.asList(
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
new ActionHandler<>(FollowExistingIndexAction.INSTANCE, FollowExistingIndexAction.TransportAction.class),
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class),
new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class)
);
}
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
return Arrays.asList(
new RestUnfollowIndexAction(settings, restController),
new RestFollowExistingIndexAction(settings, restController)
);
}
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList(
// Persistent action requests
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, ShardFollowTask.NAME,
ShardFollowTask::new),
// Task statuses
new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.NAME,
ShardFollowNodeTask.Status::new)
);
}
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return Arrays.asList(
// Persistent action requests
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(ShardFollowTask.NAME),
ShardFollowTask::fromXContent),
// Task statuses
new NamedXContentRegistry.Entry(ShardFollowNodeTask.Status.class, new ParseField(ShardFollowNodeTask.Status.NAME),
ShardFollowNodeTask.Status::fromXContent)
);
}
/**
* The settings defined by CCR.
*
* @return the settings
*/
public List<Setting<?>> getSettings() {
return CcrSettings.getSettings();
}
/**
* The optional engine factory for CCR. This method inspects the index settings for the {@link CcrSettings#CCR_FOLLOWING_INDEX_SETTING}
* setting to determine whether or not the engine implementation should be a following engine.
*
* @return the optional engine factory
*/
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
if (CCR_FOLLOWING_INDEX_SETTING.get(indexSettings.getSettings())) {
return Optional.of(new FollowingEngineFactory());
} else {
return Optional.empty();
}
}
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
if (enabled == false) {
return Collections.emptyList();
}
FixedExecutorBuilder ccrTp = new FixedExecutorBuilder(settings, CCR_THREAD_POOL_NAME,
32, 100, "xpack.ccr.ccr_thread_pool");
return Collections.singletonList(ccrTp);
}
protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }
}

View File

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Container class for CCR settings.
*/
public final class CcrSettings {
// prevent construction
private CcrSettings() {
}
/**
* Setting for controlling whether or not CCR is enabled.
*/
static final Setting<Boolean> CCR_ENABLED_SETTING = Setting.boolSetting("xpack.ccr.enabled", true, Property.NodeScope);
/**
* Index setting for a following index.
*/
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
Setting.boolSetting("index.xpack.ccr.following_index", false, Setting.Property.IndexScope);
/**
* The settings defined by CCR.
*
* @return the settings
*/
static List<Setting<?>> getSettings() {
return Arrays.asList(
CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING);
}
}

View File

@ -0,0 +1,277 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.Request,
FollowExistingIndexAction.Response, FollowExistingIndexAction.RequestBuilder> {
public static final FollowExistingIndexAction INSTANCE = new FollowExistingIndexAction();
public static final String NAME = "cluster:admin/xpack/ccr/follow_existing_index";
private FollowExistingIndexAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends ActionRequest {
private String leaderIndex;
private String followIndex;
private long batchSize = ShardFollowTasksExecutor.DEFAULT_BATCH_SIZE;
private int concurrentProcessors = ShardFollowTasksExecutor.DEFAULT_CONCURRENT_PROCESSORS;
private long processorMaxTranslogBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES;
public String getLeaderIndex() {
return leaderIndex;
}
public void setLeaderIndex(String leaderIndex) {
this.leaderIndex = leaderIndex;
}
public String getFollowIndex() {
return followIndex;
}
public void setFollowIndex(String followIndex) {
this.followIndex = followIndex;
}
public long getBatchSize() {
return batchSize;
}
public void setBatchSize(long batchSize) {
if (batchSize < 1) {
throw new IllegalArgumentException("Illegal batch_size [" + batchSize + "]");
}
this.batchSize = batchSize;
}
public void setConcurrentProcessors(int concurrentProcessors) {
if (concurrentProcessors < 1) {
throw new IllegalArgumentException("concurrent_processors must be larger than 0");
}
this.concurrentProcessors = concurrentProcessors;
}
public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) {
if (processorMaxTranslogBytes <= 0) {
throw new IllegalArgumentException("processor_max_translog_bytes must be larger than 0");
}
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
leaderIndex = in.readString();
followIndex = in.readString();
batchSize = in.readVLong();
processorMaxTranslogBytes = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(leaderIndex);
out.writeString(followIndex);
out.writeVLong(batchSize);
out.writeVLong(processorMaxTranslogBytes);
}
}
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, FollowExistingIndexAction.RequestBuilder> {
RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
super(client, action, new Request());
}
}
public static class Response extends ActionResponse {
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final Client client;
private final ClusterService clusterService;
private final RemoteClusterService remoteClusterService;
private final PersistentTasksService persistentTasksService;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client, ClusterService clusterService,
PersistentTasksService persistentTasksService) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.client = client;
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
this.persistentTasksService = persistentTasksService;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
ClusterState localClusterState = clusterService.state();
IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followIndex);
String[] indices = new String[]{request.getLeaderIndex()};
Map<String, List<String>> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false);
if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
// Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData:
IndexMetaData leaderIndexMetadata = localClusterState.getMetaData().index(request.leaderIndex);
start(request, null, leaderIndexMetadata, followIndexMetadata, listener);
} else {
// Following an index in remote cluster, so use remote client to fetch leader IndexMetaData:
assert remoteClusterIndices.size() == 1;
Map.Entry<String, List<String>> entry = remoteClusterIndices.entrySet().iterator().next();
assert entry.getValue().size() == 1;
String clusterNameAlias = entry.getKey();
String leaderIndex = entry.getValue().get(0);
Client remoteClient = client.getRemoteClusterClient(clusterNameAlias);
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex);
remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(r -> {
ClusterState remoteClusterState = r.getState();
IndexMetaData leaderIndexMetadata = remoteClusterState.getMetaData().index(leaderIndex);
start(request, clusterNameAlias, leaderIndexMetadata, followIndexMetadata, listener);
}, listener::onFailure));
}
}
/**
* Performs validation on the provided leader and follow {@link IndexMetaData} instances and then
* creates a persistent task for each leader primary shard. This persistent tasks track changes in the leader
* shard and replicate these changes to a follower shard.
*
* Currently the following validation is performed:
* <ul>
* <li>The leader index and follow index need to have the same number of primary shards</li>
* </ul>
*/
void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata,
ActionListener<Response> handler) {
if (leaderIndexMetadata == null) {
handler.onFailure(new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"));
return;
}
if (followIndexMetadata == null) {
handler.onFailure(new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist"));
return;
}
if (leaderIndexMetadata.getNumberOfShards() != followIndexMetadata.getNumberOfShards()) {
handler.onFailure(new IllegalArgumentException("leader index primary shards [" +
leaderIndexMetadata.getNumberOfShards() + "] does not match with the number of " +
"shards of the follow index [" + followIndexMetadata.getNumberOfShards() + "]"));
// TODO: other validation checks
} else {
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes);
persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
responses.set(shardId, task);
finalizeResponse();
}
@Override
public void onFailure(Exception e) {
responses.set(shardId, e);
finalizeResponse();
}
void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);
}
}
}
if (error == null) {
// include task ids?
handler.onResponse(new Response());
} else {
// TODO: cancel all started tasks
handler.onFailure(error);
}
}
}
}
);
}
}
}
}
}

View File

@ -0,0 +1,297 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.SingleShardOperationRequestBuilder;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class ShardChangesAction extends Action<ShardChangesAction.Request, ShardChangesAction.Response, ShardChangesAction.RequestBuilder> {
public static final ShardChangesAction INSTANCE = new ShardChangesAction();
public static final String NAME = "cluster:admin/xpack/ccr/shard_changes";
private ShardChangesAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends SingleShardRequest<Request> {
private long minSeqNo;
private long maxSeqNo;
private ShardId shardId;
private long maxTranslogsBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES;
public Request(ShardId shardId) {
super(shardId.getIndexName());
this.shardId = shardId;
}
Request() {
}
public ShardId getShard() {
return shardId;
}
public long getMinSeqNo() {
return minSeqNo;
}
public void setMinSeqNo(long minSeqNo) {
this.minSeqNo = minSeqNo;
}
public long getMaxSeqNo() {
return maxSeqNo;
}
public void setMaxSeqNo(long maxSeqNo) {
this.maxSeqNo = maxSeqNo;
}
public long getMaxTranslogsBytes() {
return maxTranslogsBytes;
}
public void setMaxTranslogsBytes(long maxTranslogsBytes) {
this.maxTranslogsBytes = maxTranslogsBytes;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (minSeqNo < 0) {
validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be lower than 0", validationException);
}
if (maxSeqNo < minSeqNo) {
validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be larger than maxSeqNo ["
+ maxSeqNo + "]", validationException);
}
if (maxTranslogsBytes <= 0) {
validationException = addValidationError("maxTranslogsBytes [" + maxTranslogsBytes + "] must be larger than 0",
validationException);
}
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
minSeqNo = in.readVLong();
maxSeqNo = in.readVLong();
shardId = ShardId.readShardId(in);
maxTranslogsBytes = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(minSeqNo);
out.writeVLong(maxSeqNo);
shardId.writeTo(out);
out.writeVLong(maxTranslogsBytes);
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final Request request = (Request) o;
return minSeqNo == request.minSeqNo &&
maxSeqNo == request.maxSeqNo &&
Objects.equals(shardId, request.shardId) &&
maxTranslogsBytes == request.maxTranslogsBytes;
}
@Override
public int hashCode() {
return Objects.hash(minSeqNo, maxSeqNo, shardId, maxTranslogsBytes);
}
}
public static final class Response extends ActionResponse {
private Translog.Operation[] operations;
Response() {
}
Response(final Translog.Operation[] operations) {
this.operations = operations;
}
public Translog.Operation[] getOperations() {
return operations;
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeArray(Translog.Operation::writeOperation, operations);
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final Response response = (Response) o;
return Arrays.equals(operations, response.operations);
}
@Override
public int hashCode() {
return Arrays.hashCode(operations);
}
}
static class RequestBuilder extends SingleShardOperationRequestBuilder<Request, Response, RequestBuilder> {
RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
super(client, action, new Request());
}
}
public static class TransportAction extends TransportSingleShardAction<Request, Response> {
private final IndicesService indicesService;
@Inject
public TransportAction(Settings settings,
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService) {
super(settings, NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, ThreadPool.Names.GET);
this.indicesService = indicesService;
}
@Override
protected Response shardOperation(Request request, ShardId shardId) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
IndexShard indexShard = indexService.getShard(request.getShard().id());
return getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
}
@Override
protected boolean resolveIndex(Request request) {
return true;
}
@Override
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
return state.routingTable()
.index(request.concreteIndex())
.shard(request.request().getShard().id())
.activeInitializingShardsRandomIt();
}
@Override
protected Response newResponse() {
return new Response();
}
}
private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];
static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit) throws IOException {
if (indexShard.state() != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
}
long seenBytes = 0;
long nextExpectedSeqNo = minSeqNo;
final Queue<Translog.Operation> orderedOps = new PriorityQueue<>(Comparator.comparingLong(Translog.Operation::seqNo));
final List<Translog.Operation> operations = new ArrayList<>();
try (Translog.Snapshot snapshot = indexShard.newTranslogSnapshotBetween(minSeqNo, maxSeqNo)) {
for (Translog.Operation unorderedOp = snapshot.next(); unorderedOp != null; unorderedOp = snapshot.next()) {
if (unorderedOp.seqNo() < minSeqNo || unorderedOp.seqNo() > maxSeqNo) {
continue;
}
orderedOps.add(unorderedOp);
while (orderedOps.peek() != null && orderedOps.peek().seqNo() == nextExpectedSeqNo) {
Translog.Operation orderedOp = orderedOps.poll();
if (seenBytes < byteLimit) {
nextExpectedSeqNo++;
seenBytes += orderedOp.estimateSize();
operations.add(orderedOp);
if (nextExpectedSeqNo > maxSeqNo) {
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
}
} else {
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
}
}
}
}
if (nextExpectedSeqNo >= maxSeqNo) {
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
} else {
String message = "Not all operations between min_seq_no [" + minSeqNo + "] and max_seq_no [" + maxSeqNo +
"] found, tracker checkpoint [" + nextExpectedSeqNo + "]";
throw new IllegalStateException(message);
}
}
}

View File

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
public class ShardFollowNodeTask extends AllocatedPersistentTask {
private final AtomicLong processedGlobalCheckpoint = new AtomicLong();
ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
super(id, type, action, description, parentTask, headers);
}
void updateProcessedGlobalCheckpoint(long processedGlobalCheckpoint) {
this.processedGlobalCheckpoint.set(processedGlobalCheckpoint);
}
@Override
public Task.Status getStatus() {
return new Status(processedGlobalCheckpoint.get());
}
public static class Status implements Task.Status {
public static final String NAME = "shard-follow-node-task-status";
static final ParseField PROCESSED_GLOBAL_CHECKPOINT_FIELD = new ParseField("processed_global_checkpoint");
static final ConstructingObjectParser<Status, Void> PARSER =
new ConstructingObjectParser<>(NAME, args -> new Status((Long) args[0]));
static {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSED_GLOBAL_CHECKPOINT_FIELD);
}
private final long processedGlobalCheckpoint;
Status(long processedGlobalCheckpoint) {
this.processedGlobalCheckpoint = processedGlobalCheckpoint;
}
public Status(StreamInput in) throws IOException {
this.processedGlobalCheckpoint = in.readVLong();
}
public long getProcessedGlobalCheckpoint() {
return processedGlobalCheckpoint;
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(processedGlobalCheckpoint);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field(PROCESSED_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), processedGlobalCheckpoint);
}
builder.endObject();
return builder;
}
public static Status fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Status status = (Status) o;
return processedGlobalCheckpoint == status.processedGlobalCheckpoint;
}
@Override
public int hashCode() {
return Objects.hash(processedGlobalCheckpoint);
}
public String toString() {
return Strings.toString(this);
}
}
}

View File

@ -0,0 +1,165 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.persistent.PersistentTaskParams;
import java.io.IOException;
import java.util.Objects;
public class ShardFollowTask implements PersistentTaskParams {
public static final String NAME = "shard_follow";
static final ParseField LEADER_CLUSTER_ALIAS_FIELD = new ParseField("leader_cluster_alias");
static final ParseField FOLLOW_SHARD_INDEX_FIELD = new ParseField("follow_shard_index");
static final ParseField FOLLOW_SHARD_INDEX_UUID_FIELD = new ParseField("follow_shard_index_uuid");
static final ParseField FOLLOW_SHARD_SHARDID_FIELD = new ParseField("follow_shard_shard");
static final ParseField LEADER_SHARD_INDEX_FIELD = new ParseField("leader_shard_index");
static final ParseField LEADER_SHARD_INDEX_UUID_FIELD = new ParseField("leader_shard_index_uuid");
static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard");
public static final ParseField MAX_CHUNK_SIZE = new ParseField("max_chunk_size");
public static final ParseField NUM_CONCURRENT_CHUNKS = new ParseField("max_concurrent_chunks");
public static final ParseField PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST = new ParseField("processor_max_translog_bytes");
public static ConstructingObjectParser<ShardFollowTask, Void> PARSER = new ConstructingObjectParser<>(NAME,
(a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]),
new ShardId((String) a[4], (String) a[5], (int) a[6]), (long) a[7], (int) a[8], (long) a[9]));
static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_UUID_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_SHARDID_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_UUID_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), LEADER_SHARD_SHARDID_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAX_CHUNK_SIZE);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUM_CONCURRENT_CHUNKS);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST);
}
private final String leaderClusterAlias;
private final ShardId followShardId;
private final ShardId leaderShardId;
private final long maxChunkSize;
private final int numConcurrentChunks;
private final long processorMaxTranslogBytes;
ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, long maxChunkSize,
int numConcurrentChunks, long processorMaxTranslogBytes) {
this.leaderClusterAlias = leaderClusterAlias;
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
this.maxChunkSize = maxChunkSize;
this.numConcurrentChunks = numConcurrentChunks;
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
}
public ShardFollowTask(StreamInput in) throws IOException {
this.leaderClusterAlias = in.readOptionalString();
this.followShardId = ShardId.readShardId(in);
this.leaderShardId = ShardId.readShardId(in);
this.maxChunkSize = in.readVLong();
this.numConcurrentChunks = in.readVInt();
this.processorMaxTranslogBytes = in.readVLong();
}
public String getLeaderClusterAlias() {
return leaderClusterAlias;
}
public ShardId getFollowShardId() {
return followShardId;
}
public ShardId getLeaderShardId() {
return leaderShardId;
}
public long getMaxChunkSize() {
return maxChunkSize;
}
public int getNumConcurrentChunks() {
return numConcurrentChunks;
}
public long getProcessorMaxTranslogBytes() {
return processorMaxTranslogBytes;
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(leaderClusterAlias);
followShardId.writeTo(out);
leaderShardId.writeTo(out);
out.writeVLong(maxChunkSize);
out.writeVInt(numConcurrentChunks);
out.writeVLong(processorMaxTranslogBytes);
}
public static ShardFollowTask fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (leaderClusterAlias != null) {
builder.field(LEADER_CLUSTER_ALIAS_FIELD.getPreferredName(), leaderClusterAlias);
}
builder.field(FOLLOW_SHARD_INDEX_FIELD.getPreferredName(), followShardId.getIndex().getName());
builder.field(FOLLOW_SHARD_INDEX_UUID_FIELD.getPreferredName(), followShardId.getIndex().getUUID());
builder.field(FOLLOW_SHARD_SHARDID_FIELD.getPreferredName(), followShardId.id());
builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName());
builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID());
builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id());
builder.field(MAX_CHUNK_SIZE.getPreferredName(), maxChunkSize);
builder.field(NUM_CONCURRENT_CHUNKS.getPreferredName(), numConcurrentChunks);
builder.field(PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName(), processorMaxTranslogBytes);
return builder.endObject();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardFollowTask that = (ShardFollowTask) o;
return Objects.equals(leaderClusterAlias, that.leaderClusterAlias) &&
Objects.equals(followShardId, that.followShardId) &&
Objects.equals(leaderShardId, that.leaderShardId) &&
maxChunkSize == that.maxChunkSize &&
numConcurrentChunks == that.numConcurrentChunks &&
processorMaxTranslogBytes == that.processorMaxTranslogBytes;
}
@Override
public int hashCode() {
return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxChunkSize, numConcurrentChunks,
processorMaxTranslogBytes);
}
public String toString() {
return Strings.toString(this);
}
}

View File

@ -0,0 +1,358 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollowTask> {
static final long DEFAULT_BATCH_SIZE = 1024;
static final int PROCESSOR_RETRY_LIMIT = 16;
static final int DEFAULT_CONCURRENT_PROCESSORS = 1;
static final long DEFAULT_MAX_TRANSLOG_BYTES= Long.MAX_VALUE;
private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500);
private final Client client;
private final ThreadPool threadPool;
public ShardFollowTasksExecutor(Settings settings, Client client, ThreadPool threadPool) {
super(settings, ShardFollowTask.NAME, Ccr.CCR_THREAD_POOL_NAME);
this.client = client;
this.threadPool = threadPool;
}
@Override
public void validate(ShardFollowTask params, ClusterState clusterState) {
if (params.getLeaderClusterAlias() == null) {
// We can only validate IndexRoutingTable in local cluster,
// for remote cluster we would need to make a remote call and we cannot do this here.
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getLeaderShardId().getIndex());
if (routingTable.shard(params.getLeaderShardId().id()).primaryShard().started() == false) {
throw new IllegalArgumentException("Not all copies of leader shard are started");
}
}
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex());
if (routingTable.shard(params.getFollowShardId().id()).primaryShard().started() == false) {
throw new IllegalArgumentException("Not all copies of follow shard are started");
}
}
@Override
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> taskInProgress,
Map<String, String> headers) {
return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers);
}
@Override
protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask params, Task.Status status) {
ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
Client leaderClient = params.getLeaderClusterAlias() != null ?
this.client.getRemoteClusterClient(params.getLeaderClusterAlias()) : this.client;
logger.info("Starting shard following [{}]", params);
fetchGlobalCheckpoint(client, params.getFollowShardId(),
followGlobalCheckPoint -> prepare(leaderClient, shardFollowNodeTask, params, followGlobalCheckPoint), task::markAsFailed);
}
void prepare(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) {
if (task.getState() != AllocatedPersistentTask.State.STARTED) {
// TODO: need better cancellation control
return;
}
final ShardId leaderShard = params.getLeaderShardId();
final ShardId followerShard = params.getFollowShardId();
fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> {
// TODO: check if both indices have the same history uuid
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
retry(leaderClient, task, params, followGlobalCheckPoint);
} else {
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
"] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
Consumer<Exception> handler = e -> {
if (e == null) {
task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
prepare(leaderClient, task, params, leaderGlobalCheckPoint);
} else {
task.markAsFailed(e);
}
};
ChunksCoordinator coordinator = new ChunksCoordinator(client, leaderClient, ccrExecutor, params.getMaxChunkSize(),
params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler);
coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
coordinator.start();
}
}, task::markAsFailed);
}
private void retry(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) {
threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
task.markAsFailed(e);
}
@Override
protected void doRun() throws Exception {
prepare(leaderClient, task, params, followGlobalCheckPoint);
}
});
}
private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
IndexStats indexStats = r.getIndex(shardId.getIndexName());
Optional<ShardStats> filteredShardStats = Arrays.stream(indexStats.getShards())
.filter(shardStats -> shardStats.getShardRouting().shardId().equals(shardId))
.filter(shardStats -> shardStats.getShardRouting().primary())
.findAny();
if (filteredShardStats.isPresent()) {
// Treat -1 as 0. If no indexing has happened in leader shard then global checkpoint is -1.
final long globalCheckPoint = Math.max(0, filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint());
handler.accept(globalCheckPoint);
} else {
errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
}
}, errorHandler));
}
static class ChunksCoordinator {
private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class);
private final Client followerClient;
private final Client leaderClient;
private final Executor ccrExecutor;
private final long batchSize;
private final int concurrentProcessors;
private final long processorMaxTranslogBytes;
private final ShardId leaderShard;
private final ShardId followerShard;
private final Consumer<Exception> handler;
private final CountDown countDown;
private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
private final AtomicReference<Exception> failureHolder = new AtomicReference<>();
ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, long batchSize, int concurrentProcessors,
long processorMaxTranslogBytes, ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
this.followerClient = followerClient;
this.leaderClient = leaderClient;
this.ccrExecutor = ccrExecutor;
this.batchSize = batchSize;
this.concurrentProcessors = concurrentProcessors;
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
this.leaderShard = leaderShard;
this.followerShard = followerShard;
this.handler = handler;
this.countDown = new CountDown(concurrentProcessors);
}
void createChucks(long from, long to) {
LOGGER.debug("{} Creating chunks for operation range [{}] to [{}]", leaderShard, from, to);
for (long i = from; i < to; i += batchSize) {
long v2 = i + batchSize < to ? i + batchSize : to;
chunks.add(new long[]{i == from ? i : i + 1, v2});
}
}
void start() {
LOGGER.debug("{} Start coordination of [{}] chunks with [{}] concurrent processors",
leaderShard, chunks.size(), concurrentProcessors);
for (int i = 0; i < concurrentProcessors; i++) {
ccrExecutor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
assert e != null;
LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", leaderShard), e);
postProcessChuck(e);
}
@Override
protected void doRun() throws Exception {
processNextChunk();
}
});
}
}
void processNextChunk() {
long[] chunk = chunks.poll();
if (chunk == null) {
postProcessChuck(null);
return;
}
LOGGER.debug("{} Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
Consumer<Exception> processorHandler = e -> {
if (e == null) {
LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
processNextChunk();
} else {
LOGGER.error(() -> new ParameterizedMessage("{} Failure processing chunk [{}/{}]",
leaderShard, chunk[0], chunk[1]), e);
postProcessChuck(e);
}
};
ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, leaderShard,
followerShard, processorHandler);
processor.start(chunk[0], chunk[1], processorMaxTranslogBytes);
}
void postProcessChuck(Exception e) {
if (failureHolder.compareAndSet(null, e) == false) {
Exception firstFailure = failureHolder.get();
firstFailure.addSuppressed(e);
}
if (countDown.countDown()) {
handler.accept(failureHolder.get());
}
}
Queue<long[]> getChunks() {
return chunks;
}
}
static class ChunkProcessor {
private final Client leaderClient;
private final Client followerClient;
private final Queue<long[]> chunks;
private final Executor ccrExecutor;
private final ShardId leaderShard;
private final ShardId followerShard;
private final Consumer<Exception> handler;
final AtomicInteger retryCounter = new AtomicInteger(0);
ChunkProcessor(Client leaderClient, Client followerClient, Queue<long[]> chunks, Executor ccrExecutor, ShardId leaderShard,
ShardId followerShard, Consumer<Exception> handler) {
this.leaderClient = leaderClient;
this.followerClient = followerClient;
this.chunks = chunks;
this.ccrExecutor = ccrExecutor;
this.leaderShard = leaderShard;
this.followerShard = followerShard;
this.handler = handler;
}
void start(final long from, final long to, final long maxTranslogsBytes) {
ShardChangesAction.Request request = new ShardChangesAction.Request(leaderShard);
request.setMinSeqNo(from);
request.setMaxSeqNo(to);
request.setMaxTranslogsBytes(maxTranslogsBytes);
leaderClient.execute(ShardChangesAction.INSTANCE, request, new ActionListener<ShardChangesAction.Response>() {
@Override
public void onResponse(ShardChangesAction.Response response) {
handleResponse(to, response);
}
@Override
public void onFailure(Exception e) {
assert e != null;
if (shouldRetry(e)) {
if (retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) {
start(from, to, maxTranslogsBytes);
} else {
handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() +
"] times, aborting...", e));
}
} else {
handler.accept(e);
}
}
});
}
void handleResponse(final long to, final ShardChangesAction.Response response) {
if (response.getOperations().length != 0) {
Translog.Operation lastOp = response.getOperations()[response.getOperations().length - 1];
boolean maxByteLimitReached = lastOp.seqNo() < to;
if (maxByteLimitReached) {
// add a new entry to the queue for the operations that couldn't be fetched in the current shard changes api call:
chunks.add(new long[]{lastOp.seqNo() + 1, to});
}
}
ccrExecutor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
assert e != null;
handler.accept(e);
}
@Override
protected void doRun() throws Exception {
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations());
followerClient.execute(BulkShardOperationsAction.INSTANCE, request, new ActionListener<BulkShardOperationsResponse>() {
@Override
public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) {
handler.accept(null);
}
@Override
public void onFailure(final Exception e) {
// No retry mechanism here, because if a failure is being redirected to this place it is considered
// non recoverable.
assert e != null;
handler.accept(e);
}
});
}
});
}
boolean shouldRetry(Exception e) {
// TODO: What other exceptions should be retried?
return NetworkExceptionHelper.isConnectException(e) ||
NetworkExceptionHelper.isCloseConnectionException(e);
}
}
}

View File

@ -0,0 +1,163 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
public class UnfollowIndexAction extends Action<UnfollowIndexAction.Request, UnfollowIndexAction.Response,
UnfollowIndexAction.RequestBuilder> {
public static final UnfollowIndexAction INSTANCE = new UnfollowIndexAction();
public static final String NAME = "cluster:admin/xpack/ccr/unfollow_index";
private UnfollowIndexAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends ActionRequest {
private String followIndex;
public String getFollowIndex() {
return followIndex;
}
public void setFollowIndex(String followIndex) {
this.followIndex = followIndex;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
followIndex = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(followIndex);
}
}
public static class Response extends ActionResponse {
}
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new Request());
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final Client client;
private final PersistentTasksService persistentTasksService;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client,
PersistentTasksService persistentTasksService) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.client = client;
this.persistentTasksService = persistentTasksService;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> {
IndexMetaData followIndexMetadata = r.getState().getMetaData().index(request.followIndex);
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
persistentTasksService.cancelPersistentTask(taskId,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
responses.set(shardId, task);
finalizeResponse();
}
@Override
public void onFailure(Exception e) {
responses.set(shardId, e);
finalizeResponse();
}
void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);
}
}
}
if (error == null) {
// include task ids?
listener.onResponse(new Response());
} else {
// TODO: cancel all started tasks
listener.onFailure(error);
}
}
}
});
}
}, listener::onFailure));
}
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action.bulk;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
public class BulkShardOperationsAction
extends Action<BulkShardOperationsRequest, BulkShardOperationsResponse, BulkShardOperationsRequestBuilder> {
public static final BulkShardOperationsAction INSTANCE = new BulkShardOperationsAction();
public static final String NAME = "indices:data/write/bulk_shard_operations[s]";
private BulkShardOperationsAction() {
super(NAME);
}
@Override
public BulkShardOperationsRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new BulkShardOperationsRequestBuilder(client);
}
@Override
public BulkShardOperationsResponse newResponse() {
return new BulkShardOperationsResponse();
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action.bulk;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<BulkShardOperationsRequest> {
private Translog.Operation[] operations;
public BulkShardOperationsRequest() {
}
public BulkShardOperationsRequest(final ShardId shardId, final Translog.Operation[] operations) {
super(shardId);
setRefreshPolicy(RefreshPolicy.NONE);
this.operations = operations;
}
public Translog.Operation[] getOperations() {
return operations;
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeArray(Translog.Operation::writeOperation, operations);
}
@Override
public String toString() {
return "BulkShardOperationsRequest{" +
"operations=" + operations.length+
", shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", waitForActiveShards=" + waitForActiveShards +
'}';
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action.bulk;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
public class BulkShardOperationsRequestBuilder
extends ActionRequestBuilder<BulkShardOperationsRequest, BulkShardOperationsResponse, BulkShardOperationsRequestBuilder> {
public BulkShardOperationsRequestBuilder(final ElasticsearchClient client) {
super(client, BulkShardOperationsAction.INSTANCE, new BulkShardOperationsRequest());
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action.bulk;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
public final class BulkShardOperationsResponse extends ReplicationResponse implements WriteResponse {
@Override
public void setForcedRefresh(final boolean forcedRefresh) {
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action.bulk;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
public class TransportBulkShardOperationsAction
extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
@Inject
public TransportBulkShardOperationsAction(
final Settings settings,
final TransportService transportService,
final ClusterService clusterService,
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(
settings,
BulkShardOperationsAction.NAME,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
BulkShardOperationsRequest::new,
BulkShardOperationsRequest::new,
ThreadPool.Names.WRITE);
}
@Override
protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
final BulkShardOperationsRequest request, final IndexShard primary) throws Exception {
final Translog.Location location = applyTranslogOperations(request, primary, Engine.Operation.Origin.PRIMARY);
return new WritePrimaryResult<>(request, new BulkShardOperationsResponse(), location, null, primary, logger);
}
@Override
protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {
final Translog.Location location = applyTranslogOperations(request, replica, Engine.Operation.Origin.REPLICA);
return new WriteReplicaResult<>(request, location, null, replica, logger);
}
private Translog.Location applyTranslogOperations(
final BulkShardOperationsRequest request, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException {
Translog.Location location = null;
for (final Translog.Operation operation : request.getOperations()) {
final Engine.Result result = shard.applyTranslogOperation(operation, origin, m -> {
// TODO: Figure out how to deal best with dynamic mapping updates from the leader side:
throw new MapperException("dynamic mapping updates are not allowed in follow shards [" + operation + "]");
});
assert result.getSeqNo() == operation.seqNo();
assert result.hasFailure() == false;
location = locationToSync(location, result.getTranslogLocation());
}
assert request.getOperations().length == 0 || location != null;
return location;
}
@Override
protected BulkShardOperationsResponse newResponseInstance() {
return new BulkShardOperationsResponse();
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.index.engine;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.xpack.ccr.CcrSettings;
import java.io.IOException;
/**
* An engine implementation for following shards.
*/
public final class FollowingEngine extends InternalEngine {
/**
* Construct a new following engine with the specified engine configuration.
*
* @param engineConfig the engine configuration
*/
FollowingEngine(final EngineConfig engineConfig) {
super(validateEngineConfig(engineConfig));
}
private static EngineConfig validateEngineConfig(final EngineConfig engineConfig) {
if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(engineConfig.getIndexSettings().getSettings()) == false) {
throw new IllegalArgumentException("a following engine can not be constructed for a non-following index");
}
return engineConfig;
}
private void preFlight(final Operation operation) {
/*
* We assert here so that this goes uncaught in unit tests and fails nodes in standalone tests (we want a harsh failure so that we
* do not have a situation where a shard fails and is recovered elsewhere and a test subsequently passes). We throw an exception so
* that we also prevent issues in production code.
*/
assert operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
if (operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalStateException("a following engine does not accept operations without an assigned sequence number");
}
}
@Override
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
preFlight(index);
return planIndexingAsNonPrimary(index);
}
@Override
protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
preFlight(delete);
return planDeletionAsNonPrimary(delete);
}
@Override
protected boolean assertPrimaryIncomingSequenceNumber(final Operation.Origin origin, final long seqNo) {
// sequence number should be set when operation origin is primary
assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO : "primary operations on a following index must have an assigned sequence number";
return true;
}
@Override
protected boolean assertNonPrimaryOrigin(final Operation operation) {
return true;
}
@Override
protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL
: "version [" + index.version() + "], type [" + index.versionType() + "]";
return true;
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.index.engine;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
/**
* An engine factory for following engines.
*/
public final class FollowingEngineFactory implements EngineFactory {
@Override
public Engine newReadWriteEngine(final EngineConfig config) {
return new FollowingEngine(config);
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import java.io.IOException;
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.INSTANCE;
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Request;
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Response;
// TODO: change to confirm with API design
public class RestFollowExistingIndexAction extends BaseRestHandler {
public RestFollowExistingIndexAction(Settings settings, RestController controller) {
super(settings);
// TODO: figure out why: '/{follow_index}/_xpack/ccr/_follow' path clashes with create index api.
controller.registerHandler(RestRequest.Method.POST, "/_xpack/ccr/{follow_index}/_follow", this);
}
@Override
public String getName() {
return "xpack_ccr_follow_index_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
Request request = new Request();
request.setLeaderIndex(restRequest.param("leader_index"));
request.setFollowIndex(restRequest.param("follow_index"));
if (restRequest.hasParam(ShardFollowTask.MAX_CHUNK_SIZE.getPreferredName())) {
request.setBatchSize(Long.valueOf(restRequest.param(ShardFollowTask.MAX_CHUNK_SIZE.getPreferredName())));
}
if (restRequest.hasParam(ShardFollowTask.NUM_CONCURRENT_CHUNKS.getPreferredName())) {
request.setConcurrentProcessors(Integer.valueOf(restRequest.param(ShardFollowTask.NUM_CONCURRENT_CHUNKS.getPreferredName())));
}
if (restRequest.hasParam(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())) {
long value = Long.valueOf(restRequest.param(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName()));
request.setProcessorMaxTranslogBytes(value);
}
return channel -> client.execute(INSTANCE, request, new RestBuilderListener<Response>(channel) {
@Override
public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {
return new BytesRestResponse(RestStatus.OK, builder.startObject()
.endObject());
}
});
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import java.io.IOException;
import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.INSTANCE;
import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.Request;
import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.Response;
// TODO: change to confirm with API design
public class RestUnfollowIndexAction extends BaseRestHandler {
public RestUnfollowIndexAction(Settings settings, RestController controller) {
super(settings);
// TODO: figure out why: '/{follow_index}/_xpack/ccr/_unfollow' path clashes with create index api.
controller.registerHandler(RestRequest.Method.POST, "/_xpack/ccr/{follow_index}/_unfollow", this);
}
@Override
public String getName() {
return "xpack_ccr_unfollow_index_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
Request request = new Request();
request.setFollowIndex(restRequest.param("follow_index"));
return channel -> client.execute(INSTANCE, request, new RestBuilderListener<Response>(channel) {
@Override
public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {
return new BytesRestResponse(RestStatus.OK, builder.startObject()
.endObject());
}
});
}
}

View File

@ -0,0 +1,50 @@
grant {
// needed because of problems in unbound LDAP library
permission java.util.PropertyPermission "*", "read,write";
// required to configure the custom mailcap for watcher
permission java.lang.RuntimePermission "setFactory";
// needed when sending emails for javax.activation
// otherwise a classnotfound exception is thrown due to trying
// to load the class with the application class loader
permission java.lang.RuntimePermission "setContextClassLoader";
permission java.lang.RuntimePermission "getClassLoader";
// TODO: remove use of this jar as soon as possible!!!!
permission java.lang.RuntimePermission "accessClassInPackage.com.sun.activation.registries";
// bouncy castle
permission java.security.SecurityPermission "putProviderProperty.BC";
// needed for x-pack security extension
permission java.security.SecurityPermission "createPolicy.JavaPolicy";
permission java.security.SecurityPermission "getPolicy";
permission java.security.SecurityPermission "setPolicy";
// needed for multiple server implementations used in tests
permission java.net.SocketPermission "*", "accept,connect";
// needed for Windows named pipes in machine learning
permission java.io.FilePermission "\\\\.\\pipe\\*", "read,write";
};
grant codeBase "${codebase.netty-common}" {
// for reading the system-wide configuration for the backlog of established sockets
permission java.io.FilePermission "/proc/sys/net/core/somaxconn", "read";
};
grant codeBase "${codebase.netty-transport}" {
// Netty NioEventLoop wants to change this, because of https://bugs.openjdk.java.net/browse/JDK-6427854
// the bug says it only happened rarely, and that its fixed, but apparently it still happens rarely!
permission java.util.PropertyPermission "sun.nio.ch.bugLevel", "write";
};
grant codeBase "${codebase.elasticsearch-rest-client}" {
// rest client uses system properties which gets the default proxy
permission java.net.NetPermission "getProxySelector";
};
grant codeBase "${codebase.httpasyncclient}" {
// rest client uses system properties which gets the default proxy
permission java.net.NetPermission "getProxySelector";
};

View File

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import java.io.IOException;
import java.util.Optional;
import static org.hamcrest.Matchers.instanceOf;
public class CcrTests extends ESTestCase {
public void testGetEngineFactory() throws IOException {
final Boolean[] values = new Boolean[] { true, false, null };
for (final Boolean value : values) {
final String indexName = "following-" + value;
final Index index = new Index(indexName, UUIDs.randomBase64UUID());
final Settings.Builder builder = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID());
if (value != null) {
builder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), value);
}
final IndexMetaData indexMetaData = new IndexMetaData.Builder(index.getName())
.settings(builder.build())
.numberOfShards(1)
.numberOfReplicas(0)
.build();
final Ccr ccr = new Ccr(Settings.EMPTY);
final Optional<EngineFactory> engineFactory =
ccr.getEngineFactory(new IndexSettings(indexMetaData, Settings.EMPTY));
if (value != null && value) {
assertTrue(engineFactory.isPresent());
assertThat(engineFactory.get(), instanceOf(FollowingEngineFactory.class));
} else {
assertFalse(engineFactory.isPresent());
}
}
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import java.nio.file.Path;
public class LocalStateCcr extends LocalStateCompositeXPackPlugin {
public LocalStateCcr(final Settings settings, final Path configPath) throws Exception {
super(settings, configPath);
LocalStateCcr thisVar = this;
plugins.add(new Ccr(settings){
@Override
protected XPackLicenseState getLicenseState() {
return thisVar.getLicenseState();
}
});
}
}

View File

@ -0,0 +1,327 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
/*
/*
* ELASTICSEARCH CONFIDENTIAL
* __________________
*
* [2017] Elasticsearch Incorporated. All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains
* the property of Elasticsearch Incorporated and its suppliers,
* if any. The intellectual and technical concepts contained
* herein are proprietary to Elasticsearch Incorporated
* and its suppliers and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
* Dissemination of this information or reproduction of this material
* is strictly forbidden unless prior written permission is obtained
* from Elasticsearch Incorporated.
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction;
import org.elasticsearch.xpack.core.XPackSettings;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, transportClientRatio = 0)
public class ShardChangesIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder newSettings = Settings.builder();
newSettings.put(super.nodeSettings(nodeOrdinal));
newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
newSettings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
newSettings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
return newSettings.build();
}
@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
return Arrays.asList(TestSeedPlugin.class, TestZenDiscovery.TestPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCcr.class, CommonAnalysisPlugin.class);
}
@Override
protected boolean ignoreExternalCluster() {
return true;
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
// this emulates what the CCR persistent task will do for pulling
public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {
client().admin().indices().prepareCreate("index")
.setSettings(Settings.builder().put("index.number_of_shards", 1))
.get();
client().prepareIndex("index", "doc", "1").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "2").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get();
ShardStats shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0];
long globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint();
assertThat(globalCheckPoint, equalTo(2L));
ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId());
request.setMinSeqNo(0L);
request.setMaxSeqNo(globalCheckPoint);
ShardChangesAction.Response response = client().execute(ShardChangesAction.INSTANCE, request).get();
assertThat(response.getOperations().length, equalTo(3));
Translog.Index operation = (Translog.Index) response.getOperations()[0];
assertThat(operation.seqNo(), equalTo(0L));
assertThat(operation.id(), equalTo("1"));
operation = (Translog.Index) response.getOperations()[1];
assertThat(operation.seqNo(), equalTo(1L));
assertThat(operation.id(), equalTo("2"));
operation = (Translog.Index) response.getOperations()[2];
assertThat(operation.seqNo(), equalTo(2L));
assertThat(operation.id(), equalTo("3"));
client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "4").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "5").setSource("{}", XContentType.JSON).get();
shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0];
globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint();
assertThat(globalCheckPoint, equalTo(5L));
request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId());
request.setMinSeqNo(3L);
request.setMaxSeqNo(globalCheckPoint);
response = client().execute(ShardChangesAction.INSTANCE, request).get();
assertThat(response.getOperations().length, equalTo(3));
operation = (Translog.Index) response.getOperations()[0];
assertThat(operation.seqNo(), equalTo(3L));
assertThat(operation.id(), equalTo("3"));
operation = (Translog.Index) response.getOperations()[1];
assertThat(operation.seqNo(), equalTo(4L));
assertThat(operation.id(), equalTo("4"));
operation = (Translog.Index) response.getOperations()[2];
assertThat(operation.seqNo(), equalTo(5L));
assertThat(operation.id(), equalTo("5"));
}
public void testFollowIndex() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, Collections.emptyMap());
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
final String followerIndexSettings =
getIndexSettings(numberOfPrimaryShards, Collections.singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
ensureGreen("index1", "index2");
final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request();
followRequest.setLeaderIndex("index1");
followRequest.setFollowIndex("index2");
client().execute(FollowExistingIndexAction.INSTANCE, followRequest).get();
final int firstBatchNumDocs = randomIntBetween(2, 64);
for (int i = 0; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
final Map<ShardId, Long> firstBatchNumDocsPerShard = new HashMap<>();
final ShardStats[] firstBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
for (final ShardStats shardStats : firstBatchShardStats) {
if (shardStats.getShardRouting().primary()) {
long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value);
}
}
assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard));
for (int i = 0; i < firstBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
final int secondBatchNumDocs = randomIntBetween(2, 64);
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
final Map<ShardId, Long> secondBatchNumDocsPerShard = new HashMap<>();
final ShardStats[] secondBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
for (final ShardStats shardStats : secondBatchShardStats) {
if (shardStats.getShardRouting().primary()) {
final long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value);
}
}
assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard));
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
unfollowRequest.setFollowIndex("index2");
client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get();
assertBusy(() -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(0));
});
}
public void testFollowNonExistentIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test-leader").get());
assertAcked(client().admin().indices().prepareCreate("test-follower").get());
final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request();
// Leader index does not exist.
followRequest.setLeaderIndex("non-existent-leader");
followRequest.setFollowIndex("test-follower");
expectThrows(IllegalArgumentException.class, () -> client().execute(FollowExistingIndexAction.INSTANCE, followRequest).actionGet());
// Follower index does not exist.
followRequest.setLeaderIndex("test-leader");
followRequest.setFollowIndex("non-existent-follower");
expectThrows(IllegalArgumentException.class, () -> client().execute(FollowExistingIndexAction.INSTANCE, followRequest).actionGet());
// Both indices do not exist.
followRequest.setLeaderIndex("non-existent-leader");
followRequest.setFollowIndex("non-existent-follower");
expectThrows(IllegalArgumentException.class, () -> client().execute(FollowExistingIndexAction.INSTANCE, followRequest).actionGet());
}
private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
return () -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards));
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
listTasksRequest.setActions(ShardFollowTask.NAME + "[c]");
ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).actionGet();
assertThat(listTasksResponse.getNodeFailures().size(), equalTo(0));
assertThat(listTasksResponse.getTaskFailures().size(), equalTo(0));
List<TaskInfo> taskInfos = listTasksResponse.getTasks();
assertThat(taskInfos.size(), equalTo(numberOfPrimaryShards));
for (PersistentTasksCustomMetaData.PersistentTask<?> task : tasks.tasks()) {
final ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams();
TaskInfo taskInfo = null;
String expectedId = "id=" + task.getId();
for (TaskInfo info : taskInfos) {
if (expectedId.equals(info.getDescription())) {
taskInfo = info;
break;
}
}
assertThat(taskInfo, notNullValue());
ShardFollowNodeTask.Status status = (ShardFollowNodeTask.Status) taskInfo.getStatus();
assertThat(status, notNullValue());
assertThat(
status.getProcessedGlobalCheckpoint(),
equalTo(numDocsPerShard.get(shardFollowTask.getLeaderShardId())));
}
};
}
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
return () -> {
final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get();
assertTrue(getResponse.isExists());
assertTrue((getResponse.getSource().containsKey("f")));
assertThat(getResponse.getSource().get("f"), equalTo(value));
};
}
private String getIndexSettings(final int numberOfPrimaryShards, final Map<String, String> additionalIndexSettings) throws IOException {
final String settings;
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startObject("settings");
{
builder.field("index.number_of_shards", numberOfPrimaryShards);
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
}
}
builder.endObject();
builder.startObject("mappings");
{
builder.startObject("doc");
{
builder.startObject("properties");
{
builder.startObject("f");
{
builder.field("type", "integer");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
settings = BytesReference.bytes(builder).utf8ToString();
}
return settings;
}
}

View File

@ -0,0 +1,367 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunkProcessor;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunksCoordinator;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class ChunksCoordinatorTests extends ESTestCase {
public void testCreateChunks() {
Client client = mock(Client.class);
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
ChunksCoordinator coordinator =
new ChunksCoordinator(client, client, ccrExecutor, 1024, 1, Long.MAX_VALUE, leaderShardId, followShardId, e -> {});
coordinator.createChucks(0, 1024);
List<long[]> result = new ArrayList<>(coordinator.getChunks());
assertThat(result.size(), equalTo(1));
assertThat(result.get(0)[0], equalTo(0L));
assertThat(result.get(0)[1], equalTo(1024L));
coordinator.getChunks().clear();
coordinator.createChucks(0, 2048);
result = new ArrayList<>(coordinator.getChunks());
assertThat(result.size(), equalTo(2));
assertThat(result.get(0)[0], equalTo(0L));
assertThat(result.get(0)[1], equalTo(1024L));
assertThat(result.get(1)[0], equalTo(1025L));
assertThat(result.get(1)[1], equalTo(2048L));
coordinator.getChunks().clear();
coordinator.createChucks(0, 4096);
result = new ArrayList<>(coordinator.getChunks());
assertThat(result.size(), equalTo(4));
assertThat(result.get(0)[0], equalTo(0L));
assertThat(result.get(0)[1], equalTo(1024L));
assertThat(result.get(1)[0], equalTo(1025L));
assertThat(result.get(1)[1], equalTo(2048L));
assertThat(result.get(2)[0], equalTo(2049L));
assertThat(result.get(2)[1], equalTo(3072L));
assertThat(result.get(3)[0], equalTo(3073L));
assertThat(result.get(3)[1], equalTo(4096L));
coordinator.getChunks().clear();
coordinator.createChucks(4096, 8196);
result = new ArrayList<>(coordinator.getChunks());
assertThat(result.size(), equalTo(5));
assertThat(result.get(0)[0], equalTo(4096L));
assertThat(result.get(0)[1], equalTo(5120L));
assertThat(result.get(1)[0], equalTo(5121L));
assertThat(result.get(1)[1], equalTo(6144L));
assertThat(result.get(2)[0], equalTo(6145L));
assertThat(result.get(2)[1], equalTo(7168L));
assertThat(result.get(3)[0], equalTo(7169L));
assertThat(result.get(3)[1], equalTo(8192L));
assertThat(result.get(4)[0], equalTo(8193L));
assertThat(result.get(4)[1], equalTo(8196L));
}
public void testCoordinator() throws Exception {
Client client = mock(Client.class);
mockShardChangesApiCall(client);
mockBulkShardOperationsApiCall(client);
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
Consumer<Exception> handler = e -> assertThat(e, nullValue());
int concurrentProcessors = randomIntBetween(1, 4);
int batchSize = randomIntBetween(1, 1000);
ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, batchSize, concurrentProcessors,
Long.MAX_VALUE, leaderShardId, followShardId, handler);
int numberOfOps = randomIntBetween(batchSize, batchSize * 20);
long from = randomInt(1000);
long to = from + numberOfOps;
coordinator.createChucks(from, to);
int expectedNumberOfChunks = numberOfOps / batchSize;
if (numberOfOps % batchSize > 0) {
expectedNumberOfChunks++;
}
assertThat(coordinator.getChunks().size(), equalTo(expectedNumberOfChunks));
coordinator.start();
assertThat(coordinator.getChunks().size(), equalTo(0));
verify(client, times(expectedNumberOfChunks)).execute(same(ShardChangesAction.INSTANCE),
any(ShardChangesAction.Request.class), any());
verify(client, times(expectedNumberOfChunks)).execute(same(BulkShardOperationsAction.INSTANCE),
any(BulkShardOperationsRequest.class), any());
}
public void testCoordinator_failure() throws Exception {
Exception expectedException = new RuntimeException("throw me");
Client client = mock(Client.class);
boolean shardChangesActionApiCallFailed;
if (randomBoolean()) {
shardChangesActionApiCallFailed = true;
doThrow(expectedException).when(client).execute(same(ShardChangesAction.INSTANCE),
any(ShardChangesAction.Request.class), any());
} else {
shardChangesActionApiCallFailed = false;
mockShardChangesApiCall(client);
doThrow(expectedException).when(client).execute(same(BulkShardOperationsAction.INSTANCE),
any(BulkShardOperationsRequest.class), any());
}
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
Consumer<Exception> handler = e -> {
assertThat(e, notNullValue());
assertThat(e, sameInstance(expectedException));
};
ChunksCoordinator coordinator =
new ChunksCoordinator(client, client, ccrExecutor, 10, 1, Long.MAX_VALUE, leaderShardId, followShardId, handler);
coordinator.createChucks(0, 20);
assertThat(coordinator.getChunks().size(), equalTo(2));
coordinator.start();
assertThat(coordinator.getChunks().size(), equalTo(1));
verify(client, times(1)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class),
any());
verify(client, times(shardChangesActionApiCallFailed ? 0 : 1)).execute(same(BulkShardOperationsAction.INSTANCE),
any(BulkShardOperationsRequest.class), any());
}
public void testCoordinator_concurrent() throws Exception {
Client client = mock(Client.class);
mockShardChangesApiCall(client);
mockBulkShardOperationsApiCall(client);
Executor ccrExecutor = command -> new Thread(command).start();
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
AtomicBoolean calledOnceChecker = new AtomicBoolean(false);
AtomicReference<Exception> failureHolder = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
Consumer<Exception> handler = e -> {
if (failureHolder.compareAndSet(null, e) == false) {
// This handler should only be called once, irregardless of the number of concurrent processors
calledOnceChecker.set(true);
}
latch.countDown();
};
ChunksCoordinator coordinator =
new ChunksCoordinator(client, client, ccrExecutor, 1000, 4, Long.MAX_VALUE, leaderShardId, followShardId, handler);
coordinator.createChucks(0, 1000000);
assertThat(coordinator.getChunks().size(), equalTo(1000));
coordinator.start();
latch.await();
assertThat(coordinator.getChunks().size(), equalTo(0));
verify(client, times(1000)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any());
verify(client, times(1000)).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), any());
assertThat(calledOnceChecker.get(), is(false));
}
public void testChunkProcessor() {
Client client = mock(Client.class);
Queue<long[]> chunks = new LinkedList<>();
mockShardChangesApiCall(client);
mockBulkShardOperationsApiCall(client);
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(0, 10, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], nullValue());
}
public void testChunkProcessorRetry() {
Client client = mock(Client.class);
Queue<long[]> chunks = new LinkedList<>();
mockBulkShardOperationsApiCall(client);
int testRetryLimit = randomIntBetween(1, ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT - 1);
mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception"));
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(0, 10, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], nullValue());
assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit + 1));
}
public void testChunkProcessorRetryTooManyTimes() {
Client client = mock(Client.class);
Queue<long[]> chunks = new LinkedList<>();
mockBulkShardOperationsApiCall(client);
int testRetryLimit = ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT + 1;
mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception"));
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(0, 10, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], notNullValue());
assertThat(exception[0].getMessage(), equalTo("retrying failed [17] times, aborting..."));
assertThat(exception[0].getCause().getMessage(), equalTo("connection exception"));
assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit));
}
public void testChunkProcessorNoneRetryableError() {
Client client = mock(Client.class);
Queue<long[]> chunks = new LinkedList<>();
mockBulkShardOperationsApiCall(client);
mockShardCangesApiCallWithRetry(client, 3, new RuntimeException("unexpected"));
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(0, 10, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], notNullValue());
assertThat(exception[0].getMessage(), equalTo("unexpected"));
assertThat(chunkProcessor.retryCounter.get(), equalTo(0));
}
public void testChunkProcessorExceedMaxTranslogsBytes() {
long from = 0;
long to = 20;
long actualTo = 10;
Client client = mock(Client.class);
Queue<long[]> chunks = new LinkedList<>();
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 3;
@SuppressWarnings("unchecked")
ActionListener<ShardChangesAction.Response> listener = (ActionListener) args[2];
List<Translog.Operation> operations = new ArrayList<>();
for (int i = 0; i <= actualTo; i++) {
operations.add(new Translog.NoOp(i, 1, "test"));
}
listener.onResponse(new ShardChangesAction.Response(operations.toArray(new Translog.Operation[0])));
return null;
}).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any());
mockBulkShardOperationsApiCall(client);
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(from, to, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], nullValue());
assertThat(chunks.size(), equalTo(1));
assertThat(chunks.peek()[0], equalTo(11L));
assertThat(chunks.peek()[1], equalTo(20L));
}
private void mockShardCangesApiCallWithRetry(Client client, int testRetryLimit, Exception e) {
int[] retryCounter = new int[1];
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 3;
ShardChangesAction.Request request = (ShardChangesAction.Request) args[1];
@SuppressWarnings("unchecked")
ActionListener<ShardChangesAction.Response> listener = (ActionListener) args[2];
if (retryCounter[0]++ <= testRetryLimit) {
listener.onFailure(e);
} else {
long delta = request.getMaxSeqNo() - request.getMinSeqNo();
Translog.Operation[] operations = new Translog.Operation[(int) delta];
for (int i = 0; i < operations.length; i++) {
operations[i] = new Translog.NoOp(request.getMinSeqNo() + i, 1, "test");
}
ShardChangesAction.Response response = new ShardChangesAction.Response(operations);
listener.onResponse(response);
}
return null;
}).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any());
}
private void mockShardChangesApiCall(Client client) {
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 3;
ShardChangesAction.Request request = (ShardChangesAction.Request) args[1];
@SuppressWarnings("unchecked")
ActionListener<ShardChangesAction.Response> listener = (ActionListener) args[2];
List<Translog.Operation> operations = new ArrayList<>();
for (long i = request.getMinSeqNo(); i <= request.getMaxSeqNo(); i++) {
operations.add(new Translog.NoOp(request.getMinSeqNo() + i, 1, "test"));
}
ShardChangesAction.Response response = new ShardChangesAction.Response(operations.toArray(new Translog.Operation[0]));
listener.onResponse(response);
return null;
}).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any());
}
private void mockBulkShardOperationsApiCall(Client client) {
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 3;
@SuppressWarnings("unchecked")
ActionListener<BulkShardOperationsResponse> listener = (ActionListener) args[2];
listener.onResponse(new BulkShardOperationsResponse());
return null;
}).when(client).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), any());
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.mockito.Mockito;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class ShardChangesActionTests extends ESSingleNodeTestCase {
public void testGetOperationsBetween() throws Exception {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.translog.generation_threshold_size", new ByteSizeValue(randomIntBetween(8, 64), ByteSizeUnit.KB))
.build();
final IndexService indexService = createIndex("index", settings);
final int numWrites = randomIntBetween(2, 8192);
for (int i = 0; i < numWrites; i++) {
client().prepareIndex("index", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
}
// A number of times, get operations within a range that exists:
int iters = randomIntBetween(8, 32);
IndexShard indexShard = indexService.getShard(0);
for (int iter = 0; iter < iters; iter++) {
int min = randomIntBetween(0, numWrites - 1);
int max = randomIntBetween(min, numWrites - 1);
final ShardChangesAction.Response r = ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE);
/*
* We are not guaranteed that operations are returned to us in order they are in the translog (if our read crosses multiple
* generations) so the best we can assert is that we see the expected operations.
*/
final Set<Long> seenSeqNos = Arrays.stream(r.getOperations()).map(Translog.Operation::seqNo).collect(Collectors.toSet());
final Set<Long> expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toSet());
assertThat(seenSeqNos, equalTo(expectedSeqNos));
}
// get operations for a range no operations exists:
Exception e = expectThrows(IllegalStateException.class,
() -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1, Long.MAX_VALUE));
assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + numWrites + "] and max_seq_no [" +
(numWrites + 1) +"] found, tracker checkpoint ["));
// get operations for a range some operations do not exist:
e = expectThrows(IllegalStateException.class,
() -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, numWrites + 10, Long.MAX_VALUE));
assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + (numWrites - 10) + "] and max_seq_no [" +
(numWrites + 10) +"] found, tracker checkpoint ["));
}
public void testGetOperationsBetweenWhenShardNotStarted() throws Exception {
IndexShard indexShard = Mockito.mock(IndexShard.class);
ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING);
Mockito.when(indexShard.routingEntry()).thenReturn(shardRouting);
expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1, Long.MAX_VALUE));
}
public void testGetOperationsBetweenExceedByteLimit() throws Exception {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build();
final IndexService indexService = createIndex("index", settings);
final long numWrites = 32;
for (int i = 0; i < numWrites; i++) {
client().prepareIndex("index", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
}
final IndexShard indexShard = indexService.getShard(0);
final ShardChangesAction.Response r = ShardChangesAction.getOperationsBetween(indexShard, 0, numWrites - 1, 256);
assertThat(r.getOperations().length, equalTo(12));
assertThat(r.getOperations()[0].seqNo(), equalTo(0L));
assertThat(r.getOperations()[1].seqNo(), equalTo(1L));
assertThat(r.getOperations()[2].seqNo(), equalTo(2L));
assertThat(r.getOperations()[3].seqNo(), equalTo(3L));
assertThat(r.getOperations()[4].seqNo(), equalTo(4L));
assertThat(r.getOperations()[5].seqNo(), equalTo(5L));
assertThat(r.getOperations()[6].seqNo(), equalTo(6L));
assertThat(r.getOperations()[7].seqNo(), equalTo(7L));
assertThat(r.getOperations()[8].seqNo(), equalTo(8L));
assertThat(r.getOperations()[9].seqNo(), equalTo(9L));
assertThat(r.getOperations()[10].seqNo(), equalTo(10L));
assertThat(r.getOperations()[11].seqNo(), equalTo(11L));
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.AbstractStreamableTestCase;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.nullValue;
public class ShardChangesRequestTests extends AbstractStreamableTestCase<ShardChangesAction.Request> {
@Override
protected ShardChangesAction.Request createTestInstance() {
ShardChangesAction.Request request = new ShardChangesAction.Request(new ShardId("_index", "_indexUUID", 0));
request.setMaxSeqNo(randomNonNegativeLong());
request.setMinSeqNo(randomNonNegativeLong());
return request;
}
@Override
protected ShardChangesAction.Request createBlankInstance() {
return new ShardChangesAction.Request();
}
public void testValidate() {
ShardChangesAction.Request request = new ShardChangesAction.Request(new ShardId("_index", "_indexUUID", 0));
request.setMinSeqNo(-1);
assertThat(request.validate().getMessage(), containsString("minSeqNo [-1] cannot be lower than 0"));
request.setMinSeqNo(4);
assertThat(request.validate().getMessage(), containsString("minSeqNo [4] cannot be larger than maxSeqNo [0]"));
request.setMaxSeqNo(8);
assertThat(request.validate(), nullValue());
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.AbstractStreamableTestCase;
public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardChangesAction.Response> {
@Override
protected ShardChangesAction.Response createTestInstance() {
final int numOps = randomInt(8);
final Translog.Operation[] operations = new Translog.Operation[numOps];
for (int i = 0; i < numOps; i++) {
operations[i] = new Translog.NoOp(i, 0, "test");
}
return new ShardChangesAction.Response(operations);
}
@Override
protected ShardChangesAction.Response createBlankInstance() {
return new ShardChangesAction.Response();
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<ShardFollowNodeTask.Status> {
@Override
protected ShardFollowNodeTask.Status doParseInstance(XContentParser parser) throws IOException {
return ShardFollowNodeTask.Status.fromXContent(parser);
}
@Override
protected ShardFollowNodeTask.Status createTestInstance() {
return new ShardFollowNodeTask.Status(randomNonNegativeLong());
}
@Override
protected Writeable.Reader<ShardFollowNodeTask.Status> instanceReader() {
return ShardFollowNodeTask.Status::new;
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
public class ShardFollowTaskTests extends AbstractSerializingTestCase<ShardFollowTask> {
@Override
protected ShardFollowTask doParseInstance(XContentParser parser) throws IOException {
return ShardFollowTask.fromXContent(parser);
}
@Override
protected ShardFollowTask createTestInstance() {
return new ShardFollowTask(
randomAlphaOfLength(4),
new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)),
new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)),
randomIntBetween(1, Integer.MAX_VALUE), randomIntBetween(1, Integer.MAX_VALUE),
randomIntBetween(1, Integer.MAX_VALUE));
}
@Override
protected Writeable.Reader<ShardFollowTask> instanceReader() {
return ShardFollowTask::new;
}
}

View File

@ -0,0 +1,310 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.index.engine;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.TranslogHandler;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
public class FollowingEngineTests extends ESTestCase {
private ThreadPool threadPool;
private Index index;
private ShardId shardId;
private AtomicLong primaryTerm = new AtomicLong();
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("following-engine-tests");
index = new Index("index", "uuid");
shardId = new ShardId(index, 0);
primaryTerm.set(randomLongBetween(1, Long.MAX_VALUE));
}
public void tearDown() throws Exception {
terminate(threadPool);
super.tearDown();
}
public void testFollowingEngineRejectsNonFollowingIndex() throws IOException {
final Settings.Builder builder =
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT);
if (randomBoolean()) {
builder.put("index.xpack.ccr.following_index", false);
}
final Settings settings = builder.build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new FollowingEngine(engineConfig));
assertThat(e, hasToString(containsString("a following engine can not be constructed for a non-following index")));
}
}
public void testIndexSeqNoIsMaintained() throws IOException {
final long seqNo = randomIntBetween(0, Integer.MAX_VALUE);
runIndexTest(
seqNo,
Engine.Operation.Origin.PRIMARY,
(followingEngine, index) -> {
final Engine.IndexResult result = followingEngine.index(index);
assertThat(result.getSeqNo(), equalTo(seqNo));
});
}
/*
* A following engine (whether or not it is an engine for a primary or replica shard) needs to maintain ordering semantics as the
* operations presented to it can arrive out of order (while a leader engine that is for a primary shard dictates the order). This test
* ensures that these semantics are maintained.
*/
public void testOutOfOrderDocuments() throws IOException {
final Settings settings =
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put("index.xpack.ccr.following_index", true)
.build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
final VersionType versionType =
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE);
final List<Engine.Operation> ops = EngineTestCase.generateSingleDocHistory(true, versionType, false, 2, 2, 20);
EngineTestCase.assertOpsOnReplica(ops, followingEngine, true, logger);
}
}
}
public void runIndexTest(
final long seqNo,
final Engine.Operation.Origin origin,
final CheckedBiConsumer<FollowingEngine, Engine.Index, IOException> consumer) throws IOException {
final Settings settings =
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put("index.xpack.ccr.following_index", true)
.build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
final String id = "id";
final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
final String type = "type";
final Field versionField = new NumericDocValuesField("_version", 0);
final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
final ParseContext.Document document = new ParseContext.Document();
document.add(uidField);
document.add(versionField);
document.add(seqID.seqNo);
document.add(seqID.seqNoDocValue);
document.add(seqID.primaryTerm);
final BytesReference source = new BytesArray(new byte[]{1});
final ParsedDocument parsedDocument = new ParsedDocument(
versionField,
seqID,
id,
type,
"routing",
Collections.singletonList(document),
source,
XContentType.JSON,
null);
final long version;
final long autoGeneratedIdTimestamp;
if (randomBoolean()) {
version = 1;
autoGeneratedIdTimestamp = System.currentTimeMillis();
} else {
version = randomNonNegativeLong();
autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}
final Engine.Index index = new Engine.Index(
new Term("_id", parsedDocument.id()),
parsedDocument,
seqNo,
primaryTerm.get(),
version,
VersionType.EXTERNAL,
origin,
System.currentTimeMillis(),
autoGeneratedIdTimestamp,
randomBoolean());
consumer.accept(followingEngine, index);
}
}
}
public void testDeleteSeqNoIsMaintained() throws IOException {
final long seqNo = randomIntBetween(0, Integer.MAX_VALUE);
runDeleteTest(
seqNo,
Engine.Operation.Origin.PRIMARY,
(followingEngine, delete) -> {
final Engine.DeleteResult result = followingEngine.delete(delete);
assertThat(result.getSeqNo(), equalTo(seqNo));
});
}
public void runDeleteTest(
final long seqNo,
final Engine.Operation.Origin origin,
final CheckedBiConsumer<FollowingEngine, Engine.Delete, IOException> consumer) throws IOException {
final Settings settings =
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put("index.xpack.ccr.following_index", true)
.build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
final String id = "id";
final Engine.Delete delete = new Engine.Delete(
"type",
id,
new Term("_id", id),
seqNo,
primaryTerm.get(),
randomNonNegativeLong(),
VersionType.EXTERNAL,
origin,
System.currentTimeMillis());
consumer.accept(followingEngine, delete);
}
}
}
private EngineConfig engineConfig(
final ShardId shardId,
final IndexSettings indexSettings,
final ThreadPool threadPool,
final Store store,
final Logger logger,
final NamedXContentRegistry xContentRegistry) throws IOException {
final IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
final Path translogPath = createTempDir("translog");
final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
return new EngineConfig(
shardId,
"allocation-id",
threadPool,
indexSettings,
null,
store,
newMergePolicy(),
indexWriterConfig.getAnalyzer(),
indexWriterConfig.getSimilarity(),
new CodecService(null, logger),
new Engine.EventListener() {
@Override
public void onFailedEngine(String reason, Exception e) {
}
},
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
translogConfig,
TimeValue.timeValueMinutes(5),
Collections.emptyList(),
Collections.emptyList(),
null,
new TranslogHandler(
xContentRegistry, IndexSettingsModule.newIndexSettings(shardId.getIndexName(), indexSettings.getSettings())),
new NoneCircuitBreakerService(),
() -> SequenceNumbers.NO_OPS_PERFORMED,
() -> primaryTerm.get(),
EngineTestCase::createTombstoneDoc
);
}
private static Store createStore(
final ShardId shardId, final IndexSettings indexSettings, final Directory directory) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
public Directory newDirectory() throws IOException {
return directory;
}
};
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
}
private FollowingEngine createEngine(Store store, EngineConfig config) throws IOException {
store.createEmpty();
final String translogUuid = Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED, shardId, 1L);
store.associateIndexWithNewTranslog(translogUuid);
FollowingEngine followingEngine = new FollowingEngine(config);
followingEngine.recoverFromTranslog();
return followingEngine;
}
}