diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index b4d5d6f7b6d..5999cc11478 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -5,15 +5,22 @@ */ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.plugins.ActionPlugin.ActionHandler; +import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; +import java.util.Collections; import java.util.List; import java.util.Optional; +import static java.util.Collections.emptyList; import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_ENABLED_SETTING; import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTING; @@ -22,8 +29,9 @@ import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTIN */ public final class Ccr { - @SuppressWarnings("unused,FieldCanBeLocal") private final boolean enabled; + private final boolean tribeNode; + private final boolean tribeNodeClient; /** * Construct an instance of the CCR container with the specified settings. @@ -32,6 +40,16 @@ public final class Ccr { */ public Ccr(final Settings settings) { this.enabled = CCR_ENABLED_SETTING.get(settings); + this.tribeNode = XPackPlugin.isTribeNode(settings); + this.tribeNodeClient = XPackPlugin.isTribeClientNode(settings); + } + + public List> getActions() { + if (!enabled || tribeNodeClient || tribeNode) { + return emptyList(); + } + + return Collections.singletonList(new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class)); } /** diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java new file mode 100644 index 00000000000..4421335d8f8 --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -0,0 +1,262 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.single.shard.SingleShardOperationRequestBuilder; +import org.elasticsearch.action.support.single.shard.SingleShardRequest; +import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.seqno.LocalCheckpointTracker; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardNotStartedException; +import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class ShardChangesAction extends Action { + + public static final ShardChangesAction INSTANCE = new ShardChangesAction(); + public static final String NAME = "cluster:admin/xpack/ccr/shard_changes"; + + private ShardChangesAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends SingleShardRequest { + + private long minSeqNo; + private long maxSeqNo; + private ShardId shardId; + + public Request(ShardId shardId) { + super(shardId.getIndexName()); + this.shardId = shardId; + } + + Request() { + } + + public ShardId getShard() { + return shardId; + } + + public long getMinSeqNo() { + return minSeqNo; + } + + public void setMinSeqNo(long minSeqNo) { + this.minSeqNo = minSeqNo; + } + + public long getMaxSeqNo() { + return maxSeqNo; + } + + public void setMaxSeqNo(long maxSeqNo) { + this.maxSeqNo = maxSeqNo; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (minSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) { + validationException = addValidationError("minSeqNo cannot be unassigned", validationException); + } + if (maxSeqNo < minSeqNo) { + validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be larger than maxSeqNo [" + + maxSeqNo + "]", validationException); + } + return validationException; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + minSeqNo = in.readZLong(); + maxSeqNo = in.readZLong(); + shardId = ShardId.readShardId(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeZLong(minSeqNo); + out.writeZLong(maxSeqNo); + shardId.writeTo(out); + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return minSeqNo == request.minSeqNo && + maxSeqNo == request.maxSeqNo && + Objects.equals(shardId, request.shardId); + } + + @Override + public int hashCode() { + return Objects.hash(minSeqNo, maxSeqNo, shardId); + } + } + + public static class Response extends ActionResponse { + + private List operations; + + Response() { + } + + Response(List operations) { + this.operations = operations; + } + + public List getOperations() { + return operations; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + operations = Translog.readOperations(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + Translog.writeOperations(out, operations); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return Objects.equals(operations, response.operations); + } + + @Override + public int hashCode() { + return Objects.hash(operations); + } + } + + static class RequestBuilder extends SingleShardOperationRequestBuilder { + + RequestBuilder(ElasticsearchClient client, Action action) { + super(client, action, new Request()); + } + } + + public static class TransportAction extends TransportSingleShardAction { + + private final IndicesService indicesService; + + @Inject + public TransportAction(Settings settings, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + IndicesService indicesService) { + super(settings, NAME, threadPool, clusterService, transportService, actionFilters, + indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT); // is management right tp? + this.indicesService = indicesService; + } + + @Override + protected Response shardOperation(Request request, ShardId shardId) throws IOException { + IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); + IndexShard indexShard = indexService.getShard(request.getShard().id()); + + List operations = getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo); + return new Response(operations); + } + + @Override + protected boolean resolveIndex(Request request) { + return true; + } + + @Override + protected ShardsIterator shards(ClusterState state, InternalRequest request) { + return state.routingTable() + .index(request.concreteIndex()) + .shard(request.request().getShard().id()) + .activeInitializingShardsRandomIt(); + } + + @Override + protected Response newResponse() { + return new Response(); + } + + } + + static List getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo) throws IOException { + if (indexShard.state() != IndexShardState.STARTED) { + throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state()); + } + + final List operations = new ArrayList<>(); + final LocalCheckpointTracker tracker = new LocalCheckpointTracker(indexShard.indexSettings(), maxSeqNo, minSeqNo); + try (Translog.Snapshot snapshot = indexShard.getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo)) { + for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) { + if (op.seqNo() >= minSeqNo && op.seqNo() <= maxSeqNo) { + operations.add(op); + tracker.markSeqNoAsCompleted(op.seqNo()); + } + } + } + + if (tracker.getCheckpoint() == maxSeqNo) { + return operations; + } else { + String message = "Not all operations between min_seq_no [" + minSeqNo + "] and max_seq_no [" + maxSeqNo + + "] found, tracker checkpoint [" + tracker.getCheckpoint() + "]"; + throw new IllegalStateException(message); + } + } + +} diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java new file mode 100644 index 00000000000..7d7937007a8 --- /dev/null +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -0,0 +1,136 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +/* + +/* + * ELASTICSEARCH CONFIDENTIAL + * __________________ + * + * [2017] Elasticsearch Incorporated. All Rights Reserved. + * + * NOTICE: All information contained herein is, and remains + * the property of Elasticsearch Incorporated and its suppliers, + * if any. The intellectual and technical concepts contained + * herein are proprietary to Elasticsearch Incorporated + * and its suppliers and may be covered by U.S. and Foreign Patents, + * patents in process, and are protected by trade secret or copyright law. + * Dissemination of this information or reproduction of this material + * is strictly forbidden unless prior written permission is obtained + * from Elasticsearch Incorporated. + */ +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.analysis.common.CommonAnalysisPlugin; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.XPackSettings; +import org.elasticsearch.xpack.ccr.action.ShardChangesAction; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, transportClientRatio = 0) +public class ShardChangesIT extends ESIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder newSettings = Settings.builder(); + newSettings.put(super.nodeSettings(nodeOrdinal)); + newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); + newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + newSettings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); + newSettings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); + return newSettings.build(); + } + + @Override + protected Collection> getMockPlugins() { + return Collections.singleton(TestSeedPlugin.class); + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(XPackPlugin.class, CommonAnalysisPlugin.class); + } + + @Override + protected boolean ignoreExternalCluster() { + return true; + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + // Something like this will emulate what the xdrc persistent task will do for pulling + // the changes: + public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { + client().admin().indices().prepareCreate("index") + .setSettings(Settings.builder().put("index.number_of_shards", 1)) + .get(); + + client().prepareIndex("index", "doc", "1").setSource("{}", XContentType.JSON).get(); + client().prepareIndex("index", "doc", "2").setSource("{}", XContentType.JSON).get(); + client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get(); + + ShardStats shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0]; + long globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint(); + assertThat(globalCheckPoint, equalTo(2L)); + + ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId()); + request.setMinSeqNo(0L); + request.setMaxSeqNo(globalCheckPoint); + ShardChangesAction.Response response = client().execute(ShardChangesAction.INSTANCE, request).get(); + assertThat(response.getOperations().size(), equalTo(3)); + Translog.Index operation = (Translog.Index) response.getOperations().get(0); + assertThat(operation.seqNo(), equalTo(0L)); + assertThat(operation.id(), equalTo("1")); + + operation = (Translog.Index) response.getOperations().get(1); + assertThat(operation.seqNo(), equalTo(1L)); + assertThat(operation.id(), equalTo("2")); + + operation = (Translog.Index) response.getOperations().get(2); + assertThat(operation.seqNo(), equalTo(2L)); + assertThat(operation.id(), equalTo("3")); + + client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get(); + client().prepareIndex("index", "doc", "4").setSource("{}", XContentType.JSON).get(); + client().prepareIndex("index", "doc", "5").setSource("{}", XContentType.JSON).get(); + + shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0]; + globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint(); + assertThat(globalCheckPoint, equalTo(5L)); + + request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId()); + request.setMinSeqNo(3L); + request.setMaxSeqNo(globalCheckPoint); + response = client().execute(ShardChangesAction.INSTANCE, request).get(); + assertThat(response.getOperations().size(), equalTo(3)); + operation = (Translog.Index) response.getOperations().get(0); + assertThat(operation.seqNo(), equalTo(3L)); + assertThat(operation.id(), equalTo("3")); + + operation = (Translog.Index) response.getOperations().get(1); + assertThat(operation.seqNo(), equalTo(4L)); + assertThat(operation.id(), equalTo("4")); + + operation = (Translog.Index) response.getOperations().get(2); + assertThat(operation.seqNo(), equalTo(5L)); + assertThat(operation.id(), equalTo("5")); + } + +} diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java new file mode 100644 index 00000000000..d3e915765b7 --- /dev/null +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardNotStartedException; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.mockito.Mockito; + +import java.util.List; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class ShardChangesActionTests extends ESSingleNodeTestCase { + + public void testGetOperationsBetween() throws Exception { + IndexService indexService = createIndex("index", Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .build()); + + final int numWrites = randomIntBetween(2, 2048); + for (int i = 0; i < numWrites; i++) { + client().prepareIndex("index", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); + } + + // A number of times, get operations within a range that exists: + int iters = randomIntBetween(8, 32); + IndexShard indexShard = indexService.getShard(0); + for (int iter = 0; iter < iters; iter++) { + int min = randomIntBetween(0, numWrites - 1); + int max = randomIntBetween(min, numWrites); + + int index = 0; + List operations = ShardChangesAction.getOperationsBetween(indexShard, min, max); + for (long expectedSeqNo = min; expectedSeqNo < max; expectedSeqNo++) { + Translog.Operation operation = operations.get(index++); + assertThat(operation.seqNo(), equalTo(expectedSeqNo)); + } + } + + // get operations for a range no operations exists: + Exception e = expectThrows(IllegalStateException.class, + () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1)); + assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + numWrites + "] and max_seq_no [" + + (numWrites + 1) +"] found, tracker checkpoint [")); + + // get operations for a range some operations do not exist: + e = expectThrows(IllegalStateException.class, + () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, numWrites + 10)); + assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + (numWrites - 10) + "] and max_seq_no [" + + (numWrites + 10) +"] found, tracker checkpoint [")); + } + + public void testGetOperationsBetweenWhenShardNotStarted() throws Exception { + IndexShard indexShard = Mockito.mock(IndexShard.class); + + ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING); + Mockito.when(indexShard.routingEntry()).thenReturn(shardRouting); + expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1)); + } + +} diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java new file mode 100644 index 00000000000..7cd1326d518 --- /dev/null +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.AbstractStreamableTestCase; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.nullValue; + +public class ShardChangesRequestTests extends AbstractStreamableTestCase { + + @Override + protected ShardChangesAction.Request createTestInstance() { + ShardChangesAction.Request request = new ShardChangesAction.Request(new ShardId("_index", "_indexUUID", 0)); + request.setMaxSeqNo(randomNonNegativeLong()); + request.setMinSeqNo(randomNonNegativeLong()); + return request; + } + + @Override + protected ShardChangesAction.Request createBlankInstance() { + return new ShardChangesAction.Request(); + } + + public void testValidate() { + ShardChangesAction.Request request = new ShardChangesAction.Request(new ShardId("_index", "_indexUUID", 0)); + request.setMinSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO); + assertThat(request.validate().getMessage(), containsString("minSeqNo cannot be unassigned")); + + request.setMinSeqNo(4); + assertThat(request.validate().getMessage(), containsString("minSeqNo [4] cannot be larger than maxSeqNo [0]")); + + request.setMaxSeqNo(8); + assertThat(request.validate(), nullValue()); + } +} diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java new file mode 100644 index 00000000000..4e8d870a096 --- /dev/null +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.test.AbstractStreamableTestCase; + +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.nullValue; + +public class ShardChangesResponseTests extends AbstractStreamableTestCase { + + @Override + protected ShardChangesAction.Response createTestInstance() { + int numOps = randomInt(8); + List operations = new ArrayList<>(numOps); + for (int i = 0; i < numOps; i++) { + operations.add(new Translog.NoOp(i, 0, "test")); + } + return new ShardChangesAction.Response(operations); + } + + @Override + protected ShardChangesAction.Response createBlankInstance() { + return new ShardChangesAction.Response(); + } + +}