From 14cd8a6794825d3903dd9b54a67261f4f26bf692 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 18 Jun 2016 18:53:47 +0200 Subject: [PATCH] Introduce Replication unit tests using real shards (#18930) This commit introduces unit testing infrastructure to test replication operations using real index shards. This infra is complementary to the full integration tests and unit testing of ReplicationOperation we already have. The new ESIndexLevelReplicationTestCase base makes it easier to test and simulate failure modes that require real shards but do not need the full blown stack of a complete node. The commit also adds a simple "nothing is wrong" test plus a test that checks we don't drop docs during the various stages of recovery. For now, only single doc indexing is supported but this can be easily extended in the future. --- .../replication/ReplicationOperation.java | 18 +- .../ESIndexLevelReplicationTestCase.java | 474 ++++++++++++++++++ .../IndexLevelReplicationTests.java | 31 ++ .../RecoveryDuringReplicationTests.java | 120 +++++ .../org/elasticsearch/test/ESTestCase.java | 7 +- 5 files changed, 640 insertions(+), 10 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java create mode 100644 core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java create mode 100644 core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 8442e705257..0d99c4bfcdf 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -78,11 +78,11 @@ public class ReplicationOperation< private final List
shardReplicaFailures = Collections.synchronizedList(new ArrayList<>()); - ReplicationOperation(Request request, Primary primary, - ActionListener listener, - boolean executeOnReplicas, boolean checkWriteConsistency, - Replicas replicas, - Supplier clusterStateSupplier, ESLogger logger, String opType) { + public ReplicationOperation(Request request, Primary primary, + ActionListener listener, + boolean executeOnReplicas, boolean checkWriteConsistency, + Replicas replicas, + Supplier clusterStateSupplier, ESLogger logger, String opType) { this.checkWriteConsistency = checkWriteConsistency; this.executeOnReplicas = executeOnReplicas; this.replicasProxy = replicas; @@ -94,7 +94,7 @@ public class ReplicationOperation< this.opType = opType; } - void execute() throws Exception { + public void execute() throws Exception { final String writeConsistencyFailure = checkWriteConsistency ? checkWriteConsistency() : null; final ShardRouting primaryRouting = primary.routingEntry(); final ShardId primaryId = primaryRouting.shardId(); @@ -294,7 +294,7 @@ public class ReplicationOperation< } - interface Primary< + public interface Primary< Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, PrimaryResultT extends PrimaryResult @@ -322,7 +322,7 @@ public class ReplicationOperation< } - interface Replicas> { + public interface Replicas> { /** * performs the the given request on the specified replica @@ -366,7 +366,7 @@ public class ReplicationOperation< } } - interface PrimaryResult> { + public interface PrimaryResult> { R replicaRequest(); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java new file mode 100644 index 00000000000..7ccf0a3e642 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -0,0 +1,474 @@ +/* + * Licensed to Elasticsearch under one 
or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.replication; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.index.TransportIndexAction; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import 
org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MapperTestUtils; +import org.elasticsearch.index.cache.IndexCache; +import org.elasticsearch.index.cache.query.DisabledQueryCache; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.internal.UidFieldMapper; +import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.similarity.SimilarityService; +import org.elasticsearch.index.store.DirectoryService; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveryFailedException; +import org.elasticsearch.indices.recovery.RecoverySourceHandler; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.indices.recovery.RecoveryTarget; +import org.elasticsearch.indices.recovery.RecoveryTargetService; +import org.elasticsearch.indices.recovery.StartRecoveryRequest; +import org.elasticsearch.test.DummyShardLock; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.threadpool.TestThreadPool; +import 
org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportResponse; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; + +public class ESIndexLevelReplicationTestCase extends ESTestCase { + + private ThreadPool threadPool; + final private Index index = new Index("test", "uuid"); + final private ShardId shardId = new ShardId(index, 0); + final private Map indexMapping = Collections.singletonMap("type", "{ \"type\": {} }"); + protected final static RecoveryTargetService.RecoveryListener recoveryListener = new RecoveryTargetService.RecoveryListener() { + @Override + public void onRecoveryDone(RecoveryState state) { + + } + + @Override + public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { + fail(ExceptionsHelper.detailedMessage(e)); + } + }; + + + @TestLogging("index.shard:TRACE,index.replication:TRACE,indices.recovery:TRACE") + public void testIndexingDuringFileRecovery() throws Exception { + try (ReplicationGroup shards = createGroup(randomInt(1))) { + shards.startAll(); + int docs = shards.indexDocs(randomInt(50)); + shards.flush(); + IndexShard replica = shards.addReplica(); + final CountDownLatch recoveryBlocked = new CountDownLatch(1); + final CountDownLatch 
releaseRecovery = new CountDownLatch(1); + final Future recoveryFuture = shards.asyncRecoverReplica(replica, new BiFunction() { + @Override + public RecoveryTarget apply(IndexShard indexShard, DiscoveryNode node) { + return new RecoveryTarget(indexShard, node, recoveryListener) { + @Override + public void renameAllTempFiles() throws IOException { + super.renameAllTempFiles(); + recoveryBlocked.countDown(); + try { + releaseRecovery.await(); + } catch (InterruptedException e) { + throw new IOException("terminated by interrupt", e); + } + } + }; + } + }); + + recoveryBlocked.await(); + docs += shards.indexDocs(randomInt(20)); + releaseRecovery.countDown(); + recoveryFuture.get(); + + shards.assertAllEqual(docs); + } + } + + + @Before + public void setup() { + threadPool = new TestThreadPool(getClass().getName()); + } + + @After + public void destroy() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + } + + private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { + final ShardId shardId = shardPath.getShardId(); + final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { + @Override + public Directory newDirectory() throws IOException { + return newFSDirectory(shardPath.resolveIndex()); + } + + @Override + public long throttleTimeInNanos() { + return 0; + } + }; + return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); + } + + protected ReplicationGroup createGroup(int replicas) throws IOException { + final Path homePath = createTempDir(); + Settings build = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder(index.getName()).settings(build).primaryTerm(0, 1).build(); + return new ReplicationGroup(metaData, homePath); + } + + private DiscoveryNode 
getDiscoveryNode(String id) { + return new DiscoveryNode(id, id, DummyTransportAddress.INSTANCE, Collections.emptyMap(), + Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT); + } + + private IndexShard newShard(boolean primary, DiscoveryNode node, IndexMetaData indexMetaData, Path homePath) throws IOException { + // add node name to settings for propper logging + final Settings nodeSettings = Settings.builder().put("node.name", node.getName()).build(); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings); + ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, node.getId(), primary, ShardRoutingState.INITIALIZING); + final Path path = Files.createDirectories(homePath.resolve(node.getId())); + final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(path); + ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); + Store store = createStore(indexSettings, shardPath); + IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null); + MapperService mapperService = MapperTestUtils.newMapperService(homePath, indexSettings.getSettings()); + for (Map.Entry type : indexMapping.entrySet()) { + mapperService.merge(type.getKey(), new CompressedXContent(type.getValue()), MapperService.MergeReason.MAPPING_RECOVERY, true); + } + SimilarityService similarityService = new SimilarityService(indexSettings, Collections.emptyMap()); + final IndexEventListener indexEventListener = new IndexEventListener() { + }; + final Engine.Warmer warmer = searcher -> { + }; + return new IndexShard(shardRouting, indexSettings, shardPath, store, indexCache, mapperService, similarityService, null, null, + indexEventListener, null, threadPool, BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), + Collections.emptyList()); + } + + + class ReplicationGroup implements AutoCloseable, Iterable { + private final IndexShard primary; 
+ private final List replicas; + private final IndexMetaData indexMetaData; + private final Path homePath; + private final AtomicInteger replicaId = new AtomicInteger(); + private final AtomicInteger docId = new AtomicInteger(); + boolean closed = false; + + ReplicationGroup(final IndexMetaData indexMetaData, Path homePath) throws IOException { + primary = newShard(true, getDiscoveryNode("s0"), indexMetaData, homePath); + replicas = new ArrayList<>(); + this.indexMetaData = indexMetaData; + this.homePath = homePath; + for (int i = 0; i < indexMetaData.getNumberOfReplicas(); i++) { + addReplica(); + } + + } + + public int indexDocs(final int numOfDoc) throws Exception { + for (int doc = 0; doc < numOfDoc; doc++) { + final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", Integer.toString(docId.incrementAndGet())) + .source("{}"); + final IndexResponse response = index(indexRequest); + assertThat(response.isCreated(), equalTo(true)); + } + return numOfDoc; + } + + public IndexResponse index(IndexRequest indexRequest) throws Exception { + PlainActionFuture listener = new PlainActionFuture<>(); + IndexingOp op = new IndexingOp(indexRequest, listener, this); + op.execute(); + return listener.get().finalResponse; + } + + public synchronized void startAll() throws IOException { + final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId()); + primary.markAsRecovering("store", new RecoveryState(primary.shardId(), true, RecoveryState.Type.STORE, pNode, pNode)); + primary.recoverFromStore(); + primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry())); + for (IndexShard replicaShard : replicas) { + recoverReplica(replicaShard, (replica, sourceNode) -> new RecoveryTarget(replica, sourceNode, recoveryListener)); + } + } + + public synchronized IndexShard addReplica() throws IOException { + final IndexShard replica = newShard(false, getDiscoveryNode("s" + replicaId.incrementAndGet()), indexMetaData, 
homePath); + replicas.add(replica); + return replica; + } + + public void recoverReplica(IndexShard replica, BiFunction targetSupplier) + throws IOException { + final DiscoveryNode pNode; + synchronized (this) { + pNode = getDiscoveryNode(primary.routingEntry().currentNodeId()); + } + final DiscoveryNode rNode = getDiscoveryNode(replica.routingEntry().currentNodeId()); + replica.markAsRecovering("remote", new RecoveryState(replica.shardId(), false, RecoveryState.Type.REPLICA, pNode, rNode)); + replica.prepareForIndexRecovery(); + RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode); + StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), pNode, rNode, + replica.store().getMetadataOrEmpty(), RecoveryState.Type.REPLICA, 0); + RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, (int) ByteSizeUnit.MB.toKB(1), + logger); + recovery.recoverToTarget(); + recoveryTarget.markAsDone(); + replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry())); + } + + public Future asyncRecoverReplica(IndexShard replica, BiFunction targetSupplier) + throws IOException { + FutureTask task = new FutureTask<>(() -> { + recoverReplica(replica, targetSupplier); + return null; + }); + threadPool.generic().execute(task); + return task; + } + + public synchronized void assertAllEqual(int expectedCount) throws IOException { + Set primaryIds = getShardDocUIDs(primary); + assertThat(primaryIds.size(), equalTo(expectedCount)); + for (IndexShard replica : replicas) { + Set replicaIds = getShardDocUIDs(replica); + Set temp = new HashSet<>(primaryIds); + temp.removeAll(replicaIds); + assertThat(replica.routingEntry() + " is missing docs", temp, empty()); + temp = new HashSet<>(replicaIds); + temp.removeAll(primaryIds); + assertThat(replica.routingEntry() + " has extra docs", temp, empty()); + } + } + + private Set getShardDocUIDs(final IndexShard shard) throws IOException { + 
shard.refresh("get_uids"); + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + Set ids = new HashSet<>(); + for (LeafReaderContext leafContext : searcher.reader().leaves()) { + LeafReader reader = leafContext.reader(); + Bits liveDocs = reader.getLiveDocs(); + for (int i = 0; i < reader.maxDoc(); i++) { + if (liveDocs == null || liveDocs.get(i)) { + Document uuid = reader.document(i, Collections.singleton(UidFieldMapper.NAME)); + ids.add(Uid.createUid(uuid.get(UidFieldMapper.NAME))); + } + } + } + return ids; + } + } + + public synchronized void refresh(String source) { + for (IndexShard shard : this) { + shard.refresh(source); + } + } + + public synchronized void flush() { + final FlushRequest request = new FlushRequest(); + for (IndexShard shard : this) { + shard.flush(request); + } + } + + public synchronized List shardRoutings() { + return StreamSupport.stream(this.spliterator(), false).map(IndexShard::routingEntry).collect(Collectors.toList()); + } + + @Override + public synchronized void close() throws Exception { + if (closed == false) { + closed = true; + for (IndexShard shard : this) { + shard.close("eol", false); + IOUtils.close(shard.store()); + } + } else { + throw new AlreadyClosedException("too bad"); + } + } + + @Override + public Iterator iterator() { + return Iterators.concat(replicas.iterator(), Collections.singleton(primary).iterator()); + } + } + + class IndexingOp extends ReplicationOperation { + + private final ReplicationGroup replicationGroup; + + public IndexingOp(IndexRequest request, ActionListener listener, ReplicationGroup replicationGroup) { + super(request, new PrimaryRef(replicationGroup), listener, true, false, new ReplicasRef(replicationGroup), + () -> null, logger, "indexing"); + this.replicationGroup = replicationGroup; + request.process(null, true, request.index()); + } + + @Override + protected List getShards(ShardId shardId, ClusterState state) { + return replicationGroup.shardRoutings(); + } + + } + + private 
static class PrimaryRef implements ReplicationOperation.Primary { + final IndexShard primary; + + private PrimaryRef(ReplicationGroup replicationGroup) { + this.primary = replicationGroup.primary; + } + + @Override + public ShardRouting routingEntry() { + return primary.routingEntry(); + } + + @Override + public void failShard(String message, Throwable throwable) { + throw new UnsupportedOperationException(); + } + + @Override + public IndexingResult perform(IndexRequest request) throws Exception { + TransportWriteAction.WriteResult result = TransportIndexAction.executeIndexRequestOnPrimary(request, primary, + null); + request.primaryTerm(primary.getPrimaryTerm()); + return new IndexingResult(request, result.getResponse()); + } + + } + + private static class ReplicasRef implements ReplicationOperation.Replicas { + private final ReplicationGroup replicationGroup; + + private ReplicasRef(ReplicationGroup replicationGroup) { + this.replicationGroup = replicationGroup; + } + + @Override + public void performOn(ShardRouting replicaRouting, IndexRequest request, ActionListener listener) { + try { + IndexShard replica = replicationGroup.replicas.stream() + .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get(); + TransportIndexAction.executeIndexRequestOnReplica(request, replica); + listener.onResponse(TransportResponse.Empty.INSTANCE); + } catch (Throwable t) { + listener.onFailure(t); + } + } + + @Override + public void failShard(ShardRouting replica, ShardRouting primary, String message, Throwable throwable, Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + throw new UnsupportedOperationException(); + } + } + + + private static class IndexingResult implements ReplicationOperation.PrimaryResult { + final IndexRequest replicaRequest; + final IndexResponse finalResponse; + + public IndexingResult(IndexRequest replicaRequest, IndexResponse finalResponse) { + this.replicaRequest = replicaRequest; + this.finalResponse = 
finalResponse; + } + + @Override + public IndexRequest replicaRequest() { + return replicaRequest; + } + + @Override + public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) { + finalResponse.setShardInfo(shardInfo); + } + + public void respond(ActionListener listener) { + listener.onResponse(finalResponse); + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java new file mode 100644 index 00000000000..c6d7878406a --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.index.replication; + +public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase { + + public void testSimpleReplication() throws Exception { + try (ReplicationGroup shards = createGroup(randomInt(2))) { + shards.startAll(); + final int docCount = randomInt(50); + shards.indexDocs(docCount); + shards.assertAllEqual(docCount); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java new file mode 100644 index 00000000000..815884edf51 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -0,0 +1,120 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.index.replication; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.indices.recovery.RecoveryTarget; +import org.elasticsearch.indices.recovery.RecoveryTargetService; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; + +public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase { + + public void testIndexingDuringFileRecovery() throws Exception { + try (ReplicationGroup shards = createGroup(randomInt(1))) { + shards.startAll(); + int docs = shards.indexDocs(randomInt(50)); + shards.flush(); + IndexShard replica = shards.addReplica(); + final CountDownLatch recoveryBlocked = new CountDownLatch(1); + final CountDownLatch releaseRecovery = new CountDownLatch(1); + final RecoveryState.Stage blockOnStage = randomFrom(BlockingTarget.SUPPORTED_STAGES); + final Future recoveryFuture = shards.asyncRecoverReplica(replica, (indexShard, node) -> + new BlockingTarget(blockOnStage, recoveryBlocked, releaseRecovery, indexShard, node, recoveryListener, logger)); + + recoveryBlocked.await(); + docs += shards.indexDocs(randomInt(20)); + releaseRecovery.countDown(); + recoveryFuture.get(); + + shards.assertAllEqual(docs); + } + } + + private static class BlockingTarget extends RecoveryTarget { + private final CountDownLatch recoveryBlocked; + private final CountDownLatch releaseRecovery; + private final RecoveryState.Stage stageToBlock; + public static final EnumSet SUPPORTED_STAGES = + EnumSet.of(RecoveryState.Stage.INDEX, RecoveryState.Stage.TRANSLOG, RecoveryState.Stage.FINALIZE); + private final ESLogger logger; + + 
BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery, IndexShard shard, + DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener, ESLogger logger) { + super(shard, sourceNode, listener); + this.recoveryBlocked = recoveryBlocked; + this.releaseRecovery = releaseRecovery; + this.stageToBlock = stageToBlock; + this.logger = logger; + if (SUPPORTED_STAGES.contains(stageToBlock) == false) { + throw new UnsupportedOperationException(stageToBlock + " is not supported"); + } + } + + private boolean hasBlocked() { + return recoveryBlocked.getCount() == 0; + } + + private void blockIfNeeded(RecoveryState.Stage currentStage) { + if (currentStage == stageToBlock) { + logger.info("--> blocking recovery on stage [{}]", currentStage); + recoveryBlocked.countDown(); + try { + releaseRecovery.await(); + logger.info("--> recovery continues from stage [{}]", currentStage); + } catch (InterruptedException e) { + throw new RuntimeException("blockage released"); + } + } + } + + @Override + public void indexTranslogOperations(List operations, int totalTranslogOps) { + if (hasBlocked() == false) { + blockIfNeeded(RecoveryState.Stage.TRANSLOG); + } + super.indexTranslogOperations(operations, totalTranslogOps); + } + + @Override + public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException { + blockIfNeeded(RecoveryState.Stage.INDEX); + super.cleanFiles(totalTranslogOps, sourceMetaData); + } + + @Override + public void finalizeRecovery() { + if (hasBlocked() == false) { + // it maybe that not ops have been transferred, block now + blockIfNeeded(RecoveryState.Stage.TRANSLOG); + } + blockIfNeeded(RecoveryState.Stage.FINALIZE); + super.finalizeRecovery(); + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index e0ae77d3f56..172c99592df 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -36,7 +36,6 @@ import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TimeUnits; import org.elasticsearch.Version; import org.elasticsearch.bootstrap.BootstrapForTesting; -import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; @@ -47,6 +46,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; @@ -334,6 +334,11 @@ public abstract class ESTestCase extends LuceneTestCase { return RandomPicks.randomFrom(random(), list); } + /** Pick a random object from the given collection. */ + public static T randomFrom(Collection collection) { + return RandomPicks.randomFrom(random(), collection); + } + public static String randomAsciiOfLengthBetween(int minCodeUnits, int maxCodeUnits) { return RandomizedTest.randomAsciiOfLengthBetween(minCodeUnits, maxCodeUnits); }