[CCR] Add an internal api to read translog operations between a sequence number range (#2563)

* xdcr: Add an internal api to read translog operations between a sequence number range.

This api will be used later by the persistent task for the following index to pull data from the leader index.

The persistent task can fetch the global checkpoint from the shard stats for each primary shard of the leader index.
Based on the global checkpoint of the primary shards of the following index, the persistent task can send several
calls to the internal api added in this commit to replicate changes from follow index to leader index in a batched manner.
This commit is contained in:
Martijn van Groningen 2017-10-16 07:38:03 +02:00 committed by GitHub
parent 3033aba67b
commit 1f495f59a1
6 changed files with 568 additions and 1 deletions

View File

@ -5,15 +5,22 @@
*/ */
package org.elasticsearch.xpack.ccr; package org.elasticsearch.xpack.ccr;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.plugins.ActionPlugin.ActionHandler;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_ENABLED_SETTING; import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_ENABLED_SETTING;
import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTING; import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTING;
@ -22,8 +29,9 @@ import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTIN
*/ */
public final class Ccr { public final class Ccr {
@SuppressWarnings("unused,FieldCanBeLocal")
private final boolean enabled; private final boolean enabled;
private final boolean tribeNode;
private final boolean tribeNodeClient;
/** /**
* Construct an instance of the CCR container with the specified settings. * Construct an instance of the CCR container with the specified settings.
@ -32,6 +40,16 @@ public final class Ccr {
*/ */
public Ccr(final Settings settings) { public Ccr(final Settings settings) {
this.enabled = CCR_ENABLED_SETTING.get(settings); this.enabled = CCR_ENABLED_SETTING.get(settings);
this.tribeNode = XPackPlugin.isTribeNode(settings);
this.tribeNodeClient = XPackPlugin.isTribeClientNode(settings);
}
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
if (!enabled || tribeNodeClient || tribeNode) {
return emptyList();
}
return Collections.singletonList(new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class));
} }
/** /**

View File

@ -0,0 +1,262 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.SingleShardOperationRequestBuilder;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class ShardChangesAction extends Action<ShardChangesAction.Request, ShardChangesAction.Response, ShardChangesAction.RequestBuilder> {
public static final ShardChangesAction INSTANCE = new ShardChangesAction();
public static final String NAME = "cluster:admin/xpack/ccr/shard_changes";
private ShardChangesAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends SingleShardRequest<Request> {
private long minSeqNo;
private long maxSeqNo;
private ShardId shardId;
public Request(ShardId shardId) {
super(shardId.getIndexName());
this.shardId = shardId;
}
Request() {
}
public ShardId getShard() {
return shardId;
}
public long getMinSeqNo() {
return minSeqNo;
}
public void setMinSeqNo(long minSeqNo) {
this.minSeqNo = minSeqNo;
}
public long getMaxSeqNo() {
return maxSeqNo;
}
public void setMaxSeqNo(long maxSeqNo) {
this.maxSeqNo = maxSeqNo;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (minSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
validationException = addValidationError("minSeqNo cannot be unassigned", validationException);
}
if (maxSeqNo < minSeqNo) {
validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be larger than maxSeqNo ["
+ maxSeqNo + "]", validationException);
}
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
minSeqNo = in.readZLong();
maxSeqNo = in.readZLong();
shardId = ShardId.readShardId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(minSeqNo);
out.writeZLong(maxSeqNo);
shardId.writeTo(out);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return minSeqNo == request.minSeqNo &&
maxSeqNo == request.maxSeqNo &&
Objects.equals(shardId, request.shardId);
}
@Override
public int hashCode() {
return Objects.hash(minSeqNo, maxSeqNo, shardId);
}
}
public static class Response extends ActionResponse {
private List<Translog.Operation> operations;
Response() {
}
Response(List<Translog.Operation> operations) {
this.operations = operations;
}
public List<Translog.Operation> getOperations() {
return operations;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
operations = Translog.readOperations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Translog.writeOperations(out, operations);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return Objects.equals(operations, response.operations);
}
@Override
public int hashCode() {
return Objects.hash(operations);
}
}
static class RequestBuilder extends SingleShardOperationRequestBuilder<Request, Response, RequestBuilder> {
RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
super(client, action, new Request());
}
}
public static class TransportAction extends TransportSingleShardAction<Request, Response> {
private final IndicesService indicesService;
@Inject
public TransportAction(Settings settings,
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService) {
super(settings, NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT); // is management right tp?
this.indicesService = indicesService;
}
@Override
protected Response shardOperation(Request request, ShardId shardId) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
IndexShard indexShard = indexService.getShard(request.getShard().id());
List<Translog.Operation> operations = getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo);
return new Response(operations);
}
@Override
protected boolean resolveIndex(Request request) {
return true;
}
@Override
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
return state.routingTable()
.index(request.concreteIndex())
.shard(request.request().getShard().id())
.activeInitializingShardsRandomIt();
}
@Override
protected Response newResponse() {
return new Response();
}
}
static List<Translog.Operation> getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo) throws IOException {
if (indexShard.state() != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
}
final List<Translog.Operation> operations = new ArrayList<>();
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(indexShard.indexSettings(), maxSeqNo, minSeqNo);
try (Translog.Snapshot snapshot = indexShard.getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo)) {
for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
if (op.seqNo() >= minSeqNo && op.seqNo() <= maxSeqNo) {
operations.add(op);
tracker.markSeqNoAsCompleted(op.seqNo());
}
}
}
if (tracker.getCheckpoint() == maxSeqNo) {
return operations;
} else {
String message = "Not all operations between min_seq_no [" + minSeqNo + "] and max_seq_no [" + maxSeqNo +
"] found, tracker checkpoint [" + tracker.getCheckpoint() + "]";
throw new IllegalStateException(message);
}
}
}

View File

@ -0,0 +1,136 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
/*
/*
* ELASTICSEARCH CONFIDENTIAL
* __________________
*
* [2017] Elasticsearch Incorporated. All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains
* the property of Elasticsearch Incorporated and its suppliers,
* if any. The intellectual and technical concepts contained
* herein are proprietary to Elasticsearch Incorporated
* and its suppliers and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
* Dissemination of this information or reproduction of this material
* is strictly forbidden unless prior written permission is obtained
* from Elasticsearch Incorporated.
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, transportClientRatio = 0)
public class ShardChangesIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder newSettings = Settings.builder();
newSettings.put(super.nodeSettings(nodeOrdinal));
newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
newSettings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
newSettings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
return newSettings.build();
}
@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
return Collections.singleton(TestSeedPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(XPackPlugin.class, CommonAnalysisPlugin.class);
}
@Override
protected boolean ignoreExternalCluster() {
return true;
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
// Something like this will emulate what the xdrc persistent task will do for pulling
// the changes:
public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {
client().admin().indices().prepareCreate("index")
.setSettings(Settings.builder().put("index.number_of_shards", 1))
.get();
client().prepareIndex("index", "doc", "1").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "2").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get();
ShardStats shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0];
long globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint();
assertThat(globalCheckPoint, equalTo(2L));
ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId());
request.setMinSeqNo(0L);
request.setMaxSeqNo(globalCheckPoint);
ShardChangesAction.Response response = client().execute(ShardChangesAction.INSTANCE, request).get();
assertThat(response.getOperations().size(), equalTo(3));
Translog.Index operation = (Translog.Index) response.getOperations().get(0);
assertThat(operation.seqNo(), equalTo(0L));
assertThat(operation.id(), equalTo("1"));
operation = (Translog.Index) response.getOperations().get(1);
assertThat(operation.seqNo(), equalTo(1L));
assertThat(operation.id(), equalTo("2"));
operation = (Translog.Index) response.getOperations().get(2);
assertThat(operation.seqNo(), equalTo(2L));
assertThat(operation.id(), equalTo("3"));
client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "4").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "5").setSource("{}", XContentType.JSON).get();
shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0];
globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint();
assertThat(globalCheckPoint, equalTo(5L));
request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId());
request.setMinSeqNo(3L);
request.setMaxSeqNo(globalCheckPoint);
response = client().execute(ShardChangesAction.INSTANCE, request).get();
assertThat(response.getOperations().size(), equalTo(3));
operation = (Translog.Index) response.getOperations().get(0);
assertThat(operation.seqNo(), equalTo(3L));
assertThat(operation.id(), equalTo("3"));
operation = (Translog.Index) response.getOperations().get(1);
assertThat(operation.seqNo(), equalTo(4L));
assertThat(operation.id(), equalTo("4"));
operation = (Translog.Index) response.getOperations().get(2);
assertThat(operation.seqNo(), equalTo(5L));
assertThat(operation.id(), equalTo("5"));
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.mockito.Mockito;
import java.util.List;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class ShardChangesActionTests extends ESSingleNodeTestCase {
public void testGetOperationsBetween() throws Exception {
IndexService indexService = createIndex("index", Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build());
final int numWrites = randomIntBetween(2, 2048);
for (int i = 0; i < numWrites; i++) {
client().prepareIndex("index", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
}
// A number of times, get operations within a range that exists:
int iters = randomIntBetween(8, 32);
IndexShard indexShard = indexService.getShard(0);
for (int iter = 0; iter < iters; iter++) {
int min = randomIntBetween(0, numWrites - 1);
int max = randomIntBetween(min, numWrites);
int index = 0;
List<Translog.Operation> operations = ShardChangesAction.getOperationsBetween(indexShard, min, max);
for (long expectedSeqNo = min; expectedSeqNo < max; expectedSeqNo++) {
Translog.Operation operation = operations.get(index++);
assertThat(operation.seqNo(), equalTo(expectedSeqNo));
}
}
// get operations for a range no operations exists:
Exception e = expectThrows(IllegalStateException.class,
() -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1));
assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + numWrites + "] and max_seq_no [" +
(numWrites + 1) +"] found, tracker checkpoint ["));
// get operations for a range some operations do not exist:
e = expectThrows(IllegalStateException.class,
() -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, numWrites + 10));
assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + (numWrites - 10) + "] and max_seq_no [" +
(numWrites + 10) +"] found, tracker checkpoint ["));
}
public void testGetOperationsBetweenWhenShardNotStarted() throws Exception {
IndexShard indexShard = Mockito.mock(IndexShard.class);
ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING);
Mockito.when(indexShard.routingEntry()).thenReturn(shardRouting);
expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1));
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.AbstractStreamableTestCase;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.nullValue;
public class ShardChangesRequestTests extends AbstractStreamableTestCase<ShardChangesAction.Request> {
@Override
protected ShardChangesAction.Request createTestInstance() {
ShardChangesAction.Request request = new ShardChangesAction.Request(new ShardId("_index", "_indexUUID", 0));
request.setMaxSeqNo(randomNonNegativeLong());
request.setMinSeqNo(randomNonNegativeLong());
return request;
}
@Override
protected ShardChangesAction.Request createBlankInstance() {
return new ShardChangesAction.Request();
}
public void testValidate() {
ShardChangesAction.Request request = new ShardChangesAction.Request(new ShardId("_index", "_indexUUID", 0));
request.setMinSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO);
assertThat(request.validate().getMessage(), containsString("minSeqNo cannot be unassigned"));
request.setMinSeqNo(4);
assertThat(request.validate().getMessage(), containsString("minSeqNo [4] cannot be larger than maxSeqNo [0]"));
request.setMaxSeqNo(8);
assertThat(request.validate(), nullValue());
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.AbstractStreamableTestCase;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.nullValue;
public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardChangesAction.Response> {
@Override
protected ShardChangesAction.Response createTestInstance() {
int numOps = randomInt(8);
List<Translog.Operation> operations = new ArrayList<>(numOps);
for (int i = 0; i < numOps; i++) {
operations.add(new Translog.NoOp(i, 0, "test"));
}
return new ShardChangesAction.Response(operations);
}
@Override
protected ShardChangesAction.Response createBlankInstance() {
return new ShardChangesAction.Response();
}
}