diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index a69a0e800b5..57182ba48c4 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -1348,11 +1348,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L // note that if there was no cluster state update between start of the engine of this shard and the call to // initializeWithPrimaryContext, we might still have missed a cluster state update. This is best effort. runAfter.run(); - - if (indexSettings.isSoftDeleteEnabled()) { - addPeerRecoveryRetentionLeaseForSolePrimary(); - } - + addPeerRecoveryRetentionLeaseForSolePrimary(); assert invariant(); } diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 37682c2ce5b..3734135d9c1 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -20,12 +20,14 @@ package org.elasticsearch.recovery; import com.carrotsearch.hppc.IntHashSet; +import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.procedures.IntProcedure; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.util.English; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; @@ -45,6 +47,9 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; @@ -77,9 +82,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; @@ -88,6 +96,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitC import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.startsWith; @@ -103,6 +113,7 @@ public class RelocationIT extends ESIntegTestCase { @Override protected void beforeIndexDeletion() throws Exception { super.beforeIndexDeletion(); + assertActiveCopiesEstablishedPeerRecoveryRetentionLeases(); internalCluster().assertSeqNos(); internalCluster().assertSameDocIdsOnShards(); } @@ -297,7 +308,7 @@ public class RelocationIT extends ESIntegTestCase { final Semaphore postRecoveryShards = new Semaphore(0); final IndexEventListener listener = new IndexEventListener() { @Override - public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, + public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) { if (currentState == IndexShardState.POST_RECOVERY) { postRecoveryShards.release(); @@ -395,7 +406,7 @@ public class RelocationIT extends ESIntegTestCase { MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, p_node); for (DiscoveryNode node : clusterService.state().nodes()) { if (!node.equals(clusterService.localNode())) { - mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, node.getName()), + mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, node.getName()), new RecoveryCorruption(corruptionCount)); } } @@ -603,6 +614,49 @@ public class RelocationIT extends ESIntegTestCase { assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L)); } + public void testRelocationEstablishedPeerRecoveryRetentionLeases() throws Exception { + int halfNodes = randomIntBetween(1, 3); + String indexName = "test"; + Settings[] nodeSettings = Stream.concat( + Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes), + Stream.generate(() -> Settings.builder().put("node.attr.color", "red").build()).limit(halfNodes)).toArray(Settings[]::new); + List nodes = internalCluster().startNodes(nodeSettings); + String[] blueNodes = nodes.subList(0, halfNodes).toArray(new String[0]); + String[] redNodes = nodes.subList(0, halfNodes).toArray(new String[0]); + ensureStableCluster(halfNodes * 2); + assertAcked( + client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, halfNodes - 1)) + .put("index.routing.allocation.include.color", "blue"))); + ensureGreen("test"); + assertBusy(() -> assertAllShardsOnNodes(indexName, blueNodes)); + assertActiveCopiesEstablishedPeerRecoveryRetentionLeases(); + client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put("index.routing.allocation.include.color", "red")).get(); + assertBusy(() -> assertAllShardsOnNodes(indexName, redNodes)); + ensureGreen("test"); + assertActiveCopiesEstablishedPeerRecoveryRetentionLeases(); + } + + private void assertActiveCopiesEstablishedPeerRecoveryRetentionLeases() throws Exception { + assertBusy(() -> { + for (ObjectCursor it : client().admin().cluster().prepareState().get().getState().metaData().indices().keys()) { + Map> byShardId = Stream.of(client().admin().indices().prepareStats(it.value).get().getShards()) + .collect(Collectors.groupingBy(l -> l.getShardRouting().shardId())); + for (List shardStats : byShardId.values()) { + Set expectedLeaseIds = shardStats.stream() + .map(s -> ReplicationTracker.getPeerRecoveryRetentionLeaseId(s.getShardRouting())).collect(Collectors.toSet()); + for (ShardStats shardStat : shardStats) { + Set actualLeaseIds = shardStat.getRetentionLeaseStats().retentionLeases().leases().stream() + .map(RetentionLease::id).collect(Collectors.toSet()); + assertThat(expectedLeaseIds, everyItem(in(actualLeaseIds))); + } + } + } + }); + } + class RecoveryCorruption implements StubbableTransport.SendRequestBehavior { private final CountDownLatch corruptionCount; @@ -619,7 +673,7 @@ public class RelocationIT extends ESIntegTestCase { if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) { // corrupting the segments_N files in order to make sure future recovery re-send files logger.debug("corrupting [{}] to {}. file name: [{}]", action, connection.getNode(), chunkRequest.name()); - assert chunkRequest.content().toBytesRef().bytes == + assert chunkRequest.content().toBytesRef().bytes == chunkRequest.content().toBytesRef().bytes : "no internal reference!!"; byte[] array = chunkRequest.content().toBytesRef().bytes; array[0] = (byte) ~array[0]; // flip one byte in the content