Rename CCR APIs (#34027)
* Renamed CCR APIs Renamed: * `/{index}/_ccr/create_and_follow` to `/{index}/_ccr/follow` * `/{index}/_ccr/unfollow` to `/{index}/_ccr/pause_follow` * `/{index}/_ccr/follow` to `/{index}/_ccr/resume_follow` Relates to #33931
This commit is contained in:
parent
17b3b97899
commit
9129948f60
|
@ -31,17 +31,17 @@ public class CcrMultiClusterLicenseIT extends ESRestTestCase {
|
|||
return true;
|
||||
}
|
||||
|
||||
public void testFollowIndex() {
|
||||
public void testResumeFollow() {
|
||||
if (runningAgainstLeaderCluster == false) {
|
||||
final Request request = new Request("POST", "/follower/_ccr/follow");
|
||||
final Request request = new Request("POST", "/follower/_ccr/resume_follow");
|
||||
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
|
||||
assertNonCompliantLicense(request);
|
||||
}
|
||||
}
|
||||
|
||||
public void testCreateAndFollowIndex() {
|
||||
public void testFollow() {
|
||||
if (runningAgainstLeaderCluster == false) {
|
||||
final Request request = new Request("POST", "/follower/_ccr/create_and_follow");
|
||||
final Request request = new Request("PUT", "/follower/_ccr/follow");
|
||||
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
|
||||
assertNonCompliantLicense(request);
|
||||
}
|
||||
|
|
|
@ -80,11 +80,11 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
|
|||
refresh(allowedIndex);
|
||||
verifyDocuments(adminClient(), allowedIndex, numDocs);
|
||||
} else {
|
||||
createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex);
|
||||
follow("leader_cluster:" + allowedIndex, allowedIndex);
|
||||
assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
|
||||
assertThat(countCcrNodeTasks(), equalTo(1));
|
||||
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex));
|
||||
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
|
||||
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
|
||||
// Make sure that there are no other ccr relates operations running:
|
||||
assertBusy(() -> {
|
||||
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
|
||||
|
@ -93,9 +93,9 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
|
|||
assertThat(countCcrNodeTasks(), equalTo(0));
|
||||
});
|
||||
|
||||
followIndex("leader_cluster:" + allowedIndex, allowedIndex);
|
||||
resumeFollow("leader_cluster:" + allowedIndex, allowedIndex);
|
||||
assertThat(countCcrNodeTasks(), equalTo(1));
|
||||
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
|
||||
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
|
||||
// Make sure that there are no other ccr relates operations running:
|
||||
assertBusy(() -> {
|
||||
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
|
||||
|
@ -105,15 +105,15 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
|
|||
});
|
||||
|
||||
Exception e = expectThrows(ResponseException.class,
|
||||
() -> createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex));
|
||||
() -> follow("leader_cluster:" + unallowedIndex, unallowedIndex));
|
||||
assertThat(e.getMessage(),
|
||||
containsString("action [indices:admin/xpack/ccr/create_and_follow_index] is unauthorized for user [test_ccr]"));
|
||||
containsString("action [indices:admin/xpack/ccr/put_follow] is unauthorized for user [test_ccr]"));
|
||||
// Verify that the follow index has not been created and no node tasks are running
|
||||
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
|
||||
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
|
||||
|
||||
e = expectThrows(ResponseException.class,
|
||||
() -> followIndex("leader_cluster:" + unallowedIndex, unallowedIndex));
|
||||
() -> resumeFollow("leader_cluster:" + unallowedIndex, unallowedIndex));
|
||||
assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]"));
|
||||
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
|
||||
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
|
||||
|
@ -157,10 +157,10 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
|
|||
verifyAutoFollowMonitoring();
|
||||
});
|
||||
|
||||
// Cleanup by deleting auto follow pattern and unfollowing:
|
||||
// Cleanup by deleting auto follow pattern and pause following:
|
||||
request = new Request("DELETE", "/_ccr/auto_follow/leader_cluster");
|
||||
assertOK(client().performRequest(request));
|
||||
unfollowIndex(allowedIndex);
|
||||
pauseFollow(allowedIndex);
|
||||
}
|
||||
|
||||
private int countCcrNodeTasks() throws IOException {
|
||||
|
@ -201,14 +201,14 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
|
|||
assertOK(adminClient().performRequest(new Request("POST", "/" + index + "/_refresh")));
|
||||
}
|
||||
|
||||
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
|
||||
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
|
||||
private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
|
||||
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
|
||||
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
||||
private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
|
||||
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
|
||||
private static void follow(String leaderIndex, String followIndex) throws IOException {
|
||||
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
|
||||
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
@ -273,8 +273,8 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
|
|||
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
|
||||
}
|
||||
|
||||
private static void unfollowIndex(String followIndex) throws IOException {
|
||||
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow")));
|
||||
private static void pauseFollow(String followIndex) throws IOException {
|
||||
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow")));
|
||||
}
|
||||
|
||||
private static void verifyCcrMonitoring(String expectedLeaderIndex, String expectedFollowerIndex) throws IOException {
|
||||
|
|
|
@ -67,11 +67,11 @@ public class FollowIndexIT extends ESRestTestCase {
|
|||
} else {
|
||||
logger.info("Running against follow cluster");
|
||||
final String followIndexName = "test_index2";
|
||||
createAndFollowIndex("leader_cluster:" + leaderIndexName, followIndexName);
|
||||
followIndex("leader_cluster:" + leaderIndexName, followIndexName);
|
||||
assertBusy(() -> verifyDocuments(followIndexName, numDocs));
|
||||
// unfollow and then follow and then index a few docs in leader index:
|
||||
unfollowIndex(followIndexName);
|
||||
followIndex("leader_cluster:" + leaderIndexName, followIndexName);
|
||||
pauseFollow(followIndexName);
|
||||
resumeFollow("leader_cluster:" + leaderIndexName, followIndexName);
|
||||
try (RestClient leaderClient = buildLeaderClient()) {
|
||||
int id = numDocs;
|
||||
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id, "filtered_field", "true");
|
||||
|
@ -86,11 +86,11 @@ public class FollowIndexIT extends ESRestTestCase {
|
|||
public void testFollowNonExistingLeaderIndex() throws Exception {
|
||||
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);
|
||||
ResponseException e = expectThrows(ResponseException.class,
|
||||
() -> followIndex("leader_cluster:non-existing-index", "non-existing-index"));
|
||||
() -> resumeFollow("leader_cluster:non-existing-index", "non-existing-index"));
|
||||
assertThat(e.getMessage(), containsString("no such index"));
|
||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
|
||||
|
||||
e = expectThrows(ResponseException.class, () -> createAndFollowIndex("leader_cluster:non-existing-index", "non-existing-index"));
|
||||
e = expectThrows(ResponseException.class, () -> followIndex("leader_cluster:non-existing-index", "non-existing-index"));
|
||||
assertThat(e.getMessage(), containsString("no such index"));
|
||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
|
||||
}
|
||||
|
@ -146,20 +146,20 @@ public class FollowIndexIT extends ESRestTestCase {
|
|||
assertOK(client().performRequest(new Request("POST", "/" + index + "/_refresh")));
|
||||
}
|
||||
|
||||
private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
|
||||
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
|
||||
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
||||
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
|
||||
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
|
||||
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
|
||||
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
||||
private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
|
||||
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
|
||||
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
||||
private static void unfollowIndex(String followIndex) throws IOException {
|
||||
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow")));
|
||||
private static void pauseFollow(String followIndex) throws IOException {
|
||||
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow")));
|
||||
}
|
||||
|
||||
private static void verifyDocuments(String index, int expectedNumDocs) throws IOException {
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
ccr.create_and_follow_index:
|
||||
ccr.follow:
|
||||
index: bar
|
||||
body:
|
||||
leader_index: foo
|
||||
|
@ -25,18 +25,18 @@
|
|||
- is_true: index_following_started
|
||||
|
||||
- do:
|
||||
ccr.unfollow_index:
|
||||
ccr.pause_follow:
|
||||
index: bar
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
ccr.follow_index:
|
||||
ccr.resume_follow:
|
||||
index: bar
|
||||
body:
|
||||
leader_index: foo
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
ccr.unfollow_index:
|
||||
ccr.pause_follow:
|
||||
index: bar
|
||||
- is_true: acknowledged
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
type: keyword
|
||||
|
||||
- do:
|
||||
ccr.create_and_follow_index:
|
||||
ccr.follow:
|
||||
index: bar
|
||||
body:
|
||||
leader_index: foo
|
||||
|
@ -51,7 +51,7 @@
|
|||
- gte: { bar.0.time_since_last_fetch_millis: -1 }
|
||||
|
||||
- do:
|
||||
ccr.unfollow_index:
|
||||
ccr.pause_follow:
|
||||
index: bar
|
||||
- is_true: acknowledged
|
||||
|
||||
|
|
|
@ -52,26 +52,26 @@ import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
|
|||
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportCreateAndFollowIndexAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportUnfollowIndexAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
|
||||
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestCreateAndFollowIndexAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestFollowIndexAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -161,9 +161,9 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
|
||||
new ActionHandler<>(AutoFollowStatsAction.INSTANCE, TransportAutoFollowStatsAction.class),
|
||||
// follow actions
|
||||
new ActionHandler<>(CreateAndFollowIndexAction.INSTANCE, TransportCreateAndFollowIndexAction.class),
|
||||
new ActionHandler<>(FollowIndexAction.INSTANCE, TransportFollowIndexAction.class),
|
||||
new ActionHandler<>(UnfollowIndexAction.INSTANCE, TransportUnfollowIndexAction.class),
|
||||
new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class),
|
||||
new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class),
|
||||
new ActionHandler<>(PauseFollowAction.INSTANCE, TransportPauseFollowAction.class),
|
||||
// auto-follow actions
|
||||
new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
|
||||
new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class),
|
||||
|
@ -183,9 +183,9 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
new RestCcrStatsAction(settings, restController),
|
||||
new RestAutoFollowStatsAction(settings, restController),
|
||||
// follow APIs
|
||||
new RestCreateAndFollowIndexAction(settings, restController),
|
||||
new RestFollowIndexAction(settings, restController),
|
||||
new RestUnfollowIndexAction(settings, restController),
|
||||
new RestPutFollowAction(settings, restController),
|
||||
new RestResumeFollowAction(settings, restController),
|
||||
new RestPauseFollowAction(settings, restController),
|
||||
// auto-follow APIs
|
||||
new RestDeleteAutoFollowPatternAction(settings, restController),
|
||||
new RestPutAutoFollowPatternAction(settings, restController),
|
||||
|
|
|
@ -34,8 +34,8 @@ import org.elasticsearch.xpack.ccr.CcrSettings;
|
|||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -184,13 +184,13 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
|||
|
||||
@Override
|
||||
void createAndFollow(Map<String, String> headers,
|
||||
FollowIndexAction.Request followRequest,
|
||||
ResumeFollowAction.Request followRequest,
|
||||
Runnable successHandler,
|
||||
Consumer<Exception> failureHandler) {
|
||||
Client followerClient = CcrLicenseChecker.wrapClient(client, headers);
|
||||
CreateAndFollowIndexAction.Request request = new CreateAndFollowIndexAction.Request(followRequest);
|
||||
PutFollowAction.Request request = new PutFollowAction.Request(followRequest);
|
||||
followerClient.execute(
|
||||
CreateAndFollowIndexAction.INSTANCE,
|
||||
PutFollowAction.INSTANCE,
|
||||
request,
|
||||
ActionListener.wrap(r -> successHandler.run(), failureHandler)
|
||||
);
|
||||
|
@ -306,7 +306,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
|||
|
||||
String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
|
||||
clusterAlias + ":" + leaderIndexName;
|
||||
FollowIndexAction.Request request = new FollowIndexAction.Request();
|
||||
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
|
||||
request.setLeaderIndex(leaderIndexNameWithClusterAliasPrefix);
|
||||
request.setFollowerIndex(followIndexName);
|
||||
request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
|
||||
|
@ -409,7 +409,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
|||
|
||||
abstract void createAndFollow(
|
||||
Map<String, String> headers,
|
||||
FollowIndexAction.Request followRequest,
|
||||
ResumeFollowAction.Request followRequest,
|
||||
Runnable successHandler,
|
||||
Consumer<Exception> failureHandler
|
||||
);
|
||||
|
|
|
@ -63,8 +63,8 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
|||
private int maxOperationCount;
|
||||
private ShardId shardId;
|
||||
private String expectedHistoryUUID;
|
||||
private TimeValue pollTimeout = TransportFollowIndexAction.DEFAULT_POLL_TIMEOUT;
|
||||
private long maxOperationSizeInBytes = TransportFollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
|
||||
private TimeValue pollTimeout = TransportResumeFollowAction.DEFAULT_POLL_TIMEOUT;
|
||||
private long maxOperationSizeInBytes = TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
|
||||
|
||||
public Request(ShardId shardId, String expectedHistoryUUID) {
|
||||
super(shardId.getIndexName());
|
||||
|
|
|
@ -19,24 +19,24 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
|||
import org.elasticsearch.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
public class TransportUnfollowIndexAction extends HandledTransportAction<UnfollowIndexAction.Request, AcknowledgedResponse> {
|
||||
public class TransportPauseFollowAction extends HandledTransportAction<PauseFollowAction.Request, AcknowledgedResponse> {
|
||||
|
||||
private final Client client;
|
||||
private final PersistentTasksService persistentTasksService;
|
||||
|
||||
@Inject
|
||||
public TransportUnfollowIndexAction(
|
||||
public TransportPauseFollowAction(
|
||||
final Settings settings,
|
||||
final TransportService transportService,
|
||||
final ActionFilters actionFilters,
|
||||
final Client client,
|
||||
final PersistentTasksService persistentTasksService) {
|
||||
super(settings, UnfollowIndexAction.NAME, transportService, actionFilters, UnfollowIndexAction.Request::new);
|
||||
super(settings, PauseFollowAction.NAME, transportService, actionFilters, PauseFollowAction.Request::new);
|
||||
this.client = client;
|
||||
this.persistentTasksService = persistentTasksService;
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ public class TransportUnfollowIndexAction extends HandledTransportAction<Unfollo
|
|||
@Override
|
||||
protected void doExecute(
|
||||
final Task task,
|
||||
final UnfollowIndexAction.Request request,
|
||||
final PauseFollowAction.Request request,
|
||||
final ActionListener<AcknowledgedResponse> listener) {
|
||||
|
||||
client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> {
|
|
@ -37,8 +37,8 @@ import org.elasticsearch.transport.TransportService;
|
|||
import org.elasticsearch.xpack.ccr.Ccr;
|
||||
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -46,8 +46,8 @@ import java.util.Map;
|
|||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public final class TransportCreateAndFollowIndexAction
|
||||
extends TransportMasterNodeAction<CreateAndFollowIndexAction.Request, CreateAndFollowIndexAction.Response> {
|
||||
public final class TransportPutFollowAction
|
||||
extends TransportMasterNodeAction<PutFollowAction.Request, PutFollowAction.Response> {
|
||||
|
||||
private final Client client;
|
||||
private final AllocationService allocationService;
|
||||
|
@ -56,7 +56,7 @@ public final class TransportCreateAndFollowIndexAction
|
|||
private final CcrLicenseChecker ccrLicenseChecker;
|
||||
|
||||
@Inject
|
||||
public TransportCreateAndFollowIndexAction(
|
||||
public TransportPutFollowAction(
|
||||
final Settings settings,
|
||||
final ThreadPool threadPool,
|
||||
final TransportService transportService,
|
||||
|
@ -68,13 +68,13 @@ public final class TransportCreateAndFollowIndexAction
|
|||
final CcrLicenseChecker ccrLicenseChecker) {
|
||||
super(
|
||||
settings,
|
||||
CreateAndFollowIndexAction.NAME,
|
||||
PutFollowAction.NAME,
|
||||
transportService,
|
||||
clusterService,
|
||||
threadPool,
|
||||
actionFilters,
|
||||
indexNameExpressionResolver,
|
||||
CreateAndFollowIndexAction.Request::new);
|
||||
PutFollowAction.Request::new);
|
||||
this.client = client;
|
||||
this.allocationService = allocationService;
|
||||
this.remoteClusterService = transportService.getRemoteClusterService();
|
||||
|
@ -88,15 +88,15 @@ public final class TransportCreateAndFollowIndexAction
|
|||
}
|
||||
|
||||
@Override
|
||||
protected CreateAndFollowIndexAction.Response newResponse() {
|
||||
return new CreateAndFollowIndexAction.Response();
|
||||
protected PutFollowAction.Response newResponse() {
|
||||
return new PutFollowAction.Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(
|
||||
final CreateAndFollowIndexAction.Request request,
|
||||
final PutFollowAction.Request request,
|
||||
final ClusterState state,
|
||||
final ActionListener<CreateAndFollowIndexAction.Response> listener) throws Exception {
|
||||
final ActionListener<PutFollowAction.Response> listener) throws Exception {
|
||||
if (ccrLicenseChecker.isCcrAllowed() == false) {
|
||||
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
|
||||
return;
|
||||
|
@ -116,9 +116,9 @@ public final class TransportCreateAndFollowIndexAction
|
|||
}
|
||||
|
||||
private void createFollowerIndexAndFollowLocalIndex(
|
||||
final CreateAndFollowIndexAction.Request request,
|
||||
final PutFollowAction.Request request,
|
||||
final ClusterState state,
|
||||
final ActionListener<CreateAndFollowIndexAction.Response> listener) {
|
||||
final ActionListener<PutFollowAction.Response> listener) {
|
||||
// following an index in local cluster, so use local cluster state to fetch leader index metadata
|
||||
final String leaderIndex = request.getFollowRequest().getLeaderIndex();
|
||||
final IndexMetaData leaderIndexMetadata = state.getMetaData().index(leaderIndex);
|
||||
|
@ -134,10 +134,10 @@ public final class TransportCreateAndFollowIndexAction
|
|||
}
|
||||
|
||||
private void createFollowerIndexAndFollowRemoteIndex(
|
||||
final CreateAndFollowIndexAction.Request request,
|
||||
final PutFollowAction.Request request,
|
||||
final String clusterAlias,
|
||||
final String leaderIndex,
|
||||
final ActionListener<CreateAndFollowIndexAction.Response> listener) {
|
||||
final ActionListener<PutFollowAction.Response> listener) {
|
||||
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
|
||||
client,
|
||||
clusterAlias,
|
||||
|
@ -149,8 +149,8 @@ public final class TransportCreateAndFollowIndexAction
|
|||
private void createFollowerIndex(
|
||||
final IndexMetaData leaderIndexMetaData,
|
||||
final String[] historyUUIDs,
|
||||
final CreateAndFollowIndexAction.Request request,
|
||||
final ActionListener<CreateAndFollowIndexAction.Response> listener) {
|
||||
final PutFollowAction.Request request,
|
||||
final ActionListener<PutFollowAction.Response> listener) {
|
||||
if (leaderIndexMetaData == null) {
|
||||
listener.onFailure(new IllegalArgumentException("leader index [" + request.getFollowRequest().getLeaderIndex() +
|
||||
"] does not exist"));
|
||||
|
@ -162,7 +162,7 @@ public final class TransportCreateAndFollowIndexAction
|
|||
if (result) {
|
||||
initiateFollowing(request, listener);
|
||||
} else {
|
||||
listener.onResponse(new CreateAndFollowIndexAction.Response(true, false, false));
|
||||
listener.onResponse(new PutFollowAction.Response(true, false, false));
|
||||
}
|
||||
},
|
||||
listener::onFailure);
|
||||
|
@ -228,23 +228,23 @@ public final class TransportCreateAndFollowIndexAction
|
|||
}
|
||||
|
||||
private void initiateFollowing(
|
||||
final CreateAndFollowIndexAction.Request request,
|
||||
final ActionListener<CreateAndFollowIndexAction.Response> listener) {
|
||||
final PutFollowAction.Request request,
|
||||
final ActionListener<PutFollowAction.Response> listener) {
|
||||
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()},
|
||||
ActiveShardCount.DEFAULT, request.timeout(), result -> {
|
||||
if (result) {
|
||||
client.execute(FollowIndexAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap(
|
||||
r -> listener.onResponse(new CreateAndFollowIndexAction.Response(true, true, r.isAcknowledged())),
|
||||
client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap(
|
||||
r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())),
|
||||
listener::onFailure
|
||||
));
|
||||
} else {
|
||||
listener.onResponse(new CreateAndFollowIndexAction.Response(true, false, false));
|
||||
listener.onResponse(new PutFollowAction.Response(true, false, false));
|
||||
}
|
||||
}, listener::onFailure);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(final CreateAndFollowIndexAction.Request request, final ClusterState state) {
|
||||
protected ClusterBlockException checkBlock(final PutFollowAction.Request request, final ClusterState state) {
|
||||
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex());
|
||||
}
|
||||
|
|
@ -40,7 +40,7 @@ import org.elasticsearch.transport.TransportService;
|
|||
import org.elasticsearch.xpack.ccr.Ccr;
|
||||
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class TransportFollowIndexAction extends HandledTransportAction<FollowIndexAction.Request, AcknowledgedResponse> {
|
||||
public class TransportResumeFollowAction extends HandledTransportAction<ResumeFollowAction.Request, AcknowledgedResponse> {
|
||||
|
||||
static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE;
|
||||
private static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500);
|
||||
|
@ -73,7 +73,7 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
|
|||
private final CcrLicenseChecker ccrLicenseChecker;
|
||||
|
||||
@Inject
|
||||
public TransportFollowIndexAction(
|
||||
public TransportResumeFollowAction(
|
||||
final Settings settings,
|
||||
final ThreadPool threadPool,
|
||||
final TransportService transportService,
|
||||
|
@ -83,7 +83,7 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
|
|||
final PersistentTasksService persistentTasksService,
|
||||
final IndicesService indicesService,
|
||||
final CcrLicenseChecker ccrLicenseChecker) {
|
||||
super(settings, FollowIndexAction.NAME, transportService, actionFilters, FollowIndexAction.Request::new);
|
||||
super(settings, ResumeFollowAction.NAME, transportService, actionFilters, ResumeFollowAction.Request::new);
|
||||
this.client = client;
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
|
@ -95,7 +95,7 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
|
|||
|
||||
@Override
|
||||
protected void doExecute(final Task task,
|
||||
final FollowIndexAction.Request request,
|
||||
final ResumeFollowAction.Request request,
|
||||
final ActionListener<AcknowledgedResponse> listener) {
|
||||
if (ccrLicenseChecker.isCcrAllowed() == false) {
|
||||
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
|
||||
|
@ -115,7 +115,7 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
|
|||
}
|
||||
}
|
||||
|
||||
private void followLocalIndex(final FollowIndexAction.Request request,
|
||||
private void followLocalIndex(final ResumeFollowAction.Request request,
|
||||
final ActionListener<AcknowledgedResponse> listener) {
|
||||
final ClusterState state = clusterService.state();
|
||||
final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex());
|
||||
|
@ -134,7 +134,7 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
|
|||
}
|
||||
|
||||
private void followRemoteIndex(
|
||||
final FollowIndexAction.Request request,
|
||||
final ResumeFollowAction.Request request,
|
||||
final String clusterAlias,
|
||||
final String leaderIndex,
|
||||
final ActionListener<AcknowledgedResponse> listener) {
|
||||
|
@ -165,7 +165,7 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
|
|||
* </ul>
|
||||
*/
|
||||
void start(
|
||||
FollowIndexAction.Request request,
|
||||
ResumeFollowAction.Request request,
|
||||
String clusterNameAlias,
|
||||
IndexMetaData leaderIndexMetadata,
|
||||
IndexMetaData followIndexMetadata,
|
||||
|
@ -233,7 +233,7 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
|
|||
}
|
||||
|
||||
static void validate(
|
||||
final FollowIndexAction.Request request,
|
||||
final ResumeFollowAction.Request request,
|
||||
final IndexMetaData leaderIndex,
|
||||
final IndexMetaData followIndex,
|
||||
final String[] leaderIndexHistoryUUID,
|
||||
|
@ -300,7 +300,7 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
|
|||
private static ShardFollowTask createShardFollowTask(
|
||||
int shardId,
|
||||
String clusterAliasName,
|
||||
FollowIndexAction.Request request,
|
||||
ResumeFollowAction.Request request,
|
||||
IndexMetaData leaderIndexMetadata,
|
||||
IndexMetaData followIndexMetadata,
|
||||
String recordedLeaderShardHistoryUUID,
|
|
@ -14,19 +14,19 @@ import org.elasticsearch.rest.action.RestToXContentListener;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction.INSTANCE;
|
||||
import static org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction.Request;
|
||||
import static org.elasticsearch.xpack.core.ccr.action.PauseFollowAction.INSTANCE;
|
||||
import static org.elasticsearch.xpack.core.ccr.action.PauseFollowAction.Request;
|
||||
|
||||
public class RestUnfollowIndexAction extends BaseRestHandler {
|
||||
public class RestPauseFollowAction extends BaseRestHandler {
|
||||
|
||||
public RestUnfollowIndexAction(Settings settings, RestController controller) {
|
||||
public RestPauseFollowAction(Settings settings, RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.POST, "/{index}/_ccr/unfollow", this);
|
||||
controller.registerHandler(RestRequest.Method.POST, "/{index}/_ccr/pause_follow", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "ccr_unfollow_index_action";
|
||||
return "ccr_pause_follow_action";
|
||||
}
|
||||
|
||||
@Override
|
|
@ -14,24 +14,24 @@ import org.elasticsearch.rest.action.RestToXContentListener;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction.INSTANCE;
|
||||
import static org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction.Request;
|
||||
import static org.elasticsearch.xpack.core.ccr.action.PutFollowAction.INSTANCE;
|
||||
import static org.elasticsearch.xpack.core.ccr.action.PutFollowAction.Request;
|
||||
|
||||
public class RestCreateAndFollowIndexAction extends BaseRestHandler {
|
||||
public class RestPutFollowAction extends BaseRestHandler {
|
||||
|
||||
public RestCreateAndFollowIndexAction(Settings settings, RestController controller) {
|
||||
public RestPutFollowAction(Settings settings, RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.POST, "/{index}/_ccr/create_and_follow", this);
|
||||
controller.registerHandler(RestRequest.Method.PUT, "/{index}/_ccr/follow", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "ccr_create_and_follow_index_action";
|
||||
return "ccr_put_follow_action";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
|
||||
Request request = new Request(RestFollowIndexAction.createRequest(restRequest));
|
||||
Request request = new Request(RestResumeFollowAction.createRequest(restRequest));
|
||||
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
|
@ -15,19 +15,19 @@ import org.elasticsearch.rest.action.RestToXContentListener;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ccr.action.FollowIndexAction.INSTANCE;
|
||||
import static org.elasticsearch.xpack.core.ccr.action.FollowIndexAction.Request;
|
||||
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.INSTANCE;
|
||||
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request;
|
||||
|
||||
public class RestFollowIndexAction extends BaseRestHandler {
|
||||
public class RestResumeFollowAction extends BaseRestHandler {
|
||||
|
||||
public RestFollowIndexAction(Settings settings, RestController controller) {
|
||||
public RestResumeFollowAction(Settings settings, RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.POST, "/{index}/_ccr/follow", this);
|
||||
controller.registerHandler(RestRequest.Method.POST, "/{index}/_ccr/resume_follow", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "ccr_follow_index_action";
|
||||
return "ccr_resume_follow_action";
|
||||
}
|
||||
|
||||
@Override
|
|
@ -23,8 +23,8 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
|
|||
import org.elasticsearch.test.MockLogAppender;
|
||||
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
|
||||
|
@ -44,10 +44,10 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
|
|||
}
|
||||
|
||||
public void testThatFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException {
|
||||
final FollowIndexAction.Request followRequest = getFollowRequest();
|
||||
final ResumeFollowAction.Request followRequest = getFollowRequest();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
client().execute(
|
||||
FollowIndexAction.INSTANCE,
|
||||
ResumeFollowAction.INSTANCE,
|
||||
followRequest,
|
||||
new ActionListener<AcknowledgedResponse>() {
|
||||
@Override
|
||||
|
@ -66,15 +66,15 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
|
|||
}
|
||||
|
||||
public void testThatCreateAndFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException {
|
||||
final FollowIndexAction.Request followRequest = getFollowRequest();
|
||||
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
|
||||
final ResumeFollowAction.Request followRequest = getFollowRequest();
|
||||
final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
client().execute(
|
||||
CreateAndFollowIndexAction.INSTANCE,
|
||||
PutFollowAction.INSTANCE,
|
||||
createAndFollowRequest,
|
||||
new ActionListener<CreateAndFollowIndexAction.Response>() {
|
||||
new ActionListener<PutFollowAction.Response>() {
|
||||
@Override
|
||||
public void onResponse(final CreateAndFollowIndexAction.Response response) {
|
||||
public void onResponse(final PutFollowAction.Response response) {
|
||||
latch.countDown();
|
||||
fail();
|
||||
}
|
||||
|
@ -191,8 +191,8 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
|
|||
assertThat(e.getMessage(), equalTo("current license is non-compliant for [ccr]"));
|
||||
}
|
||||
|
||||
private FollowIndexAction.Request getFollowRequest() {
|
||||
FollowIndexAction.Request request = new FollowIndexAction.Request();
|
||||
private ResumeFollowAction.Request getFollowRequest() {
|
||||
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
|
||||
request.setLeaderIndex("leader");
|
||||
request.setFollowerIndex("follower");
|
||||
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
|
||||
|
|
|
@ -51,9 +51,9 @@ import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
|
|||
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
@ -177,9 +177,9 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureYellow("index1");
|
||||
|
||||
final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
|
||||
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
|
||||
final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
|
||||
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
|
||||
|
||||
final int firstBatchNumDocs = randomIntBetween(2, 64);
|
||||
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
|
||||
|
@ -204,7 +204,7 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
unfollowIndex("index2");
|
||||
client().execute(FollowIndexAction.INSTANCE, followRequest).get();
|
||||
client().execute(ResumeFollowAction.INSTANCE, followRequest).get();
|
||||
final int secondBatchNumDocs = randomIntBetween(2, 64);
|
||||
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
|
||||
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
|
||||
|
@ -236,9 +236,9 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureYellow("index1");
|
||||
|
||||
final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
|
||||
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
|
||||
final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
|
||||
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
|
||||
|
||||
final long firstBatchNumDocs = randomIntBetween(2, 64);
|
||||
for (long i = 0; i < firstBatchNumDocs; i++) {
|
||||
|
@ -277,9 +277,9 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
.build()));
|
||||
ensureGreen("index1");
|
||||
|
||||
final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
|
||||
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
|
||||
final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
|
||||
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
|
||||
|
||||
client().prepareIndex("index1", "doc", "1").setSource("{\"f\":1}", XContentType.JSON).get();
|
||||
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
|
||||
|
@ -328,13 +328,13 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10));
|
||||
atLeastDocsIndexed("index1", numDocsIndexed / 3);
|
||||
|
||||
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
followRequest.setMaxBatchOperationCount(maxReadSize);
|
||||
followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10));
|
||||
followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
|
||||
followRequest.setMaxWriteBufferSize(randomIntBetween(1024, 10240));
|
||||
CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
|
||||
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
|
||||
PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
|
||||
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
|
||||
|
||||
atLeastDocsIndexed("index1", numDocsIndexed);
|
||||
run.set(false);
|
||||
|
@ -370,11 +370,11 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
});
|
||||
thread.start();
|
||||
|
||||
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
followRequest.setMaxBatchOperationCount(randomIntBetween(32, 2048));
|
||||
followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10));
|
||||
followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
|
||||
client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get();
|
||||
client().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest)).get();
|
||||
|
||||
long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getMaxBatchOperationCount(),
|
||||
followRequest.getMaxBatchOperationCount() * 10));
|
||||
|
@ -399,8 +399,8 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||
ensureGreen("index1");
|
||||
|
||||
final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get();
|
||||
final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
client().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest)).get();
|
||||
|
||||
final int numDocs = randomIntBetween(2, 64);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
|
@ -434,31 +434,31 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public void testUnfollowNonExistingIndex() {
|
||||
UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
|
||||
PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request();
|
||||
unfollowRequest.setFollowIndex("non-existing-index");
|
||||
expectThrows(IllegalArgumentException.class, () -> client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).actionGet());
|
||||
expectThrows(IllegalArgumentException.class, () -> client().execute(PauseFollowAction.INSTANCE, unfollowRequest).actionGet());
|
||||
}
|
||||
|
||||
public void testFollowNonExistentIndex() throws Exception {
|
||||
assertAcked(client().admin().indices().prepareCreate("test-leader").get());
|
||||
assertAcked(client().admin().indices().prepareCreate("test-follower").get());
|
||||
// Leader index does not exist.
|
||||
FollowIndexAction.Request followRequest1 = createFollowRequest("non-existent-leader", "test-follower");
|
||||
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest1).actionGet());
|
||||
ResumeFollowAction.Request followRequest1 = createFollowRequest("non-existent-leader", "test-follower");
|
||||
expectThrows(IndexNotFoundException.class, () -> client().execute(ResumeFollowAction.INSTANCE, followRequest1).actionGet());
|
||||
expectThrows(IndexNotFoundException.class,
|
||||
() -> client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest1))
|
||||
() -> client().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest1))
|
||||
.actionGet());
|
||||
// Follower index does not exist.
|
||||
FollowIndexAction.Request followRequest2 = createFollowRequest("non-test-leader", "non-existent-follower");
|
||||
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest2).actionGet());
|
||||
ResumeFollowAction.Request followRequest2 = createFollowRequest("non-test-leader", "non-existent-follower");
|
||||
expectThrows(IndexNotFoundException.class, () -> client().execute(ResumeFollowAction.INSTANCE, followRequest2).actionGet());
|
||||
expectThrows(IndexNotFoundException.class,
|
||||
() -> client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest2))
|
||||
() -> client().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest2))
|
||||
.actionGet());
|
||||
// Both indices do not exist.
|
||||
FollowIndexAction.Request followRequest3 = createFollowRequest("non-existent-leader", "non-existent-follower");
|
||||
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet());
|
||||
ResumeFollowAction.Request followRequest3 = createFollowRequest("non-existent-leader", "non-existent-follower");
|
||||
expectThrows(IndexNotFoundException.class, () -> client().execute(ResumeFollowAction.INSTANCE, followRequest3).actionGet());
|
||||
expectThrows(IndexNotFoundException.class,
|
||||
() -> client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest3))
|
||||
() -> client().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest3))
|
||||
.actionGet());
|
||||
}
|
||||
|
||||
|
@ -475,10 +475,10 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
|
||||
}
|
||||
|
||||
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
followRequest.setMaxOperationSizeInBytes(1L);
|
||||
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
|
||||
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
|
||||
final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
|
||||
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
|
||||
|
||||
final Map<ShardId, Long> firstBatchNumDocsPerShard = new HashMap<>();
|
||||
final ShardStats[] firstBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
|
||||
|
@ -505,22 +505,22 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
assertAcked(client().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureGreen("index3");
|
||||
|
||||
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
|
||||
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
|
||||
ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
|
||||
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
|
||||
|
||||
followRequest = createFollowRequest("index3", "index4");
|
||||
createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
|
||||
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
|
||||
createAndFollowRequest = new PutFollowAction.Request(followRequest);
|
||||
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
|
||||
unfollowIndex("index2", "index4");
|
||||
|
||||
FollowIndexAction.Request wrongRequest1 = createFollowRequest("index1", "index4");
|
||||
ResumeFollowAction.Request wrongRequest1 = createFollowRequest("index1", "index4");
|
||||
Exception e = expectThrows(IllegalArgumentException.class,
|
||||
() -> client().execute(FollowIndexAction.INSTANCE, wrongRequest1).actionGet());
|
||||
() -> client().execute(ResumeFollowAction.INSTANCE, wrongRequest1).actionGet());
|
||||
assertThat(e.getMessage(), containsString("follow index [index4] should reference"));
|
||||
|
||||
FollowIndexAction.Request wrongRequest2 = createFollowRequest("index3", "index2");
|
||||
e = expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, wrongRequest2).actionGet());
|
||||
ResumeFollowAction.Request wrongRequest2 = createFollowRequest("index3", "index2");
|
||||
e = expectThrows(IllegalArgumentException.class, () -> client().execute(ResumeFollowAction.INSTANCE, wrongRequest2).actionGet());
|
||||
assertThat(e.getMessage(), containsString("follow index [index2] should reference"));
|
||||
}
|
||||
|
||||
|
@ -528,9 +528,9 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get());
|
||||
ensureYellow("index1");
|
||||
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
|
||||
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
|
||||
ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||
PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
|
||||
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
|
||||
unfollowIndex("index2");
|
||||
client().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
|
||||
|
||||
|
@ -580,9 +580,9 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
|
||||
private void unfollowIndex(String... indices) throws Exception {
|
||||
for (String index : indices) {
|
||||
final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
|
||||
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request();
|
||||
unfollowRequest.setFollowIndex(index);
|
||||
client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get();
|
||||
client().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
|
||||
}
|
||||
assertBusy(() -> {
|
||||
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
|
@ -766,8 +766,8 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followerIndex) {
|
||||
FollowIndexAction.Request request = new FollowIndexAction.Request();
|
||||
public static ResumeFollowAction.Request createFollowRequest(String leaderIndex, String followerIndex) {
|
||||
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
|
||||
request.setLeaderIndex(leaderIndex);
|
||||
request.setFollowerIndex(followerIndex);
|
||||
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
|
||||
|
|
|
@ -22,7 +22,7 @@ import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
|
|||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -92,7 +92,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
void createAndFollow(Map<String, String> headers,
|
||||
FollowIndexAction.Request followRequest,
|
||||
ResumeFollowAction.Request followRequest,
|
||||
Runnable successHandler,
|
||||
Consumer<Exception> failureHandler) {
|
||||
assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
|
||||
|
@ -150,7 +150,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
void createAndFollow(Map<String, String> headers,
|
||||
FollowIndexAction.Request followRequest,
|
||||
ResumeFollowAction.Request followRequest,
|
||||
Runnable successHandler,
|
||||
Consumer<Exception> failureHandler) {
|
||||
fail("should not get here");
|
||||
|
@ -211,7 +211,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
void createAndFollow(Map<String, String> headers,
|
||||
FollowIndexAction.Request followRequest,
|
||||
ResumeFollowAction.Request followRequest,
|
||||
Runnable successHandler,
|
||||
Consumer<Exception> failureHandler) {
|
||||
assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101"));
|
||||
|
@ -273,7 +273,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
void createAndFollow(Map<String, String> headers,
|
||||
FollowIndexAction.Request followRequest,
|
||||
ResumeFollowAction.Request followRequest,
|
||||
Runnable successHandler,
|
||||
Consumer<Exception> failureHandler) {
|
||||
assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101"));
|
||||
|
|
|
@ -1,22 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
|
||||
|
||||
public class CreateAndFollowIndexRequestTests extends AbstractStreamableTestCase<CreateAndFollowIndexAction.Request> {
|
||||
|
||||
@Override
|
||||
protected CreateAndFollowIndexAction.Request createBlankInstance() {
|
||||
return new CreateAndFollowIndexAction.Request();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CreateAndFollowIndexAction.Request createTestInstance() {
|
||||
return new CreateAndFollowIndexAction.Request(FollowIndexRequestTests.createTestRequest());
|
||||
}
|
||||
}
|
|
@ -1,22 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
|
||||
|
||||
public class CreateAndFollowIndexResponseTests extends AbstractStreamableTestCase<CreateAndFollowIndexAction.Response> {
|
||||
|
||||
@Override
|
||||
protected CreateAndFollowIndexAction.Response createBlankInstance() {
|
||||
return new CreateAndFollowIndexAction.Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CreateAndFollowIndexAction.Response createTestInstance() {
|
||||
return new CreateAndFollowIndexAction.Response(randomBoolean(), randomBoolean(), randomBoolean());
|
||||
}
|
||||
}
|
|
@ -9,7 +9,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -17,21 +17,21 @@ import static org.hamcrest.Matchers.containsString;
|
|||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class FollowIndexRequestTests extends AbstractStreamableXContentTestCase<FollowIndexAction.Request> {
|
||||
public class FollowIndexRequestTests extends AbstractStreamableXContentTestCase<ResumeFollowAction.Request> {
|
||||
|
||||
@Override
|
||||
protected FollowIndexAction.Request createBlankInstance() {
|
||||
return new FollowIndexAction.Request();
|
||||
protected ResumeFollowAction.Request createBlankInstance() {
|
||||
return new ResumeFollowAction.Request();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected FollowIndexAction.Request createTestInstance() {
|
||||
protected ResumeFollowAction.Request createTestInstance() {
|
||||
return createTestRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected FollowIndexAction.Request doParseInstance(XContentParser parser) throws IOException {
|
||||
return FollowIndexAction.Request.fromXContent(parser, null);
|
||||
protected ResumeFollowAction.Request doParseInstance(XContentParser parser) throws IOException {
|
||||
return ResumeFollowAction.Request.fromXContent(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -39,8 +39,8 @@ public class FollowIndexRequestTests extends AbstractStreamableXContentTestCase<
|
|||
return false;
|
||||
}
|
||||
|
||||
static FollowIndexAction.Request createTestRequest() {
|
||||
FollowIndexAction.Request request = new FollowIndexAction.Request();
|
||||
static ResumeFollowAction.Request createTestRequest() {
|
||||
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
|
||||
request.setLeaderIndex(randomAlphaOfLength(4));
|
||||
request.setFollowerIndex(randomAlphaOfLength(4));
|
||||
if (randomBoolean()) {
|
||||
|
@ -68,7 +68,7 @@ public class FollowIndexRequestTests extends AbstractStreamableXContentTestCase<
|
|||
}
|
||||
|
||||
public void testValidate() {
|
||||
FollowIndexAction.Request request = new FollowIndexAction.Request();
|
||||
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
|
||||
request.setLeaderIndex("index1");
|
||||
request.setFollowerIndex("index2");
|
||||
request.setMaxRetryDelay(TimeValue.ZERO);
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
|
||||
public class PutFollowActionRequestTests extends AbstractStreamableTestCase<PutFollowAction.Request> {
|
||||
|
||||
@Override
|
||||
protected PutFollowAction.Request createBlankInstance() {
|
||||
return new PutFollowAction.Request();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PutFollowAction.Request createTestInstance() {
|
||||
return new PutFollowAction.Request(FollowIndexRequestTests.createTestRequest());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
|
||||
public class PutFollowActionResponseTests extends AbstractStreamableTestCase<PutFollowAction.Response> {
|
||||
|
||||
@Override
|
||||
protected PutFollowAction.Response createBlankInstance() {
|
||||
return new PutFollowAction.Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PutFollowAction.Response createTestInstance() {
|
||||
return new PutFollowAction.Response(randomBoolean(), randomBoolean(), randomBoolean());
|
||||
}
|
||||
}
|
|
@ -80,7 +80,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
|||
new ShardId("leader_index", "", 0),
|
||||
testRun.maxOperationCount,
|
||||
concurrency,
|
||||
TransportFollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
|
||||
TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
|
||||
concurrency,
|
||||
10240,
|
||||
TimeValue.timeValueMillis(10),
|
||||
|
|
|
@ -17,7 +17,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.xpack.ccr.Ccr;
|
||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||
import org.elasticsearch.xpack.ccr.ShardChangesIT;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
@ -25,17 +25,17 @@ import java.util.Map;
|
|||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction.validate;
|
||||
import static org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction.validate;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class TransportFollowIndexActionTests extends ESTestCase {
|
||||
public class TransportResumeFollowActionTests extends ESTestCase {
|
||||
|
||||
public void testValidation() throws IOException {
|
||||
final Map<String, String> customMetaData = new HashMap<>();
|
||||
customMetaData.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, "uuid");
|
||||
customMetaData.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, "_na_");
|
||||
|
||||
FollowIndexAction.Request request = ShardChangesIT.createFollowRequest("index1", "index2");
|
||||
ResumeFollowAction.Request request = ShardChangesIT.createFollowRequest("index1", "index2");
|
||||
String[] UUIDs = new String[]{"uuid"};
|
||||
{
|
||||
// should fail, because leader index does not exist
|
|
@ -15,12 +15,12 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
public class UnfollowIndexAction extends Action<AcknowledgedResponse> {
|
||||
public class PauseFollowAction extends Action<AcknowledgedResponse> {
|
||||
|
||||
public static final UnfollowIndexAction INSTANCE = new UnfollowIndexAction();
|
||||
public static final String NAME = "cluster:admin/xpack/ccr/unfollow_index";
|
||||
public static final PauseFollowAction INSTANCE = new PauseFollowAction();
|
||||
public static final String NAME = "cluster:admin/xpack/ccr/pause_follow";
|
||||
|
||||
private UnfollowIndexAction() {
|
||||
private PauseFollowAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
|
@ -105,9 +105,9 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
|||
maxRetryDelay.getStringRep() + "]";
|
||||
validationException = addValidationError(message, validationException);
|
||||
}
|
||||
if (maxRetryDelay.millis() > FollowIndexAction.MAX_RETRY_DELAY.millis()) {
|
||||
if (maxRetryDelay.millis() > ResumeFollowAction.MAX_RETRY_DELAY.millis()) {
|
||||
String message = "[" + AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName() + "] must be less than [" +
|
||||
FollowIndexAction.MAX_RETRY_DELAY +
|
||||
ResumeFollowAction.MAX_RETRY_DELAY +
|
||||
"] but was [" + maxRetryDelay.getStringRep() + "]";
|
||||
validationException = addValidationError(message, validationException);
|
||||
}
|
||||
|
|
|
@ -20,12 +20,12 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public final class CreateAndFollowIndexAction extends Action<CreateAndFollowIndexAction.Response> {
|
||||
public final class PutFollowAction extends Action<PutFollowAction.Response> {
|
||||
|
||||
public static final CreateAndFollowIndexAction INSTANCE = new CreateAndFollowIndexAction();
|
||||
public static final String NAME = "indices:admin/xpack/ccr/create_and_follow_index";
|
||||
public static final PutFollowAction INSTANCE = new PutFollowAction();
|
||||
public static final String NAME = "indices:admin/xpack/ccr/put_follow";
|
||||
|
||||
private CreateAndFollowIndexAction() {
|
||||
private PutFollowAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
|
@ -36,9 +36,9 @@ public final class CreateAndFollowIndexAction extends Action<CreateAndFollowInde
|
|||
|
||||
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest {
|
||||
|
||||
private FollowIndexAction.Request followRequest;
|
||||
private ResumeFollowAction.Request followRequest;
|
||||
|
||||
public Request(FollowIndexAction.Request followRequest) {
|
||||
public Request(ResumeFollowAction.Request followRequest) {
|
||||
this.followRequest = Objects.requireNonNull(followRequest);
|
||||
}
|
||||
|
||||
|
@ -46,7 +46,7 @@ public final class CreateAndFollowIndexAction extends Action<CreateAndFollowInde
|
|||
|
||||
}
|
||||
|
||||
public FollowIndexAction.Request getFollowRequest() {
|
||||
public ResumeFollowAction.Request getFollowRequest() {
|
||||
return followRequest;
|
||||
}
|
||||
|
||||
|
@ -68,7 +68,7 @@ public final class CreateAndFollowIndexAction extends Action<CreateAndFollowInde
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
followRequest = new FollowIndexAction.Request();
|
||||
followRequest = new ResumeFollowAction.Request();
|
||||
followRequest.readFrom(in);
|
||||
}
|
||||
|
|
@ -24,14 +24,14 @@ import java.util.Objects;
|
|||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
|
||||
public final class FollowIndexAction extends Action<AcknowledgedResponse> {
|
||||
public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
|
||||
|
||||
public static final FollowIndexAction INSTANCE = new FollowIndexAction();
|
||||
public static final String NAME = "cluster:admin/xpack/ccr/follow_index";
|
||||
public static final ResumeFollowAction INSTANCE = new ResumeFollowAction();
|
||||
public static final String NAME = "cluster:admin/xpack/ccr/resume_follow";
|
||||
|
||||
public static final TimeValue MAX_RETRY_DELAY = TimeValue.timeValueMinutes(5);
|
||||
|
||||
private FollowIndexAction() {
|
||||
private ResumeFollowAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
|
@ -210,7 +210,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
|
|||
maxRetryDelay.getStringRep() + "]";
|
||||
e = addValidationError(message, e);
|
||||
}
|
||||
if (maxRetryDelay != null && maxRetryDelay.millis() > FollowIndexAction.MAX_RETRY_DELAY.millis()) {
|
||||
if (maxRetryDelay != null && maxRetryDelay.millis() > ResumeFollowAction.MAX_RETRY_DELAY.millis()) {
|
||||
String message = "[" + MAX_RETRY_DELAY_FIELD.getPreferredName() + "] must be less than [" + MAX_RETRY_DELAY +
|
||||
"] but was [" + maxRetryDelay.getStringRep() + "]";
|
||||
e = addValidationError(message, e);
|
|
@ -13,12 +13,12 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
|
@ -30,25 +30,25 @@ public class CcrClient {
|
|||
this.client = Objects.requireNonNull(client, "client");
|
||||
}
|
||||
|
||||
public void createAndFollow(
|
||||
final CreateAndFollowIndexAction.Request request,
|
||||
final ActionListener<CreateAndFollowIndexAction.Response> listener) {
|
||||
client.execute(CreateAndFollowIndexAction.INSTANCE, request, listener);
|
||||
public void putFollow(
|
||||
final PutFollowAction.Request request,
|
||||
final ActionListener<PutFollowAction.Response> listener) {
|
||||
client.execute(PutFollowAction.INSTANCE, request, listener);
|
||||
}
|
||||
|
||||
public ActionFuture<CreateAndFollowIndexAction.Response> createAndFollow(final CreateAndFollowIndexAction.Request request) {
|
||||
final PlainActionFuture<CreateAndFollowIndexAction.Response> listener = PlainActionFuture.newFuture();
|
||||
client.execute(CreateAndFollowIndexAction.INSTANCE, request, listener);
|
||||
public ActionFuture<PutFollowAction.Response> putFollow(final PutFollowAction.Request request) {
|
||||
final PlainActionFuture<PutFollowAction.Response> listener = PlainActionFuture.newFuture();
|
||||
client.execute(PutFollowAction.INSTANCE, request, listener);
|
||||
return listener;
|
||||
}
|
||||
|
||||
public void follow(final FollowIndexAction.Request request, final ActionListener<AcknowledgedResponse> listener) {
|
||||
client.execute(FollowIndexAction.INSTANCE, request, listener);
|
||||
public void resumeFollow(final ResumeFollowAction.Request request, final ActionListener<AcknowledgedResponse> listener) {
|
||||
client.execute(ResumeFollowAction.INSTANCE, request, listener);
|
||||
}
|
||||
|
||||
public ActionFuture<AcknowledgedResponse> follow(final FollowIndexAction.Request request) {
|
||||
public ActionFuture<AcknowledgedResponse> resumeFollow(final ResumeFollowAction.Request request) {
|
||||
final PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
|
||||
client.execute(FollowIndexAction.INSTANCE, request, listener);
|
||||
client.execute(ResumeFollowAction.INSTANCE, request, listener);
|
||||
return listener;
|
||||
}
|
||||
|
||||
|
@ -75,13 +75,13 @@ public class CcrClient {
|
|||
return listener;
|
||||
}
|
||||
|
||||
public void unfollow(final UnfollowIndexAction.Request request, final ActionListener<AcknowledgedResponse> listener) {
|
||||
client.execute(UnfollowIndexAction.INSTANCE, request, listener);
|
||||
public void pauseFollow(final PauseFollowAction.Request request, final ActionListener<AcknowledgedResponse> listener) {
|
||||
client.execute(PauseFollowAction.INSTANCE, request, listener);
|
||||
}
|
||||
|
||||
public ActionFuture<AcknowledgedResponse> unfollow(final UnfollowIndexAction.Request request) {
|
||||
public ActionFuture<AcknowledgedResponse> pauseFollow(final PauseFollowAction.Request request) {
|
||||
final PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
|
||||
client.execute(UnfollowIndexAction.INSTANCE, request, listener);
|
||||
client.execute(PauseFollowAction.INSTANCE, request, listener);
|
||||
return listener;
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
|
|||
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
import org.elasticsearch.xpack.core.security.support.Automatons;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -55,7 +56,7 @@ public final class IndexPrivilege extends Privilege {
|
|||
private static final Automaton VIEW_METADATA_AUTOMATON = patterns(GetAliasesAction.NAME, AliasesExistAction.NAME,
|
||||
GetIndexAction.NAME, IndicesExistsAction.NAME, GetFieldMappingsAction.NAME + "*", GetMappingsAction.NAME,
|
||||
ClusterSearchShardsAction.NAME, TypesExistsAction.NAME, ValidateQueryAction.NAME + "*", GetSettingsAction.NAME);
|
||||
private static final Automaton CREATE_FOLLOW_INDEX_AUTOMATON = patterns("indices:admin/xpack/ccr/create_and_follow_index");
|
||||
private static final Automaton CREATE_FOLLOW_INDEX_AUTOMATON = patterns(PutFollowAction.NAME);
|
||||
|
||||
public static final IndexPrivilege NONE = new IndexPrivilege("none", Automatons.EMPTY);
|
||||
public static final IndexPrivilege ALL = new IndexPrivilege("all", ALL_AUTOMATON);
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
{
|
||||
"ccr.follow_index": {
|
||||
"ccr.follow": {
|
||||
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
|
||||
"methods": [ "POST" ],
|
||||
"methods": [ "PUT" ],
|
||||
"url": {
|
||||
"path": "/{index}/_ccr/follow",
|
||||
"paths": [ "/{index}/_ccr/follow" ],
|
||||
|
@ -9,7 +9,7 @@
|
|||
"index": {
|
||||
"type": "string",
|
||||
"required": true,
|
||||
"description": "The name of the follower index."
|
||||
"description": "The name of the follower index"
|
||||
}
|
||||
}
|
||||
},
|
|
@ -1,15 +1,15 @@
|
|||
{
|
||||
"ccr.unfollow_index": {
|
||||
"ccr.pause_follow": {
|
||||
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
|
||||
"methods": [ "POST" ],
|
||||
"url": {
|
||||
"path": "/{index}/_ccr/unfollow",
|
||||
"paths": [ "/{index}/_ccr/unfollow" ],
|
||||
"path": "/{index}/_ccr/pause_follow",
|
||||
"paths": [ "/{index}/_ccr/pause_follow" ],
|
||||
"parts": {
|
||||
"index": {
|
||||
"type": "string",
|
||||
"required": true,
|
||||
"description": "The name of the follower index that should stop following its leader index."
|
||||
"description": "The name of the follower index that should pause following its leader index."
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,15 +1,15 @@
|
|||
{
|
||||
"ccr.create_and_follow_index": {
|
||||
"ccr.resume_follow": {
|
||||
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
|
||||
"methods": [ "POST" ],
|
||||
"url": {
|
||||
"path": "/{index}/_ccr/create_and_follow",
|
||||
"paths": [ "/{index}/_ccr/create_and_follow" ],
|
||||
"path": "/{index}/_ccr/resume_follow",
|
||||
"paths": [ "/{index}/_ccr/resume_follow" ],
|
||||
"parts": {
|
||||
"index": {
|
||||
"type": "string",
|
||||
"required": true,
|
||||
"description": "The name of the follower index"
|
||||
"description": "The name of the follow index to resume following."
|
||||
}
|
||||
}
|
||||
},
|
Loading…
Reference in New Issue