[CCR] Add create and follow api (#30602)

Also renamed FollowExisting* internal names to just Follow* and fixed tests
This commit is contained in:
Martijn van Groningen 2018-05-26 15:05:40 +02:00 committed by GitHub
parent 97a6e91982
commit e477147143
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 502 additions and 117 deletions

View File

@ -2,7 +2,7 @@ ccruser:
cluster: cluster:
- manage_ccr - manage_ccr
indices: indices:
- names: [ 'index1' ] - names: [ 'allowed-index' ]
privileges: privileges:
- monitor - monitor
- read - read

View File

@ -60,37 +60,28 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
public void testFollowIndex() throws Exception { public void testFollowIndex() throws Exception {
final int numDocs = 16; final int numDocs = 16;
final String indexName1 = "index1"; final String allowedIndex = "allowed-index";
final String indexName2 = "index2"; final String unallowedIndex = "unallowed-index";
if (runningAgainstLeaderCluster) { if (runningAgainstLeaderCluster) {
logger.info("Running against leader cluster"); logger.info("Running against leader cluster");
Settings indexSettings = Settings.builder() Settings indexSettings = Settings.builder().put("index.soft_deletes.enabled", true).build();
.put("index.soft_deletes.enabled", true) createIndex(allowedIndex, indexSettings);
.build(); createIndex(unallowedIndex, indexSettings);
createIndex(indexName1, indexSettings);
createIndex(indexName2, indexSettings);
for (int i = 0; i < numDocs; i++) { for (int i = 0; i < numDocs; i++) {
logger.info("Indexing doc [{}]", i); logger.info("Indexing doc [{}]", i);
index(indexName1, Integer.toString(i), "field", i); index(allowedIndex, Integer.toString(i), "field", i);
} }
for (int i = 0; i < numDocs; i++) { for (int i = 0; i < numDocs; i++) {
logger.info("Indexing doc [{}]", i); logger.info("Indexing doc [{}]", i);
index(indexName2, Integer.toString(i), "field", i); index(unallowedIndex, Integer.toString(i), "field", i);
} }
refresh(indexName1); refresh(allowedIndex);
verifyDocuments(adminClient(), indexName1, numDocs); verifyDocuments(adminClient(), allowedIndex, numDocs);
} else { } else {
logger.info("Running against follow cluster"); createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex);
Settings indexSettings = Settings.builder() assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
.put("index.xpack.ccr.following_index", true)
.build();
// TODO: remove mapping here when ccr syncs mappings too
createIndex(indexName1, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"long\" }}}");
ensureYellow(indexName1);
followIndex("leader_cluster:" + indexName1, indexName1);
assertBusy(() -> verifyDocuments(client(), indexName1, numDocs));
assertThat(countCcrNodeTasks(), equalTo(1)); assertThat(countCcrNodeTasks(), equalTo(1));
assertOK(client().performRequest("POST", "/" + indexName1 + "/_xpack/ccr/_unfollow")); assertOK(client().performRequest("POST", "/" + allowedIndex + "/_xpack/ccr/_unfollow"));
// Make sure that there are no other ccr relates operations running: // Make sure that there are no other ccr relates operations running:
assertBusy(() -> { assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest("GET", "/_cluster/state")); Map<String, Object> clusterState = toMap(adminClient().performRequest("GET", "/_cluster/state"));
@ -99,15 +90,27 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
assertThat(countCcrNodeTasks(), equalTo(0)); assertThat(countCcrNodeTasks(), equalTo(0));
}); });
// TODO: remove mapping here when ccr syncs mappings too followIndex("leader_cluster:" + allowedIndex, allowedIndex);
createIndex(indexName2, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"long\" }}}"); assertThat(countCcrNodeTasks(), equalTo(1));
ensureYellow(indexName2); assertOK(client().performRequest("POST", "/" + allowedIndex + "/_xpack/ccr/_unfollow"));
followIndex("leader_cluster:" + indexName2, indexName2); // Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest("GET", "/_cluster/state"));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
});
createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex);
// Verify that nothing has been replicated and no node tasks are running // Verify that nothing has been replicated and no node tasks are running
// These node tasks should have been failed due to the fact that the user // These node tasks should have been failed due to the fact that the user
// has no sufficient priviledges. // has no sufficient priviledges.
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
verifyDocuments(adminClient(), indexName2, 0); verifyDocuments(adminClient(), unallowedIndex, 0);
followIndex("leader_cluster:" + unallowedIndex, unallowedIndex);
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
verifyDocuments(adminClient(), unallowedIndex, 0);
} }
} }
@ -148,6 +151,11 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_follow", params)); assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_follow", params));
} }
private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
Map<String, String> params = Collections.singletonMap("leader_index", leaderIndex);
assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow", params));
}
void verifyDocuments(RestClient client, String index, int expectedNumDocs) throws IOException { void verifyDocuments(RestClient client, String index, int expectedNumDocs) throws IOException {
Map<String, String> params = new HashMap<>(); Map<String, String> params = new HashMap<>();
params.put("size", Integer.toString(expectedNumDocs)); params.put("size", Integer.toString(expectedNumDocs));
@ -184,13 +192,4 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
+ ", \"mappings\" : {" + mapping + "} }", ContentType.APPLICATION_JSON))); + ", \"mappings\" : {" + mapping + "} }", ContentType.APPLICATION_JSON)));
} }
private static void ensureYellow(String index) throws IOException {
Map<String, String> params = new HashMap<>();
params.put("wait_for_status", "yellow");
params.put("wait_for_no_relocating_shards", "true");
params.put("timeout", "30s");
params.put("level", "shards");
assertOK(adminClient().performRequest("GET", "_cluster/health/" + index, params));
}
} }

