From 2c1ef3d4c6043b6ddc02f4b35461ac766bf9bc3b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 16 Mar 2018 11:16:30 -0400 Subject: [PATCH] Do not renew sync-id if all shards are sealed (#29103) Today the synced-flush always issues a new sync-id even though all shards haven't been changed since the last seal. This causes active shards to have a different sync-id from offline shards even though all were sealed and there have been no writes since then. This commit adjusts synced-flush so that it does not renew the sync-id if all active shards are sealed with the same sync-id. Closes #27838 --- .../index/engine/CommitStats.java | 7 ++ .../indices/flush/SyncedFlushService.java | 67 +++++++++++++++---- .../elasticsearch/indices/flush/FlushIT.java | 49 ++++++++++++++ 3 files changed, 110 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java index b98c5a0db57..21025046b8c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java @@ -76,6 +76,13 @@ public final class CommitStats implements Streamable, ToXContentFragment { return new Engine.CommitId(Base64.getDecoder().decode(id)); } + /** + * The synced-flush id of the commit, if it exists. 
+ */ + public String syncId() { + return userData.get(InternalEngine.SYNC_COMMIT_ID); + } + /** * Returns the number of documents in the in this commit */ diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index 6a3618e6689..dedd577954d 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -34,6 +34,8 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -65,6 +67,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -216,9 +219,16 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL if (inflight != 0) { actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary")); } else { - // 3. now send the sync request to all the shards - String syncId = UUIDs.randomBase64UUID(); - sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener); + // 3. 
now send the sync request to all the shards; + final String sharedSyncId = sharedExistingSyncId(presyncResponses); + if (sharedSyncId != null) { + assert presyncResponses.values().stream().allMatch(r -> r.existingSyncId.equals(sharedSyncId)) : + "Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + presyncResponses + "]"; + reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards, presyncResponses, actionListener); + }else { + String syncId = UUIDs.randomBase64UUID(); + sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener); + } } } @@ -244,6 +254,33 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL } } + private String sharedExistingSyncId(Map preSyncedFlushResponses) { + String existingSyncId = null; + for (PreSyncedFlushResponse resp : preSyncedFlushResponses.values()) { + if (Strings.isNullOrEmpty(resp.existingSyncId)) { + return null; + } + if (existingSyncId == null) { + existingSyncId = resp.existingSyncId; + } + if (existingSyncId.equals(resp.existingSyncId) == false) { + return null; + } + } + return existingSyncId; + } + + private void reportSuccessWithExistingSyncId(ShardId shardId, String existingSyncId, List shards, int totalShards, + Map preSyncResponses, ActionListener listener) { + final Map results = new HashMap<>(); + for (final ShardRouting shard : shards) { + if (preSyncResponses.containsKey(shard.currentNodeId())) { + results.put(shard, new ShardSyncedFlushResponse()); + } + } + listener.onResponse(new ShardsSyncedFlushResult(shardId, existingSyncId, totalShards, results)); + } + final IndexShardRoutingTable getShardRoutingTable(ShardId shardId, ClusterState state) { final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.getIndexName()); if (indexRoutingTable == null) { @@ -438,7 +475,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL final CommitStats 
commitStats = indexShard.commitStats(); final Engine.CommitId commitId = commitStats.getRawCommitId(); logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs()); - return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs()); + return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId()); } private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) { @@ -512,21 +549,15 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL Engine.CommitId commitId; int numDocs; + @Nullable String existingSyncId = null; PreSyncedFlushResponse() { } - PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs) { + PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs, String existingSyncId) { this.commitId = commitId; this.numDocs = numDocs; - } - - Engine.CommitId commitId() { - return commitId; - } - - int numDocs() { - return numDocs; + this.existingSyncId = existingSyncId; } boolean includeNumDocs(Version version) { @@ -537,6 +568,10 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL } } + boolean includeExistingSyncId(Version version) { + return version.onOrAfter(Version.V_7_0_0_alpha1); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -546,6 +581,9 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL } else { numDocs = UNKNOWN_NUM_DOCS; } + if (includeExistingSyncId(in.getVersion())) { + existingSyncId = in.readOptionalString(); + } } @Override @@ -555,6 +593,9 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL if (includeNumDocs(out.getVersion())) { out.writeInt(numDocs); } + if (includeExistingSyncId(out.getVersion())) { + out.writeOptionalString(existingSyncId); + } } } diff --git a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java 
b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java index c56602f789e..934222f9e72 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java @@ -20,6 +20,7 @@ package org.elasticsearch.indices.flush; import org.apache.lucene.index.Term; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; @@ -29,6 +30,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -59,6 +61,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; public class FlushIT extends ESIntegTestCase { @@ -280,4 +283,50 @@ public class FlushIT extends ESIntegTestCase { assertThat(fullResult.totalShards(), equalTo(numberOfReplicas + 1)); assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1)); } + + public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(between(2, 3)); + final int numberOfReplicas = internalCluster().numDataNodes() - 1; + assertAcked( + prepareCreate("test").setSettings(Settings.builder() + 
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)).get() + ); + ensureGreen(); + final Index index = clusterService().state().metaData().index("test").getIndex(); + final ShardId shardId = new ShardId(index, 0); + final int numDocs = between(1, 10); + for (int i = 0; i < numDocs; i++) { + index("test", "doc", Integer.toString(i)); + } + final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId); + assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1)); + // Do not renew synced-flush + final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId); + assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1)); + assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId())); + // Shards were updated, renew synced flush. + final int moreDocs = between(1, 10); + for (int i = 0; i < moreDocs; i++) { + index("test", "doc", Integer.toString(i)); + } + final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId); + assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1)); + assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId()))); + // Manually remove or change sync-id, renew synced flush. + IndexShard shard = internalCluster().getInstance(IndicesService.class, randomFrom(internalCluster().nodesInclude("test"))) + .getShardOrNull(shardId); + if (randomBoolean()) { + // Change the existing sync-id of a single shard. 
+ shard.syncFlush(UUIDs.randomBase64UUID(random()), shard.commitStats().getRawCommitId()); + assertThat(shard.commitStats().syncId(), not(equalTo(thirdSeal.syncId()))); + } else { + // Flush will create a new commit without sync-id + shard.flush(new FlushRequest(shardId.getIndexName()).force(true).waitIfOngoing(true)); + assertThat(shard.commitStats().syncId(), nullValue()); + } + final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId); + assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1)); + assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId()))); + } }