diff --git a/plugin/src/test/java/org/elasticsearch/xdcr/ShardChangesTests.java b/plugin/src/test/java/org/elasticsearch/xdcr/ShardChangesTests.java new file mode 100644 index 00000000000..c64a0951598 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xdcr/ShardChangesTests.java @@ -0,0 +1,99 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xdcr; + +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.XPackSettings; +import org.elasticsearch.xpack.XPackSingleNodeTestCase; +import org.elasticsearch.xpack.xdcr.action.ShardChangesAction; + +import java.util.Collection; + +import static org.hamcrest.Matchers.equalTo; + +public class ShardChangesTests extends XPackSingleNodeTestCase { + + @Override + protected Settings nodeSettings() { + Settings.Builder newSettings = Settings.builder(); + newSettings.put(super.nodeSettings()); + newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); + newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + newSettings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); + newSettings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); + return newSettings.build(); + } + + @Override + protected Collection> getPlugins() { + return pluginList(XPackPlugin.class, ReindexPlugin.class); + } + + public void testReadOperations() throws Exception { + client().admin().indices().prepareCreate("index") + .setSettings(Settings.builder().put("index.number_of_shards", 1)) + .setSettings(Settings.builder().put("index.number_of_replicas", 0)) + .get(); + + client().prepareIndex("index", "doc", "1").setSource("{}", XContentType.JSON).get(); + client().prepareIndex("index", "doc", "2").setSource("{}", XContentType.JSON).get(); + client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get(); + + ShardStats shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0]; + long globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint(); + assertThat(globalCheckPoint, equalTo(2L)); + + ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId()); + request.setMinSeqNo(0L); + request.setMaxSeqNo(globalCheckPoint); + ShardChangesAction.Response response = client().execute(ShardChangesAction.INSTANCE, request).get(); + assertThat(response.getOperations().size(), equalTo(3)); + Translog.Index operation = (Translog.Index) response.getOperations().get(0); + assertThat(operation.seqNo(), equalTo(0L)); + assertThat(operation.id(), equalTo("1")); + + operation = (Translog.Index) response.getOperations().get(1); + assertThat(operation.seqNo(), equalTo(1L)); + assertThat(operation.id(), equalTo("2")); + + operation = (Translog.Index) response.getOperations().get(2); + assertThat(operation.seqNo(), equalTo(2L)); + assertThat(operation.id(), equalTo("3")); + + client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get(); + client().prepareIndex("index", "doc", "4").setSource("{}", XContentType.JSON).get(); + client().prepareIndex("index", "doc", "5").setSource("{}", XContentType.JSON).get(); + + shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0]; + globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint(); + assertThat(globalCheckPoint, equalTo(5L)); + + request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId()); + request.setMinSeqNo(3L); + request.setMaxSeqNo(globalCheckPoint); + response = client().execute(ShardChangesAction.INSTANCE, request).get(); + assertThat(response.getOperations().size(), equalTo(3)); + operation = (Translog.Index) response.getOperations().get(0); + assertThat(operation.seqNo(), equalTo(3L)); + assertThat(operation.id(), equalTo("3")); + + operation = (Translog.Index) response.getOperations().get(1); + assertThat(operation.seqNo(), equalTo(4L)); + assertThat(operation.id(), equalTo("4")); + + operation = (Translog.Index) response.getOperations().get(2); + assertThat(operation.seqNo(), equalTo(5L)); + assertThat(operation.id(), equalTo("5")); + } + +}