View File

@ -57,23 +57,17 @@ public class FollowIndexIT extends ESRestTestCase {
} else { } else {
logger.info("Running against follow cluster"); logger.info("Running against follow cluster");
final String followIndexName = "test_index2"; final String followIndexName = "test_index2";
Settings indexSettings = Settings.builder() createAndFollowIndex("leader_cluster:" + leaderIndexName, followIndexName);
.put("index.xpack.ccr.following_index", true)
.build();
// TODO: remove mapping here when ccr syncs mappings too
createIndex(followIndexName, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"integer\" }}}");
ensureYellow(followIndexName);
followIndex("leader_cluster:" + leaderIndexName, followIndexName);
assertBusy(() -> verifyDocuments(followIndexName, numDocs)); assertBusy(() -> verifyDocuments(followIndexName, numDocs));
// unfollow and then follow and then index a few docs in leader index:
unfollowIndex(followIndexName);
followIndex("leader_cluster:" + leaderIndexName, followIndexName);
try (RestClient leaderClient = buildLeaderClient()) { try (RestClient leaderClient = buildLeaderClient()) {
int id = numDocs; int id = numDocs;
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id); index(leaderClient, leaderIndexName, Integer.toString(id), "field", id);
index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1); index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1);
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2); index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2);
} }
assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3)); assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3));
} }
} }
@ -97,6 +91,15 @@ public class FollowIndexIT extends ESRestTestCase {
assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_follow", params)); assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_follow", params));
} }
private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
Map<String, String> params = Collections.singletonMap("leader_index", leaderIndex);
assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow", params));
}
private static void unfollowIndex(String followIndex) throws IOException {
assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_unfollow"));
}
private static void verifyDocuments(String index, int expectedNumDocs) throws IOException { private static void verifyDocuments(String index, int expectedNumDocs) throws IOException {
Map<String, String> params = new HashMap<>(); Map<String, String> params = new HashMap<>();
params.put("size", Integer.toString(expectedNumDocs)); params.put("size", Integer.toString(expectedNumDocs));

View File

@ -34,7 +34,8 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction; import org.elasticsearch.xpack.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask; import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
@ -43,7 +44,8 @@ import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import org.elasticsearch.xpack.ccr.rest.RestFollowExistingIndexAction; import org.elasticsearch.xpack.ccr.rest.RestFollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestCreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction; import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackPlugin;
@ -90,9 +92,10 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
return Arrays.asList( return Arrays.asList(
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class), new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
new ActionHandler<>(FollowExistingIndexAction.INSTANCE, FollowExistingIndexAction.TransportAction.class), new ActionHandler<>(FollowIndexAction.INSTANCE, FollowIndexAction.TransportAction.class),
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class), new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class),
new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class) new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class),
new ActionHandler<>(CreateAndFollowIndexAction.INSTANCE, CreateAndFollowIndexAction.TransportAction.class)
); );
} }
@ -102,7 +105,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
Supplier<DiscoveryNodes> nodesInCluster) { Supplier<DiscoveryNodes> nodesInCluster) {
return Arrays.asList( return Arrays.asList(
new RestUnfollowIndexAction(settings, restController), new RestUnfollowIndexAction(settings, restController),
new RestFollowExistingIndexAction(settings, restController) new RestFollowIndexAction(settings, restController),
new RestCreateAndFollowIndexAction(settings, restController)
); );
} }

