Ensure relocating shards establish peer recovery retention leases (#50486)

We forgot to establish peer recovery retention leases for relocating primaries
without soft-deletes.

Relates #50351
Nhat Nguyen 2019-12-26 08:58:34 -05:00
parent 261566154b
commit e7c15a5c6e
2 changed files with 58 additions and 8 deletions
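
For context: a peer recovery retention lease is the marker a primary holds for each tracked shard copy so that the operation history that copy may need for a future peer recovery is retained. Each lease is keyed by the node the copy is allocated to. Below is a minimal sketch of that id scheme, modeled on the ReplicationTracker.getPeerRecoveryRetentionLeaseId helper the new test calls; the prefix literal is an assumption shown only for illustration, not a quote of the source:

    // Sketch: one peer recovery retention lease per tracked shard copy,
    // keyed by the id of the node holding that copy. The prefix is an
    // assumed constant, included only to illustrate the naming convention.
    static String peerRecoveryRetentionLeaseId(String nodeId) {
        return "peer_recovery/" + nodeId;
    }

Before this change, a relocating primary only established these leases when soft deletes were enabled; the ReplicationTracker fix below makes the call unconditional, and the new RelocationIT test verifies the leases before and after relocation.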

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

@@ -1348,11 +1348,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements LongConsumer
         // note that if there was no cluster state update between start of the engine of this shard and the call to
         // initializeWithPrimaryContext, we might still have missed a cluster state update. This is best effort.
         runAfter.run();
-
-        if (indexSettings.isSoftDeleteEnabled()) {
-            addPeerRecoveryRetentionLeaseForSolePrimary();
-        }
-
+        addPeerRecoveryRetentionLeaseForSolePrimary();
         assert invariant();
     }
 
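Note that the guard is removed outright rather than inverted: addPeerRecoveryRetentionLeaseForSolePrimary() now runs on every activation with a primary context, so a primary that has just relocated onto a new node establishes its own peer recovery retention lease even when the index has soft deletes disabled.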

server/src/test/java/org/elasticsearch/recovery/RelocationIT.java

@@ -20,12 +20,14 @@
 package org.elasticsearch.recovery;
 
 import com.carrotsearch.hppc.IntHashSet;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.procedures.IntProcedure;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.util.English;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchResponse;
@@ -45,6 +47,9 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.seqno.ReplicationTracker;
+import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
@@ -77,9 +82,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@@ -88,6 +96,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.startsWith;
@@ -103,6 +113,7 @@ public class RelocationIT extends ESIntegTestCase {
     @Override
     protected void beforeIndexDeletion() throws Exception {
         super.beforeIndexDeletion();
+        assertActiveCopiesEstablishedPeerRecoveryRetentionLeases();
         internalCluster().assertSeqNos();
         internalCluster().assertSameDocIdsOnShards();
     }
@@ -297,7 +308,7 @@ public class RelocationIT extends ESIntegTestCase {
         final Semaphore postRecoveryShards = new Semaphore(0);
         final IndexEventListener listener = new IndexEventListener() {
             @Override
-            public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState,
+            public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState,
                                                IndexShardState currentState, @Nullable String reason) {
                 if (currentState == IndexShardState.POST_RECOVERY) {
                     postRecoveryShards.release();
@@ -395,7 +406,7 @@ public class RelocationIT extends ESIntegTestCase {
         MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, p_node);
         for (DiscoveryNode node : clusterService.state().nodes()) {
             if (!node.equals(clusterService.localNode())) {
-                mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, node.getName()),
+                mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, node.getName()),
                     new RecoveryCorruption(corruptionCount));
             }
         }
@@ -603,6 +614,49 @@ public class RelocationIT extends ESIntegTestCase {
         assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L));
     }
 
+    public void testRelocationEstablishedPeerRecoveryRetentionLeases() throws Exception {
+        int halfNodes = randomIntBetween(1, 3);
+        String indexName = "test";
+        Settings[] nodeSettings = Stream.concat(
+            Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes),
+            Stream.generate(() -> Settings.builder().put("node.attr.color", "red").build()).limit(halfNodes)).toArray(Settings[]::new);
+        List<String> nodes = internalCluster().startNodes(nodeSettings);
+        String[] blueNodes = nodes.subList(0, halfNodes).toArray(new String[0]);
+        String[] redNodes = nodes.subList(halfNodes, halfNodes * 2).toArray(new String[0]);
+        ensureStableCluster(halfNodes * 2);
+        assertAcked(
+            client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, halfNodes - 1))
+                .put("index.routing.allocation.include.color", "blue")));
+        ensureGreen("test");
+        assertBusy(() -> assertAllShardsOnNodes(indexName, blueNodes));
+        assertActiveCopiesEstablishedPeerRecoveryRetentionLeases();
+        client().admin().indices().prepareUpdateSettings(indexName)
+            .setSettings(Settings.builder().put("index.routing.allocation.include.color", "red")).get();
+        assertBusy(() -> assertAllShardsOnNodes(indexName, redNodes));
+        ensureGreen("test");
+        assertActiveCopiesEstablishedPeerRecoveryRetentionLeases();
+    }
+
+    private void assertActiveCopiesEstablishedPeerRecoveryRetentionLeases() throws Exception {
+        assertBusy(() -> {
+            for (ObjectCursor<String> it : client().admin().cluster().prepareState().get().getState().metaData().indices().keys()) {
+                Map<ShardId, List<ShardStats>> byShardId = Stream.of(client().admin().indices().prepareStats(it.value).get().getShards())
+                    .collect(Collectors.groupingBy(l -> l.getShardRouting().shardId()));
+                for (List<ShardStats> shardStats : byShardId.values()) {
+                    Set<String> expectedLeaseIds = shardStats.stream()
+                        .map(s -> ReplicationTracker.getPeerRecoveryRetentionLeaseId(s.getShardRouting())).collect(Collectors.toSet());
+                    for (ShardStats shardStat : shardStats) {
+                        Set<String> actualLeaseIds = shardStat.getRetentionLeaseStats().retentionLeases().leases().stream()
+                            .map(RetentionLease::id).collect(Collectors.toSet());
+                        assertThat(expectedLeaseIds, everyItem(in(actualLeaseIds)));
+                    }
+                }
+            }
+        });
+    }
+
     class RecoveryCorruption implements StubbableTransport.SendRequestBehavior {
 
         private final CountDownLatch corruptionCount;
@@ -619,7 +673,7 @@ public class RelocationIT extends ESIntegTestCase {
                 if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) {
                     // corrupt the segments_N files to make sure a future recovery re-sends the files
                     logger.debug("corrupting [{}] to {}. file name: [{}]", action, connection.getNode(), chunkRequest.name());
-                    assert chunkRequest.content().toBytesRef().bytes ==
+                    assert chunkRequest.content().toBytesRef().bytes ==
                         chunkRequest.content().toBytesRef().bytes : "no internal reference!!";
                     byte[] array = chunkRequest.content().toBytesRef().bytes;
                     array[0] = (byte) ~array[0]; // flip one byte in the content
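
The new test drives a full relocation of all copies from the blue nodes to the red nodes via allocation filtering and asserts, both before and after the move, that every active copy of each shard carries the peer recovery retention leases for all active copies, including itself. The soft-deletes setting is randomized at index creation because the missing leases only affected primaries without soft deletes. The helper wraps its checks in assertBusy since leases are established and synced asynchronously, so a copy may briefly lack a lease right after relocating.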