[CCR] Move leader_index and leader_cluster parameters from resume follow to put follow api (#34638)

As part of this change the leader index name and leader cluster name are
stored in the CCR metadata in the follow index. The resume follow api
will read that when a resume follow request is executed.
This commit is contained in:
Martijn van Groningen 2018-10-23 19:37:45 +02:00 committed by GitHub
parent c447fc258a
commit ed817fb265
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 307 additions and 265 deletions

View File

@ -31,14 +31,6 @@ public class CcrMultiClusterLicenseIT extends ESRestTestCase {
return true;
}
public void testResumeFollow() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("POST", "/follower/_ccr/resume_follow");
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"leader\"}");
assertNonCompliantLicense(request);
}
}
public void testFollow() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("PUT", "/follower/_ccr/follow");

View File

@ -80,7 +80,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
refresh(allowedIndex);
verifyDocuments(adminClient(), allowedIndex, numDocs);
} else {
follow(allowedIndex, allowedIndex);
follow(client(), allowedIndex, allowedIndex);
assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
assertThat(countCcrNodeTasks(), equalTo(1));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex));
@ -93,7 +93,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
assertThat(countCcrNodeTasks(), equalTo(0));
});
resumeFollow(allowedIndex, allowedIndex);
resumeFollow(allowedIndex);
assertThat(countCcrNodeTasks(), equalTo(1));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
// Make sure that there are no other ccr relates operations running:
@ -106,11 +106,11 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_close")));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex, allowedIndex));
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex));
assertThat(e.getMessage(), containsString("follow index [" + allowedIndex + "] does not have ccr metadata"));
// User does not have manage_follow_index index privilege for 'unallowedIndex':
e = expectThrows(ResponseException.class, () -> follow(unallowedIndex, unallowedIndex));
e = expectThrows(ResponseException.class, () -> follow(client(), unallowedIndex, unallowedIndex));
assertThat(e.getMessage(),
containsString("action [indices:admin/xpack/ccr/put_follow] is unauthorized for user [test_ccr]"));
// Verify that the follow index has not been created and no node tasks are running
@ -119,7 +119,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
// User does have manage_follow_index index privilege on 'allowed' index,
// but not read / monitor roles on 'disallowed' index:
e = expectThrows(ResponseException.class, () -> follow(unallowedIndex, allowedIndex));
e = expectThrows(ResponseException.class, () -> follow(client(), unallowedIndex, allowedIndex));
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " +
"privilege for action [indices:monitor/stats] is missing, " +
"privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing"));
@ -127,16 +127,20 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
e = expectThrows(ResponseException.class, () -> resumeFollow(unallowedIndex, unallowedIndex));
follow(adminClient(), unallowedIndex, unallowedIndex);
pauseFollow(adminClient(), unallowedIndex);
e = expectThrows(ResponseException.class, () -> resumeFollow(unallowedIndex));
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " +
"privilege for action [indices:monitor/stats] is missing, " +
"privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing"));
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
e = expectThrows(ResponseException.class,
() -> client().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow")));
assertThat(e.getMessage(), containsString("action [indices:admin/xpack/ccr/unfollow] is unauthorized for user [test_ccr]"));
assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_close")));
assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow")));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
}
}
@ -187,7 +191,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
// Cleanup by deleting auto follow pattern and pause following:
request = new Request("DELETE", "/_ccr/auto_follow/test_pattern");
assertOK(client().performRequest(request));
pauseFollow(allowedIndex);
pauseFollow(client(), allowedIndex);
}
private int countCcrNodeTasks() throws IOException {
@ -228,18 +232,17 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
assertOK(adminClient().performRequest(new Request("POST", "/" + index + "/_refresh")));
}
private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
private static void resumeFollow(String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex +
"\", \"poll_timeout\": \"10ms\"}");
request.setJsonEntity("{\"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}
private static void follow(String leaderIndex, String followIndex) throws IOException {
private static void follow(RestClient client, String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex +
"\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
assertOK(client.performRequest(request));
}
void verifyDocuments(RestClient client, String index, int expectedNumDocs) throws IOException {
@ -302,7 +305,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
}
private static void pauseFollow(String followIndex) throws IOException {
private static void pauseFollow(RestClient client, String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow")));
}

View File

@ -71,7 +71,7 @@ public class FollowIndexIT extends ESRestTestCase {
assertBusy(() -> verifyDocuments(followIndexName, numDocs));
// unfollow and then follow and then index a few docs in leader index:
pauseFollow(followIndexName);
resumeFollow(leaderIndexName, followIndexName);
resumeFollow(followIndexName);
try (RestClient leaderClient = buildLeaderClient()) {
int id = numDocs;
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id, "filtered_field", "true");
@ -84,14 +84,14 @@ public class FollowIndexIT extends ESRestTestCase {
pauseFollow(followIndexName);
assertOK(client().performRequest(new Request("POST", "/" + followIndexName + "/_close")));
assertOK(client().performRequest(new Request("POST", "/" + followIndexName + "/_ccr/unfollow")));
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(leaderIndexName, followIndexName));
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(followIndexName));
assertThat(e.getMessage(), containsString("follow index [" + followIndexName + "] does not have ccr metadata"));
}
}
public void testFollowNonExistingLeaderIndex() throws Exception {
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);
ResponseException e = expectThrows(ResponseException.class, () -> resumeFollow("non-existing-index", "non-existing-index"));
ResponseException e = expectThrows(ResponseException.class, () -> resumeFollow("non-existing-index"));
assertThat(e.getMessage(), containsString("no such index"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
@ -151,10 +151,9 @@ public class FollowIndexIT extends ESRestTestCase {
assertOK(client().performRequest(new Request("POST", "/" + index + "/_refresh")));
}
private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
private static void resumeFollow(String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex +
"\", \"poll_timeout\": \"10ms\"}");
request.setJsonEntity("{\"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

View File

@ -52,9 +52,7 @@
- do:
ccr.resume_follow:
index: bar
body:
leader_cluster: local
leader_index: foo
body: {}
- is_true: acknowledged
- do:

View File

@ -97,6 +97,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY = "leader_index_uuid";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY = "leader_index_name";
public static final String CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY = "leader_cluster_name";
private final boolean enabled;
private final Settings settings;

View File

@ -176,11 +176,10 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
@Override
void createAndFollow(Map<String, String> headers,
ResumeFollowAction.Request followRequest,
PutFollowAction.Request request,
Runnable successHandler,
Consumer<Exception> failureHandler) {
Client followerClient = CcrLicenseChecker.wrapClient(client, headers);
PutFollowAction.Request request = new PutFollowAction.Request(followRequest);
followerClient.execute(
PutFollowAction.INSTANCE,
request,
@ -278,7 +277,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
}
private void checkAutoFollowPattern(String autoFollowPattenName,
String clusterAlias,
String leaderCluster,
AutoFollowPattern autoFollowPattern,
List<Index> leaderIndicesToFollow,
Map<String, String> headers,
@ -302,7 +301,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
}
} else {
followLeaderIndex(autoFollowPattenName, clusterAlias, indexToFollow, autoFollowPattern, headers, error -> {
followLeaderIndex(autoFollowPattenName, leaderCluster, indexToFollow, autoFollowPattern, headers, error -> {
results.set(slot, new Tuple<>(indexToFollow, error));
if (leaderIndicesCountDown.countDown()) {
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
@ -314,7 +313,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
}
private void followLeaderIndex(String autoFollowPattenName,
String clusterAlias,
String leaderCluster,
Index indexToFollow,
AutoFollowPattern pattern,
Map<String,String> headers,
@ -322,17 +321,20 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
final String leaderIndexName = indexToFollow.getName();
final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setLeaderCluster(clusterAlias);
ResumeFollowAction.Request followRequest = new ResumeFollowAction.Request();
followRequest.setFollowerIndex(followIndexName);
followRequest.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
followRequest.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
followRequest.setMaxBatchSize(pattern.getMaxBatchSize());
followRequest.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay());
followRequest.setPollTimeout(pattern.getPollTimeout());
PutFollowAction.Request request = new PutFollowAction.Request();
request.setLeaderCluster(leaderCluster);
request.setLeaderIndex(indexToFollow.getName());
request.setFollowerIndex(followIndexName);
request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
request.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
request.setMaxBatchSize(pattern.getMaxBatchSize());
request.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
request.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
request.setMaxRetryDelay(pattern.getMaxRetryDelay());
request.setPollTimeout(pattern.getPollTimeout());
request.setFollowRequest(followRequest);
// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {
@ -418,7 +420,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
abstract void createAndFollow(
Map<String, String> headers,
ResumeFollowAction.Request followRequest,
PutFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler
);

View File

@ -95,11 +95,11 @@ public final class TransportPutFollowAction
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return;
}
String leaderCluster = request.getFollowRequest().getLeaderCluster();
String leaderCluster = request.getLeaderCluster();
// Validates whether the leader cluster has been configured properly:
client.getRemoteClusterClient(leaderCluster);
String leaderIndex = request.getFollowRequest().getLeaderIndex();
String leaderIndex = request.getLeaderIndex();
createFollowerIndexAndFollowRemoteIndex(request, leaderCluster, leaderIndex, listener);
}
@ -122,8 +122,7 @@ public final class TransportPutFollowAction
final PutFollowAction.Request request,
final ActionListener<PutFollowAction.Response> listener) {
if (leaderIndexMetaData == null) {
listener.onFailure(new IllegalArgumentException("leader index [" + request.getFollowRequest().getLeaderIndex() +
"] does not exist"));
listener.onFailure(new IllegalArgumentException("leader index [" + request.getLeaderIndex() + "] does not exist"));
return;
}
@ -160,6 +159,8 @@ public final class TransportPutFollowAction
Map<String, String> metadata = new HashMap<>();
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs));
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID());
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY, leaderIndexMetaData.getIndex().getName());
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY, request.getLeaderCluster());
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);
// Copy all settings, but overwrite a few settings.

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingSlowLog;
import org.elasticsearch.index.SearchSlowLog;
@ -97,33 +98,34 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return;
}
final String clusterAlias = request.getLeaderCluster();
// Validates whether the leader cluster has been configured properly:
client.getRemoteClusterClient(clusterAlias);
final String leaderIndex = request.getLeaderIndex();
followRemoteIndex(request, clusterAlias, leaderIndex, listener);
}
private void followRemoteIndex(
final ResumeFollowAction.Request request,
final String clusterAlias,
final String leaderIndex,
final ActionListener<AcknowledgedResponse> listener) {
final ClusterState state = clusterService.state();
final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex());
if (followerIndexMetadata == null) {
listener.onFailure(new IndexNotFoundException(request.getFollowerIndex()));
return;
}
final Map<String, String> ccrMetadata = followerIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
if (ccrMetadata == null) {
throw new IllegalArgumentException("follow index ["+ request.getFollowerIndex() + "] does not have ccr metadata");
}
final String leaderCluster = ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY);
// Validates whether the leader cluster has been configured properly:
client.getRemoteClusterClient(leaderCluster);
final String leaderIndex = ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client,
clusterAlias,
leaderIndex,
listener::onFailure,
(leaderHistoryUUID, leaderIndexMetadata) -> {
try {
start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener);
} catch (final IOException e) {
listener.onFailure(e);
}
});
client,
leaderCluster,
leaderIndex,
listener::onFailure,
(leaderHistoryUUID, leaderIndexMetadata) -> {
try {
start(request, leaderCluster, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener);
} catch (final IOException e) {
listener.onFailure(e);
}
});
}
/**
@ -207,13 +209,6 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
final IndexMetaData followIndex,
final String[] leaderIndexHistoryUUID,
final MapperService followerMapperService) {
String leaderIndexName = request.getLeaderCluster() + ":" + request.getLeaderIndex();
if (leaderIndex == null) {
throw new IllegalArgumentException("leader index [" + leaderIndexName + "] does not exist");
}
if (followIndex == null) {
throw new IllegalArgumentException("follow index [" + request.getFollowerIndex() + "] does not exist");
}
Map<String, String> ccrIndexMetadata = followIndex.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
if (ccrIndexMetadata == null) {
throw new IllegalArgumentException("follow index ["+ followIndex.getIndex().getName() + "] does not have ccr metadata");
@ -238,7 +233,8 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
}
if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
throw new IllegalArgumentException("leader index [" + leaderIndexName + "] does not have soft deletes enabled");
throw new IllegalArgumentException("leader index [" + leaderIndex.getIndex().getName() +
"] does not have soft deletes enabled");
}
if (followIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
throw new IllegalArgumentException("follower index [" + request.getFollowerIndex() + "] does not have soft deletes enabled");

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
@ -31,7 +32,13 @@ public class RestPutFollowAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
Request request = new Request(RestResumeFollowAction.createRequest(restRequest));
Request request = createRequest(restRequest);
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
}
static Request createRequest(RestRequest restRequest) throws IOException {
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
return Request.fromXContent(parser, restRequest.param("index"));
}
}
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.junit.After;
import org.junit.Before;
@ -63,14 +64,20 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}
protected ResumeFollowAction.Request getFollowRequest() {
protected ResumeFollowAction.Request getResumeFollowRequest() {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setLeaderCluster("local");
request.setLeaderIndex("leader");
request.setFollowerIndex("follower");
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.setPollTimeout(TimeValue.timeValueMillis(10));
return request;
}
protected PutFollowAction.Request getPutFollowRequest() {
PutFollowAction.Request request = new PutFollowAction.Request();
request.setLeaderCluster("local");
request.setLeaderIndex("leader");
request.setFollowRequest(getResumeFollowRequest());
return request;
}
}

View File

@ -49,7 +49,7 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase {
}
public void testThatFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException {
final ResumeFollowAction.Request followRequest = getFollowRequest();
final ResumeFollowAction.Request followRequest = getResumeFollowRequest();
final CountDownLatch latch = new CountDownLatch(1);
client().execute(
ResumeFollowAction.INSTANCE,
@ -71,8 +71,7 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase {
}
public void testThatCreateAndFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException {
final ResumeFollowAction.Request followRequest = getFollowRequest();
final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
final PutFollowAction.Request createAndFollowRequest = getPutFollowRequest();
final CountDownLatch latch = new CountDownLatch(1);
client().execute(
PutFollowAction.INSTANCE,

View File

@ -82,7 +82,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -100,7 +99,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");
final PutFollowAction.Request followRequest = follow("index1", "index2");
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
final int firstBatchNumDocs = randomIntBetween(2, 64);
@ -162,7 +161,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");
final PutFollowAction.Request followRequest = follow("index1", "index2");
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
final long firstBatchNumDocs = randomIntBetween(2, 64);
@ -202,7 +201,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
.build()));
ensureLeaderGreen("index1");
final PutFollowAction.Request followRequest = follow("index1", "index2");
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
leaderClient().prepareIndex("index1", "doc", "1").setSource("{\"f\":1}", XContentType.JSON).get();
@ -252,7 +251,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10));
atLeastDocsIndexed(leaderClient(), "index1", numDocsIndexed / 3);
PutFollowAction.Request followRequest = follow("index1", "index2");
PutFollowAction.Request followRequest = putFollow("index1", "index2");
followRequest.getFollowRequest().setMaxBatchOperationCount(maxReadSize);
followRequest.getFollowRequest().setMaxConcurrentReadBatches(randomIntBetween(2, 10));
followRequest.getFollowRequest().setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
@ -295,7 +294,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
});
thread.start();
PutFollowAction.Request followRequest = follow("index1", "index2");
PutFollowAction.Request followRequest = putFollow("index1", "index2");
followRequest.getFollowRequest().setMaxBatchOperationCount(randomIntBetween(32, 2048));
followRequest.getFollowRequest().setMaxConcurrentReadBatches(randomIntBetween(2, 10));
followRequest.getFollowRequest().setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
@ -323,7 +322,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index1");
final PutFollowAction.Request followRequest = follow("index1", "index2");
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
final int numDocs = randomIntBetween(2, 64);
@ -372,22 +371,17 @@ public class IndexFollowingIT extends CcrIntegTestCase {
ensureLeaderGreen("test-leader");
ensureFollowerGreen("test-follower");
// Leader index does not exist.
ResumeFollowAction.Request followRequest1 = resumeFollow("non-existent-leader", "test-follower");
expectThrows(IndexNotFoundException.class, () -> followerClient().execute(ResumeFollowAction.INSTANCE, followRequest1).actionGet());
expectThrows(IndexNotFoundException.class,
() -> followerClient().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest1))
() -> followerClient().execute(PutFollowAction.INSTANCE, putFollow("non-existent-leader", "test-follower"))
.actionGet());
// Follower index does not exist.
ResumeFollowAction.Request followRequest2 = resumeFollow("non-test-leader", "non-existent-follower");
ResumeFollowAction.Request followRequest1 = resumeFollow("non-existent-follower");
expectThrows(IndexNotFoundException.class, () -> followerClient().execute(ResumeFollowAction.INSTANCE, followRequest1).actionGet());
// Both indices do not exist.
ResumeFollowAction.Request followRequest2 = resumeFollow("non-existent-follower");
expectThrows(IndexNotFoundException.class, () -> followerClient().execute(ResumeFollowAction.INSTANCE, followRequest2).actionGet());
expectThrows(IndexNotFoundException.class,
() -> followerClient().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest2))
.actionGet());
// Both indices do not exist.
ResumeFollowAction.Request followRequest3 = resumeFollow("non-existent-leader", "non-existent-follower");
expectThrows(IndexNotFoundException.class, () -> followerClient().execute(ResumeFollowAction.INSTANCE, followRequest3).actionGet());
expectThrows(IndexNotFoundException.class,
() -> followerClient().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest3))
() -> followerClient().execute(PutFollowAction.INSTANCE, putFollow("non-existing-leader", "non-existing-follower"))
.actionGet());
}
@ -404,7 +398,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
PutFollowAction.Request followRequest = follow("index1", "index2");
PutFollowAction.Request followRequest = putFollow("index1", "index2");
followRequest.getFollowRequest().setMaxBatchSize(new ByteSizeValue(1, ByteSizeUnit.BYTES));
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
@ -427,37 +421,11 @@ public class IndexFollowingIT extends CcrIntegTestCase {
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), 1, numDocs);
}
public void testDontFollowTheWrongIndex() throws Exception {
String leaderIndexSettings = getIndexSettings(1, 0,
Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index1");
assertAcked(leaderClient().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index3");
PutFollowAction.Request followRequest = follow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
followRequest = follow("index3", "index4");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
pauseFollow("index2", "index4");
ResumeFollowAction.Request wrongRequest1 = resumeFollow("index1", "index4");
Exception e = expectThrows(IllegalArgumentException.class,
() -> followerClient().execute(ResumeFollowAction.INSTANCE, wrongRequest1).actionGet());
assertThat(e.getMessage(), containsString("follow index [index4] should reference"));
ResumeFollowAction.Request wrongRequest2 = resumeFollow("index3", "index2");
e = expectThrows(IllegalArgumentException.class,
() -> followerClient().execute(ResumeFollowAction.INSTANCE, wrongRequest2).actionGet());
assertThat(e.getMessage(), containsString("follow index [index2] should reference"));
}
public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception {
String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get());
ensureLeaderYellow("index1");
PutFollowAction.Request followRequest = follow("index1", "index2");
PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
pauseFollow("index2");
followerClient().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
@ -478,7 +446,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
final PutFollowAction.Request followRequest = follow("index1", "index2");
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
@ -512,7 +480,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
final PutFollowAction.Request followRequest = follow("index1", "index2");
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
@ -541,7 +509,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
final PutFollowAction.Request followRequest = follow("index1", "index2");
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
@ -570,7 +538,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
final PutFollowAction.Request followRequest = follow("index1", "index2");
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
@ -595,7 +563,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
public void testUnfollowIndex() throws Exception {
String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get());
PutFollowAction.Request followRequest = follow("index1", "index2");
PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
@ -647,7 +615,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
});
threads[i].start();
}
PutFollowAction.Request follow = follow("leader-index", "follower-index");
PutFollowAction.Request follow = putFollow("leader-index", "follower-index");
followerClient().execute(PutFollowAction.INSTANCE, follow).get();
ensureFollowerGreen("follower-index");
atLeastDocsIndexed(followerClient(), "follower-index", between(20, 60));
@ -674,14 +642,11 @@ public class IndexFollowingIT extends CcrIntegTestCase {
Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index1");
PutFollowAction.Request followRequest = follow("index1", "index2");
followRequest.getFollowRequest().setLeaderCluster("another_cluster");
PutFollowAction.Request followRequest = putFollow("index1", "index2");
followRequest.setLeaderCluster("another_cluster");
Exception e = expectThrows(IllegalArgumentException.class,
() -> followerClient().execute(PutFollowAction.INSTANCE, followRequest).actionGet());
assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]"));
e = expectThrows(IllegalArgumentException.class,
() -> followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).actionGet());
assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]"));
PutAutoFollowPatternAction.Request putAutoFollowRequest = new PutAutoFollowPatternAction.Request();
putAutoFollowRequest.setName("name");
putAutoFollowRequest.setLeaderCluster("another_cluster");
@ -696,7 +661,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
PutFollowAction.Request follow = follow("leader-index", "follower-index");
PutFollowAction.Request follow = putFollow("leader-index", "follower-index");
followerClient().execute(PutFollowAction.INSTANCE, follow).get();
getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(2, 3));
ensureFollowerGreen("follower-index");
@ -998,14 +963,16 @@ public class IndexFollowingIT extends CcrIntegTestCase {
});
}
public static PutFollowAction.Request follow(String leaderIndex, String followerIndex) {
return new PutFollowAction.Request(resumeFollow(leaderIndex, followerIndex));
}
public static ResumeFollowAction.Request resumeFollow(String leaderIndex, String followerIndex) {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) {
PutFollowAction.Request request = new PutFollowAction.Request();
request.setLeaderCluster("leader_cluster");
request.setLeaderIndex(leaderIndex);
request.setFollowRequest(resumeFollow(followerIndex));
return request;
}
public static ResumeFollowAction.Request resumeFollow(String followerIndex) {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setFollowerIndex(followerIndex);
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.setPollTimeout(TimeValue.timeValueMillis(10));

View File

@ -31,7 +31,7 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("leader");
final PutFollowAction.Request followRequest = new PutFollowAction.Request(getFollowRequest());
final PutFollowAction.Request followRequest = getPutFollowRequest();
client().execute(PutFollowAction.INSTANCE, followRequest).get();
final long firstBatchNumDocs = randomIntBetween(2, 64);
@ -61,7 +61,7 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get();
}
client().execute(ResumeFollowAction.INSTANCE, getFollowRequest()).get();
client().execute(ResumeFollowAction.INSTANCE, getResumeFollowRequest()).get();
assertBusy(() -> {
assertThat(client().prepareSearch("follower").get().getHits().totalHits,
equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs));

View File

@ -21,7 +21,7 @@ import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import java.util.ArrayList;
import java.util.Arrays;
@ -91,13 +91,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
@Override
void createAndFollow(Map<String, String> headers,
ResumeFollowAction.Request followRequest,
PutFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
assertThat(followRequest.getLeaderCluster(), equalTo("remote"));
assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101"));
assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101"));
successHandler.run();
}
@ -150,7 +150,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
@Override
void createAndFollow(Map<String, String> headers,
ResumeFollowAction.Request followRequest,
PutFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
fail("should not get here");
@ -211,12 +211,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
@Override
void createAndFollow(Map<String, String> headers,
ResumeFollowAction.Request followRequest,
PutFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
assertThat(followRequest.getLeaderCluster(), equalTo("remote"));
assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101"));
assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101"));
successHandler.run();
}
@ -274,12 +274,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
@Override
void createAndFollow(Map<String, String> headers,
ResumeFollowAction.Request followRequest,
PutFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
assertThat(followRequest.getLeaderCluster(), equalTo("remote"));
assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101"));
assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101"));
failureHandler.accept(failure);
}

View File

@ -5,10 +5,13 @@
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
public class PutFollowActionRequestTests extends AbstractStreamableTestCase<PutFollowAction.Request> {
import java.io.IOException;
public class PutFollowActionRequestTests extends AbstractStreamableXContentTestCase<PutFollowAction.Request> {
@Override
protected PutFollowAction.Request createBlankInstance() {
@ -17,6 +20,20 @@ public class PutFollowActionRequestTests extends AbstractStreamableTestCase<PutF
@Override
protected PutFollowAction.Request createTestInstance() {
return new PutFollowAction.Request(ResumeFollowActionRequestTests.createTestRequest());
PutFollowAction.Request request = new PutFollowAction.Request();
request.setLeaderCluster(randomAlphaOfLength(4));
request.setLeaderIndex(randomAlphaOfLength(4));
request.setFollowRequest(ResumeFollowActionRequestTests.createTestRequest());
return request;
}
@Override
protected PutFollowAction.Request doParseInstance(XContentParser parser) throws IOException {
return PutFollowAction.Request.fromXContent(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
}

View File

@ -43,8 +43,6 @@ public class ResumeFollowActionRequestTests extends AbstractStreamableXContentTe
static ResumeFollowAction.Request createTestRequest() {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setLeaderCluster(randomAlphaOfLength(4));
request.setLeaderIndex(randomAlphaOfLength(4));
request.setFollowerIndex(randomAlphaOfLength(4));
if (randomBoolean()) {
request.setMaxBatchOperationCount(randomIntBetween(1, Integer.MAX_VALUE));
@ -72,8 +70,6 @@ public class ResumeFollowActionRequestTests extends AbstractStreamableXContentTe
public void testValidate() {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setLeaderCluster("leader_cluster");
request.setLeaderIndex("index1");
request.setFollowerIndex("index2");
request.setMaxRetryDelay(TimeValue.ZERO);

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction.validate;
import static org.hamcrest.Matchers.equalTo;
@ -35,20 +34,8 @@ public class TransportResumeFollowActionTests extends ESTestCase {
customMetaData.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, "uuid");
customMetaData.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, "_na_");
ResumeFollowAction.Request request = IndexFollowingIT.resumeFollow("index1", "index2");
ResumeFollowAction.Request request = IndexFollowingIT.resumeFollow("index2");
String[] UUIDs = new String[]{"uuid"};
{
// should fail, because leader index does not exist
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, null, null, null, null));
assertThat(e.getMessage(), equalTo("leader index [leader_cluster:index1] does not exist"));
}
{
// should fail, because follow index does not exist
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap());
Exception e = expectThrows(IllegalArgumentException.class,
() -> validate(request, leaderIMD, null, null, null));
assertThat(e.getMessage(), equalTo("follow index [index2] does not exist"));
}
{
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, null);
IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, null);
@ -83,7 +70,7 @@ public class TransportResumeFollowActionTests extends ESTestCase {
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, null);
IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, customMetaData);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null));
assertThat(e.getMessage(), equalTo("leader index [leader_cluster:index1] does not have soft deletes enabled"));
assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled"));
}
{
// should fail because the follower index does not have soft deletes enabled

View File

@ -12,14 +12,29 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.FOLLOWER_INDEX_FIELD;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_BATCH_OPERATION_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_BATCH_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_CONCURRENT_READ_BATCHES;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_CONCURRENT_WRITE_BATCHES;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_RETRY_DELAY_FIELD;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.POLL_TIMEOUT;
public final class PutFollowAction extends Action<PutFollowAction.Response> {
public static final PutFollowAction INSTANCE = new PutFollowAction();
@ -34,25 +49,97 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
return new Response();
}
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest {
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest, ToXContentObject {
private ResumeFollowAction.Request followRequest;
private static final ParseField LEADER_CLUSTER_FIELD = new ParseField("leader_cluster");
private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
public Request(ResumeFollowAction.Request followRequest) {
this.followRequest = Objects.requireNonNull(followRequest);
private static final ObjectParser<Request, String> PARSER = new ObjectParser<>(NAME, () -> {
Request request = new Request();
request.setFollowRequest(new ResumeFollowAction.Request());
return request;
});
static {
PARSER.declareString(Request::setLeaderCluster, LEADER_CLUSTER_FIELD);
PARSER.declareString(Request::setLeaderIndex, LEADER_INDEX_FIELD);
PARSER.declareString((request, value) -> request.followRequest.setFollowerIndex(value), FOLLOWER_INDEX_FIELD);
PARSER.declareInt((request, value) -> request.followRequest.setMaxBatchOperationCount(value), MAX_BATCH_OPERATION_COUNT);
PARSER.declareInt((request, value) -> request.followRequest.setMaxConcurrentReadBatches(value), MAX_CONCURRENT_READ_BATCHES);
PARSER.declareField(
(request, value) -> request.followRequest.setMaxBatchSize(value),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_BATCH_SIZE.getPreferredName()),
MAX_BATCH_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt((request, value) -> request.followRequest.setMaxConcurrentWriteBatches(value), MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt((request, value) -> request.followRequest.setMaxWriteBufferSize(value), MAX_WRITE_BUFFER_SIZE);
PARSER.declareField(
(request, value) -> request.followRequest.setMaxRetryDelay(value),
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()),
MAX_RETRY_DELAY_FIELD,
ObjectParser.ValueType.STRING);
PARSER.declareField(
(request, value) -> request.followRequest.setPollTimeout(value),
(p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()),
POLL_TIMEOUT,
ObjectParser.ValueType.STRING);
}
public Request() {
public static Request fromXContent(final XContentParser parser, final String followerIndex) throws IOException {
Request request = PARSER.parse(parser, followerIndex);
if (followerIndex != null) {
if (request.getFollowRequest().getFollowerIndex() == null) {
request.getFollowRequest().setFollowerIndex(followerIndex);
} else {
if (request.getFollowRequest().getFollowerIndex().equals(followerIndex) == false) {
throw new IllegalArgumentException("provided follower_index is not equal");
}
}
}
return request;
}
private String leaderCluster;
private String leaderIndex;
private ResumeFollowAction.Request followRequest;
public Request() {
}
public String getLeaderCluster() {
return leaderCluster;
}
public void setLeaderCluster(String leaderCluster) {
this.leaderCluster = leaderCluster;
}
public String getLeaderIndex() {
return leaderIndex;
}
public void setLeaderIndex(String leaderIndex) {
this.leaderIndex = leaderIndex;
}
public ResumeFollowAction.Request getFollowRequest() {
return followRequest;
}
public void setFollowRequest(ResumeFollowAction.Request followRequest) {
this.followRequest = followRequest;
}
@Override
public ActionRequestValidationException validate() {
return followRequest.validate();
ActionRequestValidationException e = followRequest.validate();
if (leaderCluster == null) {
e = addValidationError(LEADER_CLUSTER_FIELD.getPreferredName() + " is missing", e);
}
if (leaderIndex == null) {
e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e);
}
return e;
}
@Override
@ -68,6 +155,8 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
leaderCluster = in.readString();
leaderIndex = in.readString();
followRequest = new ResumeFollowAction.Request();
followRequest.readFrom(in);
}
@ -75,20 +164,36 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(leaderCluster);
out.writeString(leaderIndex);
followRequest.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field(LEADER_CLUSTER_FIELD.getPreferredName(), leaderCluster);
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
followRequest.toXContentFragment(builder, params);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(followRequest, request.followRequest);
return Objects.equals(leaderCluster, request.leaderCluster) &&
Objects.equals(leaderIndex, request.leaderIndex) &&
Objects.equals(followRequest, request.followRequest);
}
@Override
public int hashCode() {
return Objects.hash(followRequest);
return Objects.hash(leaderCluster, leaderIndex, followRequest);
}
}

View File

@ -43,21 +43,17 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
public static class Request extends ActionRequest implements ToXContentObject {
private static final ParseField LEADER_CLUSTER_FIELD = new ParseField("leader_cluster");
private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
private static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
private static final ParseField MAX_BATCH_OPERATION_COUNT = new ParseField("max_batch_operation_count");
private static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches");
private static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size");
private static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
private static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
private static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay");
private static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout");
private static final ObjectParser<Request, String> PARSER = new ObjectParser<>(NAME, Request::new);
static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
static final ParseField MAX_BATCH_OPERATION_COUNT = new ParseField("max_batch_operation_count");
static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches");
static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size");
static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay");
static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout");
static final ObjectParser<Request, String> PARSER = new ObjectParser<>(NAME, Request::new);
static {
PARSER.declareString(Request::setLeaderCluster, LEADER_CLUSTER_FIELD);
PARSER.declareString(Request::setLeaderIndex, LEADER_INDEX_FIELD);
PARSER.declareString(Request::setFollowerIndex, FOLLOWER_INDEX_FIELD);
PARSER.declareInt(Request::setMaxBatchOperationCount, MAX_BATCH_OPERATION_COUNT);
PARSER.declareInt(Request::setMaxConcurrentReadBatches, MAX_CONCURRENT_READ_BATCHES);
@ -94,26 +90,6 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
return request;
}
private String leaderCluster;
public String getLeaderCluster() {
return leaderCluster;
}
public void setLeaderCluster(String leaderCluster) {
this.leaderCluster = leaderCluster;
}
private String leaderIndex;
public String getLeaderIndex() {
return leaderIndex;
}
public void setLeaderIndex(String leaderIndex) {
this.leaderIndex = leaderIndex;
}
private String followerIndex;
public String getFollowerIndex() {
@ -201,12 +177,6 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
public ActionRequestValidationException validate() {
ActionRequestValidationException e = null;
if (leaderCluster == null) {
e = addValidationError(LEADER_CLUSTER_FIELD.getPreferredName() + " is missing", e);
}
if (leaderIndex == null) {
e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e);
}
if (followerIndex == null) {
e = addValidationError(FOLLOWER_INDEX_FIELD.getPreferredName() + " is missing", e);
}
@ -242,8 +212,6 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
leaderCluster = in.readString();
leaderIndex = in.readString();
followerIndex = in.readString();
maxBatchOperationCount = in.readOptionalVInt();
maxConcurrentReadBatches = in.readOptionalVInt();
@ -257,8 +225,6 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(leaderCluster);
out.writeString(leaderIndex);
out.writeString(followerIndex);
out.writeOptionalVInt(maxBatchOperationCount);
out.writeOptionalVInt(maxConcurrentReadBatches);
@ -273,35 +239,37 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
{
builder.field(LEADER_CLUSTER_FIELD.getPreferredName(), leaderCluster);
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
if (maxBatchOperationCount != null) {
builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount);
}
if (maxBatchSize != null) {
builder.field(MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep());
}
if (maxWriteBufferSize != null) {
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
}
if (maxConcurrentReadBatches != null) {
builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
}
if (maxConcurrentWriteBatches != null) {
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
}
if (maxRetryDelay != null) {
builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep());
}
if (pollTimeout != null) {
builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep());
}
toXContentFragment(builder, params);
}
builder.endObject();
return builder;
}
void toXContentFragment(final XContentBuilder builder, final Params params) throws IOException {
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
if (maxBatchOperationCount != null) {
builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount);
}
if (maxBatchSize != null) {
builder.field(MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep());
}
if (maxWriteBufferSize != null) {
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
}
if (maxConcurrentReadBatches != null) {
builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
}
if (maxConcurrentWriteBatches != null) {
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
}
if (maxRetryDelay != null) {
builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep());
}
if (pollTimeout != null) {
builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep());
}
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
@ -314,16 +282,12 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
Objects.equals(pollTimeout, request.pollTimeout) &&
Objects.equals(leaderCluster, request.leaderCluster) &&
Objects.equals(leaderIndex, request.leaderIndex) &&
Objects.equals(followerIndex, request.followerIndex);
}
@Override
public int hashCode() {
return Objects.hash(
leaderCluster,
leaderIndex,
followerIndex,
maxBatchOperationCount,
maxConcurrentReadBatches,