View File

@ -0,0 +1,309 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.CcrSettings;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
public class CreateAndFollowIndexAction extends Action<CreateAndFollowIndexAction.Request, CreateAndFollowIndexAction.Response,
CreateAndFollowIndexAction.RequestBuilder> {
public static final CreateAndFollowIndexAction INSTANCE = new CreateAndFollowIndexAction();
public static final String NAME = "cluster:admin/xpack/ccr/create_and_follow_index";
private CreateAndFollowIndexAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends AcknowledgedRequest<Request> {
private FollowIndexAction.Request followRequest;
public FollowIndexAction.Request getFollowRequest() {
return followRequest;
}
public void setFollowRequest(FollowIndexAction.Request followRequest) {
this.followRequest = followRequest;
}
@Override
public ActionRequestValidationException validate() {
return followRequest.validate();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
followRequest = new FollowIndexAction.Request();
followRequest.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
followRequest.writeTo(out);
}
}
public static class Response extends ActionResponse implements ToXContentObject {
private boolean followIndexCreated;
private boolean followIndexShardsAcked;
private boolean indexFollowingStarted;
Response() {
}
Response(boolean followIndexCreated, boolean followIndexShardsAcked, boolean indexFollowingStarted) {
this.followIndexCreated = followIndexCreated;
this.followIndexShardsAcked = followIndexShardsAcked;
this.indexFollowingStarted = indexFollowingStarted;
}
public boolean isFollowIndexCreated() {
return followIndexCreated;
}
public boolean isFollowIndexShardsAcked() {
return followIndexShardsAcked;
}
public boolean isIndexFollowingStarted() {
return indexFollowingStarted;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
followIndexCreated = in.readBoolean();
followIndexShardsAcked = in.readBoolean();
indexFollowingStarted = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(followIndexCreated);
out.writeBoolean(followIndexShardsAcked);
out.writeBoolean(indexFollowingStarted);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field("follow_index_created", followIndexCreated);
builder.field("follow_index_shards_acked", followIndexShardsAcked);
builder.field("index_following_started", indexFollowingStarted);
}
builder.endObject();
return builder;
}
}
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, CreateAndFollowIndexAction.RequestBuilder> {
RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new Request());
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final Client client;
private final AllocationService allocationService;
private final RemoteClusterService remoteClusterService;
private final ActiveShardsObserver activeShardsObserver;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client,
AllocationService allocationService) {
super(settings, NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new);
this.client = client;
this.allocationService = allocationService;
this.remoteClusterService = transportService.getRemoteClusterService();
this.activeShardsObserver = new ActiveShardsObserver(settings, clusterService, threadPool);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
String[] indices = new String[]{request.getFollowRequest().getLeaderIndex()};
Map<String, List<String>> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false);
if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
// Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData:
IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getFollowRequest().getLeaderIndex());
createFollowIndex(leaderIndexMetadata, request, listener);
} else {
// Following an index in remote cluster, so use remote client to fetch leader IndexMetaData:
assert remoteClusterIndices.size() == 1;
Map.Entry<String, List<String>> entry = remoteClusterIndices.entrySet().iterator().next();
assert entry.getValue().size() == 1;
String clusterNameAlias = entry.getKey();
String leaderIndex = entry.getValue().get(0);
Client remoteClient = client.getRemoteClusterClient(clusterNameAlias);
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex);
remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(r -> {
ClusterState remoteClusterState = r.getState();
IndexMetaData leaderIndexMetadata = remoteClusterState.getMetaData().index(leaderIndex);
createFollowIndex(leaderIndexMetadata, request, listener);
}, listener::onFailure));
}
}
private void createFollowIndex(IndexMetaData leaderIndexMetaData, Request request, ActionListener<Response> listener) {
if (leaderIndexMetaData == null) {
listener.onFailure(new IllegalArgumentException("leader index [" + request.getFollowRequest().getLeaderIndex() +
"] does not exist"));
return;
}
ActionListener<Boolean> handler = ActionListener.wrap(
result -> {
if (result) {
initiateFollowing(request, listener);
} else {
listener.onResponse(new Response(true, false, false));
}
},
listener::onFailure);
// Can't use create index api here, because then index templates can alter the mappings / settings.
// And index templates could introduce settings / mappings that are incompatible with the leader index.
clusterService.submitStateUpdateTask("follow_index_action", new AckedClusterStateUpdateTask<Boolean>(request, handler) {
@Override
protected Boolean newResponse(boolean acknowledged) {
return acknowledged;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
IndexMetaData currentIndex = currentState.metaData().index(request.getFollowRequest().getFollowIndex());
if (currentIndex != null) {
throw new ResourceAlreadyExistsException(currentIndex.getIndex());
}
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(request.getFollowRequest().getFollowIndex());
// Copy all settings, but overwrite a few settings.
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(leaderIndexMetaData.getSettings());
// Overwriting UUID here, because otherwise we can't follow indices in the same cluster
settingsBuilder.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
settingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowRequest().getFollowIndex());
settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
imdBuilder.settings(settingsBuilder);
// Copy mappings from leader IMD to follow IMD
for (ObjectObjectCursor<String, MappingMetaData> cursor : leaderIndexMetaData.getMappings()) {
imdBuilder.putMapping(cursor.value);
}
mdBuilder.put(imdBuilder.build(), false);
ClusterState.Builder builder = ClusterState.builder(currentState);
builder.metaData(mdBuilder.build());
ClusterState updatedState = builder.build();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
.addAsNew(updatedState.metaData().index(request.getFollowRequest().getFollowIndex()));
updatedState = allocationService.reroute(
ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
"follow index [" + request.getFollowRequest().getFollowIndex() + "] created");
return updatedState;
}
});
}
private void initiateFollowing(Request request, ActionListener<Response> listener) {
activeShardsObserver.waitForActiveShards(new String[]{request.followRequest.getFollowIndex()},
ActiveShardCount.DEFAULT, request.timeout(), result -> {
if (result) {
client.execute(FollowIndexAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap(
r -> listener.onResponse(new Response(true, true, r.isAcknowledged())),
listener::onFailure
));
} else {
listener.onResponse(new Response(true, false, false));
}
}, listener::onFailure);
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowIndex());
}
}
}

