Do not renew sync-id if all shards are sealed (#29103)
Today a synced flush always issues a new sync-id, even if no shard has changed since the last seal. This causes active shards to have a different sync-id from offline shards, even though all copies were sealed and there have been no writes since then. This commit adjusts the synced flush so that it does not renew the sync-id if all active shards are already sealed with the same sync-id. Closes #27838
parent 8342ba9108
commit 2c1ef3d4c6
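In practice, sealing the same shard twice with no writes in between now yields the same sync-id. A condensed sketch of the observable behavior, borrowing the helpers used by the FlushIT test at the end of this diff (internal-cluster test context assumed, so this is not a standalone program):

    // First seal writes a sync-id into every active copy.
    ShardsSyncedFlushResult first = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
    // Second seal with no intervening writes: previously a fresh sync-id was
    // always generated; now the existing one is reported again, so copies
    // sealed earlier (for example on nodes currently offline) still match.
    ShardsSyncedFlushResult second = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
    assertThat(second.syncId(), equalTo(first.syncId()));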
CommitStats.java
@@ -76,6 +76,13 @@ public final class CommitStats implements Streamable, ToXContentFragment {
         return new Engine.CommitId(Base64.getDecoder().decode(id));
     }
 
+    /**
+     * The synced-flush id of the commit, if present.
+     */
+    public String syncId() {
+        return userData.get(InternalEngine.SYNC_COMMIT_ID);
+    }
+
     /**
      * Returns the number of documents in this commit
      */
SyncedFlushService.java
@@ -34,6 +34,8 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
@@ -65,6 +67,7 @@ import org.elasticsearch.transport.TransportService;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -216,11 +219,18 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
                 if (inflight != 0) {
                     actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary"));
                 } else {
-                    // 3. now send the sync request to all the shards
-                    String syncId = UUIDs.randomBase64UUID();
-                    sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
+                    // 3. now send the sync request to all the shards;
+                    final String sharedSyncId = sharedExistingSyncId(presyncResponses);
+                    if (sharedSyncId != null) {
+                        assert presyncResponses.values().stream().allMatch(r -> r.existingSyncId.equals(sharedSyncId)) :
+                            "Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + presyncResponses + "]";
+                        reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards, presyncResponses, actionListener);
+                    } else {
+                        String syncId = UUIDs.randomBase64UUID();
+                        sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
+                    }
                 }
             }
 
             @Override
             public void onFailure(Exception e) {
@@ -244,6 +254,33 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
         }
     }
 
+    private String sharedExistingSyncId(Map<String, PreSyncedFlushResponse> preSyncedFlushResponses) {
+        String existingSyncId = null;
+        for (PreSyncedFlushResponse resp : preSyncedFlushResponses.values()) {
+            if (Strings.isNullOrEmpty(resp.existingSyncId)) {
+                return null;
+            }
+            if (existingSyncId == null) {
+                existingSyncId = resp.existingSyncId;
+            }
+            if (existingSyncId.equals(resp.existingSyncId) == false) {
+                return null;
+            }
+        }
+        return existingSyncId;
+    }
+
+    private void reportSuccessWithExistingSyncId(ShardId shardId, String existingSyncId, List<ShardRouting> shards, int totalShards,
+                                                 Map<String, PreSyncedFlushResponse> preSyncResponses, ActionListener<ShardsSyncedFlushResult> listener) {
+        final Map<ShardRouting, ShardSyncedFlushResponse> results = new HashMap<>();
+        for (final ShardRouting shard : shards) {
+            if (preSyncResponses.containsKey(shard.currentNodeId())) {
+                results.put(shard, new ShardSyncedFlushResponse());
+            }
+        }
+        listener.onResponse(new ShardsSyncedFlushResult(shardId, existingSyncId, totalShards, results));
+    }
+
     final IndexShardRoutingTable getShardRoutingTable(ShardId shardId, ClusterState state) {
         final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.getIndexName());
         if (indexRoutingTable == null) {
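To make the reuse rule above concrete, here is a stand-alone mimic of sharedExistingSyncId that works on plain strings instead of PreSyncedFlushResponse objects; the class name and sample values are hypothetical, only the logic mirrors the method in this diff:

    import java.util.Arrays;
    import java.util.Collection;

    public class SharedSyncIdSketch {

        // Returns the common sync-id if every shard copy reports the same
        // non-empty id; otherwise returns null, signalling that a fresh
        // sync-id must be issued.
        static String sharedExistingSyncId(Collection<String> existingSyncIds) {
            String shared = null;
            for (String syncId : existingSyncIds) {
                if (syncId == null || syncId.isEmpty()) {
                    return null; // some copy was never sealed
                }
                if (shared == null) {
                    shared = syncId; // first id seen becomes the candidate
                } else if (shared.equals(syncId) == false) {
                    return null; // copies disagree
                }
            }
            return shared;
        }

        public static void main(String[] args) {
            System.out.println(sharedExistingSyncId(Arrays.asList("abc", "abc"))); // abc  -> reuse
            System.out.println(sharedExistingSyncId(Arrays.asList("abc", "xyz"))); // null -> renew
            System.out.println(sharedExistingSyncId(Arrays.asList("abc", "")));    // null -> renew
        }
    }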
@@ -438,7 +475,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
             final CommitStats commitStats = indexShard.commitStats();
             final Engine.CommitId commitId = commitStats.getRawCommitId();
             logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
-            return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs());
+            return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId());
         }
 
         private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
@@ -512,21 +549,15 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
 
         Engine.CommitId commitId;
         int numDocs;
+        @Nullable String existingSyncId = null;
 
         PreSyncedFlushResponse() {
         }
 
-        PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs) {
+        PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs, String existingSyncId) {
             this.commitId = commitId;
             this.numDocs = numDocs;
-        }
-
-        Engine.CommitId commitId() {
-            return commitId;
-        }
-
-        int numDocs() {
-            return numDocs;
+            this.existingSyncId = existingSyncId;
         }
 
         boolean includeNumDocs(Version version) {
@@ -537,6 +568,10 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
             }
         }
 
+        boolean includeExistingSyncId(Version version) {
+            return version.onOrAfter(Version.V_7_0_0_alpha1);
+        }
+
         @Override
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);
@@ -546,6 +581,9 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
             } else {
                 numDocs = UNKNOWN_NUM_DOCS;
             }
+            if (includeExistingSyncId(in.getVersion())) {
+                existingSyncId = in.readOptionalString();
+            }
         }
 
         @Override
@@ -555,6 +593,9 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventListener {
             if (includeNumDocs(out.getVersion())) {
                 out.writeInt(numDocs);
             }
+            if (includeExistingSyncId(out.getVersion())) {
+                out.writeOptionalString(existingSyncId);
+            }
         }
     }
 
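The readFrom/writeTo changes above follow the usual version-gated wire pattern: the new optional field is only serialized when the node on the other end of the connection is on V_7_0_0_alpha1 or later, so mixed-version clusters (for example during a rolling upgrade) never receive bytes they cannot parse. A minimal sketch of the read side, with the field and version names taken from the diff above and the surrounding method abbreviated:

    // Inside PreSyncedFlushResponse#readFrom (abbreviated):
    if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
        existingSyncId = in.readOptionalString(); // older senders omit the field entirely
    }
    // writeTo mirrors this with out.writeOptionalString(existingSyncId).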
FlushIT.java
@@ -20,6 +20,7 @@ package org.elasticsearch.indices.flush;
 
 import org.apache.lucene.index.Term;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
 import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
@@ -29,6 +30,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -59,6 +61,7 @@ import java.util.stream.Collectors;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 
 public class FlushIT extends ESIntegTestCase {
@@ -280,4 +283,50 @@ public class FlushIT extends ESIntegTestCase {
         assertThat(fullResult.totalShards(), equalTo(numberOfReplicas + 1));
         assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
     }
+
+    public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
+        internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
+        final int numberOfReplicas = internalCluster().numDataNodes() - 1;
+        assertAcked(
+            prepareCreate("test").setSettings(Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)).get()
+        );
+        ensureGreen();
+        final Index index = clusterService().state().metaData().index("test").getIndex();
+        final ShardId shardId = new ShardId(index, 0);
+        final int numDocs = between(1, 10);
+        for (int i = 0; i < numDocs; i++) {
+            index("test", "doc", Integer.toString(i));
+        }
+        final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1));
+        // Do not renew the synced flush.
+        final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1));
+        assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId()));
+        // Shards were updated, renew the synced flush.
+        final int moreDocs = between(1, 10);
+        for (int i = 0; i < moreDocs; i++) {
+            index("test", "doc", Integer.toString(i));
+        }
+        final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1));
+        assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId())));
+        // Manually remove or change the sync-id, then renew the synced flush.
+        IndexShard shard = internalCluster().getInstance(IndicesService.class, randomFrom(internalCluster().nodesInclude("test")))
+            .getShardOrNull(shardId);
+        if (randomBoolean()) {
+            // Change the existing sync-id of a single shard.
+            shard.syncFlush(UUIDs.randomBase64UUID(random()), shard.commitStats().getRawCommitId());
+            assertThat(shard.commitStats().syncId(), not(equalTo(thirdSeal.syncId())));
+        } else {
+            // Flush will create a new commit without a sync-id.
+            shard.flush(new FlushRequest(shardId.getIndexName()).force(true).waitIfOngoing(true));
+            assertThat(shard.commitStats().syncId(), nullValue());
+        }
+        final ShardsSyncedFlushResult fourthSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(fourthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
+        assertThat(fourthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
+    }
 }