View File

@ -40,13 +40,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.Request, public class FollowIndexAction extends Action<FollowIndexAction.Request,
FollowExistingIndexAction.Response, FollowExistingIndexAction.RequestBuilder> { FollowIndexAction.Response, FollowIndexAction.RequestBuilder> {
public static final FollowExistingIndexAction INSTANCE = new FollowExistingIndexAction(); public static final FollowIndexAction INSTANCE = new FollowIndexAction();
public static final String NAME = "cluster:admin/xpack/ccr/follow_existing_index"; public static final String NAME = "cluster:admin/xpack/ccr/follow_index";
private FollowExistingIndexAction() { private FollowIndexAction() {
super(NAME); super(NAME);
} }
@ -134,7 +134,7 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
} }
} }
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, FollowExistingIndexAction.RequestBuilder> { public static class RequestBuilder extends ActionRequestBuilder<Request, Response, FollowIndexAction.RequestBuilder> {
RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) { RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
super(client, action, new Request()); super(client, action, new Request());

View File

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import static org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction.INSTANCE;
import static org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction.Request;
public class RestCreateAndFollowIndexAction extends BaseRestHandler {
public RestCreateAndFollowIndexAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.POST, "/{index}/_xpack/ccr/_create_and_follow", this);
}
@Override
public String getName() {
return "xpack_ccr_create_and_follow_index_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
Request request = new Request();
request.setFollowRequest(RestFollowIndexAction.createRequest(restRequest));
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -15,12 +15,12 @@ import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.INSTANCE; import static org.elasticsearch.xpack.ccr.action.FollowIndexAction.INSTANCE;
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Request; import static org.elasticsearch.xpack.ccr.action.FollowIndexAction.Request;
public class RestFollowExistingIndexAction extends BaseRestHandler { public class RestFollowIndexAction extends BaseRestHandler {
public RestFollowExistingIndexAction(Settings settings, RestController controller) { public RestFollowIndexAction(Settings settings, RestController controller) {
super(settings); super(settings);
controller.registerHandler(RestRequest.Method.POST, "/{index}/_xpack/ccr/_follow", this); controller.registerHandler(RestRequest.Method.POST, "/{index}/_xpack/ccr/_follow", this);
} }
@ -32,6 +32,11 @@ public class RestFollowExistingIndexAction extends BaseRestHandler {
@Override @Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
Request request = createRequest(restRequest);
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
}
static Request createRequest(RestRequest restRequest) {
Request request = new Request(); Request request = new Request();
request.setLeaderIndex(restRequest.param("leader_index")); request.setLeaderIndex(restRequest.param("leader_index"));
request.setFollowIndex(restRequest.param("index")); request.setFollowIndex(restRequest.param("index"));
@ -45,6 +50,6 @@ public class RestFollowExistingIndexAction extends BaseRestHandler {
long value = Long.valueOf(restRequest.param(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())); long value = Long.valueOf(restRequest.param(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName()));
request.setProcessorMaxTranslogBytes(value); request.setProcessorMaxTranslogBytes(value);
} }
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); return request;
} }
} }

View File

@ -19,7 +19,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@ -28,7 +27,8 @@ import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockHttpTransport; import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction; import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask; import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
@ -143,21 +143,18 @@ public class ShardChangesIT extends ESIntegTestCase {
public void testFollowIndex() throws Exception { public void testFollowIndex() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3); final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards,
Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("index1");
final String followerIndexSettings = final FollowIndexAction.Request followRequest = new FollowIndexAction.Request();
getIndexSettings(numberOfPrimaryShards, Collections.singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
ensureGreen("index1", "index2");
final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request();
followRequest.setLeaderIndex("index1"); followRequest.setLeaderIndex("index1");
followRequest.setFollowIndex("index2"); followRequest.setFollowIndex("index2");
client().execute(FollowExistingIndexAction.INSTANCE, followRequest).get();
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request();
createAndFollowRequest.setFollowRequest(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
final int firstBatchNumDocs = randomIntBetween(2, 64); final int firstBatchNumDocs = randomIntBetween(2, 64);
for (int i = 0; i < firstBatchNumDocs; i++) { for (int i = 0; i < firstBatchNumDocs; i++) {
@ -180,6 +177,8 @@ public class ShardChangesIT extends ESIntegTestCase {
assertBusy(assertExpectedDocumentRunnable(i)); assertBusy(assertExpectedDocumentRunnable(i));
} }
unfollowIndex("index2");
client().execute(FollowIndexAction.INSTANCE, followRequest).get();
final int secondBatchNumDocs = randomIntBetween(2, 64); final int secondBatchNumDocs = randomIntBetween(2, 64);
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
@ -200,27 +199,7 @@ public class ShardChangesIT extends ESIntegTestCase {
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i)); assertBusy(assertExpectedDocumentRunnable(i));
} }
unfollowIndex("index2");
final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
unfollowRequest.setFollowIndex("index2");
client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get();
assertBusy(() -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(0));
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).get();
int numNodeTasks = 0;
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
numNodeTasks++;
}
}
assertThat(numNodeTasks, equalTo(0));
});
} }
public void testFollowIndexWithNestedField() throws Exception { public void testFollowIndexWithNestedField() throws Exception {
@ -234,10 +213,10 @@ public class ShardChangesIT extends ESIntegTestCase {
ensureGreen("index1", "index2"); ensureGreen("index1", "index2");
final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request(); final FollowIndexAction.Request followRequest = new FollowIndexAction.Request();
followRequest.setLeaderIndex("index1"); followRequest.setLeaderIndex("index1");
followRequest.setFollowIndex("index2"); followRequest.setFollowIndex("index2");
client().execute(FollowExistingIndexAction.INSTANCE, followRequest).get(); client().execute(FollowIndexAction.INSTANCE, followRequest).get();
final int numDocs = randomIntBetween(2, 64); final int numDocs = randomIntBetween(2, 64);
for (int i = 0; i < numDocs; i++) { for (int i = 0; i < numDocs; i++) {
@ -287,19 +266,19 @@ public class ShardChangesIT extends ESIntegTestCase {
public void testFollowNonExistentIndex() throws Exception { public void testFollowNonExistentIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test-leader").get()); assertAcked(client().admin().indices().prepareCreate("test-leader").get());
assertAcked(client().admin().indices().prepareCreate("test-follower").get()); assertAcked(client().admin().indices().prepareCreate("test-follower").get());
final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request(); final FollowIndexAction.Request followRequest = new FollowIndexAction.Request();
// Leader index does not exist. // Leader index does not exist.
followRequest.setLeaderIndex("non-existent-leader"); followRequest.setLeaderIndex("non-existent-leader");
followRequest.setFollowIndex("test-follower"); followRequest.setFollowIndex("test-follower");
expectThrows(IllegalArgumentException.class, () -> client().execute(FollowExistingIndexAction.INSTANCE, followRequest).actionGet()); expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest).actionGet());
// Follower index does not exist. // Follower index does not exist.
followRequest.setLeaderIndex("test-leader"); followRequest.setLeaderIndex("test-leader");
followRequest.setFollowIndex("non-existent-follower"); followRequest.setFollowIndex("non-existent-follower");
expectThrows(IllegalArgumentException.class, () -> client().execute(FollowExistingIndexAction.INSTANCE, followRequest).actionGet()); expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest).actionGet());
// Both indices do not exist. // Both indices do not exist.
followRequest.setLeaderIndex("non-existent-leader"); followRequest.setLeaderIndex("non-existent-leader");
followRequest.setFollowIndex("non-existent-follower"); followRequest.setFollowIndex("non-existent-follower");
expectThrows(IllegalArgumentException.class, () -> client().execute(FollowExistingIndexAction.INSTANCE, followRequest).actionGet()); expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest).actionGet());
} }
private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) { private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
@ -338,6 +317,28 @@ public class ShardChangesIT extends ESIntegTestCase {
}; };
} }
private void unfollowIndex(String index) throws Exception {
final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
unfollowRequest.setFollowIndex(index);
client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get();
assertBusy(() -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(0));
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).get();
int numNodeTasks = 0;
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
numNodeTasks++;
}
}
assertThat(numNodeTasks, equalTo(0));
});
}
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) { private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
return () -> { return () -> {
final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get(); final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get();

View File

@ -14,41 +14,41 @@ import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
public class FollowExistingIndexActionTests extends ESTestCase { public class FollowIndexActionTests extends ESTestCase {
public void testValidation() { public void testValidation() {
FollowExistingIndexAction.Request request = new FollowExistingIndexAction.Request(); FollowIndexAction.Request request = new FollowIndexAction.Request();
request.setLeaderIndex("index1"); request.setLeaderIndex("index1");
request.setFollowIndex("index2"); request.setFollowIndex("index2");
{ {
Exception e = expectThrows(IllegalArgumentException.class, () -> FollowExistingIndexAction.validate(null, null, request)); Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(null, null, request));
assertThat(e.getMessage(), equalTo("leader index [index1] does not exist")); assertThat(e.getMessage(), equalTo("leader index [index1] does not exist"));
} }
{ {
IndexMetaData leaderIMD = createIMD("index1", 5); IndexMetaData leaderIMD = createIMD("index1", 5);
Exception e = expectThrows(IllegalArgumentException.class, () -> FollowExistingIndexAction.validate(leaderIMD, null, request)); Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(leaderIMD, null, request));
assertThat(e.getMessage(), equalTo("follow index [index2] does not exist")); assertThat(e.getMessage(), equalTo("follow index [index2] does not exist"));
} }
{ {
IndexMetaData leaderIMD = createIMD("index1", 5); IndexMetaData leaderIMD = createIMD("index1", 5);
IndexMetaData followIMD = createIMD("index2", 5); IndexMetaData followIMD = createIMD("index2", 5);
Exception e = expectThrows(IllegalArgumentException.class, Exception e = expectThrows(IllegalArgumentException.class,
() -> FollowExistingIndexAction.validate(leaderIMD, followIMD, request)); () -> FollowIndexAction.validate(leaderIMD, followIMD, request));
assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled")); assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled"));
} }
{ {
IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
IndexMetaData followIMD = createIMD("index2", 4); IndexMetaData followIMD = createIMD("index2", 4);
Exception e = expectThrows(IllegalArgumentException.class, Exception e = expectThrows(IllegalArgumentException.class,
() -> FollowExistingIndexAction.validate(leaderIMD, followIMD, request)); () -> FollowIndexAction.validate(leaderIMD, followIMD, request));
assertThat(e.getMessage(), assertThat(e.getMessage(),
equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]")); equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]"));
} }
{ {
IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
IndexMetaData followIMD = createIMD("index2", 5); IndexMetaData followIMD = createIMD("index2", 5);
FollowExistingIndexAction.validate(leaderIMD, followIMD, request); FollowIndexAction.validate(leaderIMD, followIMD, request);
} }
} }

View File

@ -0,0 +1,24 @@
{
"xpack.ccr.create_and_follow_index": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
"methods": [ "POST" ],
"url": {
"path": "/{index}/_xpack/ccr/_create_and_follow",
"paths": [ "/{index}/_xpack/ccr/_create_and_follow" ],
"parts": {
"index": {
"type": "string",
"required": true,
"description": "The name of the index that follows the leader index."
}
},
"params": {
"leader_index": {
"type": "string",
"required": true,
"description": "The name of the index to read the changes from."
}
}
}
}
}

View File

@ -1,5 +1,5 @@
{ {
"xpack.ccr.follow_existing_index": { "xpack.ccr.follow_index": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current", "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
"methods": [ "POST" ], "methods": [ "POST" ],
"url": { "url": {

View File

@ -16,18 +16,20 @@
- is_true: acknowledged - is_true: acknowledged
- do: - do:
indices.create: xpack.ccr.create_and_follow_index:
leader_index: foo
index: bar
- is_true: follow_index_created
- is_true: follow_index_shards_acked
- is_true: index_following_started
- do:
xpack.ccr.unfollow_index:
index: bar index: bar
body:
mappings:
doc:
properties:
field:
type: keyword
- is_true: acknowledged - is_true: acknowledged
- do: - do:
xpack.ccr.follow_existing_index: xpack.ccr.follow_index:
leader_index: foo leader_index: foo
index: bar index: bar
- is_true: acknowledged - is_true: acknowledged