Mirror of https://github.com/honeymoose/OpenSearch.git, synced 2025-02-23 05:15:04 +00:00
Flush instead of synced-flush inactive shards (#51365)
If all nodes are on 7.6 or later, we prefer to perform a normal flush instead of a synced flush when a shard becomes inactive.

Backport of #49126
parent 072203cba8
commit 1ca5dd13de
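Before the diff, a quick orientation: the sketch below is a minimal, self-contained model of the decision this commit introduces, not Elasticsearch code. FlushDecisionSketch, onOrAfter760, and chooseFlush are hypothetical names used only for illustration; the real check compares DiscoveryNodes.getMinNodeVersion() against Version.V_7_6_0 inside SyncedFlushService.onShardInactive, shown in the first file below.

public class FlushDecisionSketch {

    // Stand-in for Version.onOrAfter(Version.V_7_6_0); only what the sketch needs.
    static boolean onOrAfter760(int major, int minor) {
        return major > 7 || (major == 7 && minor >= 6);
    }

    // Mirrors the shape of the new onShardInactive logic: gate on the minimum
    // node version in the cluster, then fall back to the legacy synced flush.
    static String chooseFlush(int minMajor, int minMinor, boolean isPrimary) {
        if (onOrAfter760(minMajor, minMinor)) {
            return "normal flush";   // safe once no pre-7.6 node remains in the cluster
        } else if (isPrimary) {
            return "synced flush";   // legacy path, triggered from the primary only
        }
        return "no-op";              // replicas are sealed via the primary's synced flush
    }

    public static void main(String[] args) {
        System.out.println(chooseFlush(7, 6, true));  // normal flush
        System.out.println(chooseFlush(7, 5, true));  // synced flush
        System.out.println(chooseFlush(7, 5, false)); // no-op
    }
}

Because the gate is the minimum node version across the whole cluster, a mixed-version cluster keeps the legacy synced-flush behavior until its last pre-7.6 node is gone.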
SyncedFlushService.java
@@ -42,6 +42,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.logging.DeprecationLogger;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.index.Index;
@@ -51,6 +52,7 @@ import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardNotFoundException;
 import org.elasticsearch.indices.IndexClosedException;
@@ -72,7 +74,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
-import java.util.stream.StreamSupport;

 public class SyncedFlushService implements IndexEventListener {

@@ -111,8 +112,12 @@ public class SyncedFlushService implements IndexEventListener {

     @Override
     public void onShardInactive(final IndexShard indexShard) {
-        // we only want to call sync flush once, so only trigger it when we are on a primary
-        if (indexShard.routingEntry().primary()) {
+        // A normal flush has the same effect as a synced flush if all nodes are on 7.6 or later.
+        final boolean preferNormalFlush = clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_7_6_0);
+        if (preferNormalFlush) {
+            performNormalFlushOnInactive(indexShard);
+        } else if (indexShard.routingEntry().primary()) {
+            // we only want to call sync flush once, so only trigger it when we are on a primary
             attemptSyncedFlush(indexShard.shardId(), new ActionListener<ShardsSyncedFlushResult>() {
                 @Override
                 public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
@@ -128,6 +133,23 @@ public class SyncedFlushService implements IndexEventListener {
         }
     }

+    private void performNormalFlushOnInactive(IndexShard shard) {
+        logger.debug("flushing shard {} on inactive", shard.routingEntry());
+        shard.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                if (shard.state() != IndexShardState.CLOSED) {
+                    logger.warn(new ParameterizedMessage("failed to flush shard {} on inactive", shard.routingEntry()), e);
+                }
+            }
+
+            @Override
+            protected void doRun() {
+                shard.flush(new FlushRequest().force(false).waitIfOngoing(false));
+            }
+        });
+    }
+
     /**
      * a utility method to perform a synced flush for all shards of multiple indices.
      * see {@link #attemptSyncedFlush(ShardId, ActionListener)}
@@ -137,7 +159,7 @@ public class SyncedFlushService implements IndexEventListener {
                                    IndicesOptions indicesOptions,
                                    final ActionListener<SyncedFlushResponse> listener) {
         final ClusterState state = clusterService.state();
-        if (StreamSupport.stream(state.nodes().spliterator(), false).allMatch(n -> n.getVersion().onOrAfter(Version.V_7_6_0))) {
+        if (state.nodes().getMinNodeVersion().onOrAfter(Version.V_7_6_0)) {
             DEPRECATION_LOGGER.deprecatedAndMaybeLog("synced_flush", SYNCED_FLUSH_DEPRECATION_MESSAGE);
         }
         final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, indicesOptions, aliasesOrIndices);
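A note on the hunk above: getMinNodeVersion() is precomputed on DiscoveryNodes when cluster state is built, so the deprecation check no longer streams over every node on each call, and it now uses the same version gate as the new code path in onShardInactive.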
IndexShardIT.java
@@ -176,13 +176,11 @@ public class IndexShardIT extends ESSingleNodeTestCase {
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
         indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
         assertBusy(() -> {
-                IndexStats indexStats = client().admin().indices().prepareStats("test").clear().get().getIndex("test");
-                assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
+                IndexStats indexStats = client().admin().indices().prepareStats("test").clear().setTranslog(true).get().getIndex("test");
+                assertThat(indexStats.getTotal().translog.getUncommittedOperations(), equalTo(0));
                 indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
             }
         );
-        IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
-        assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
     }

     public void testDurableFlagHasEffect() throws Exception {
FlushIT.java
@@ -37,6 +37,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.InternalEngine;
@@ -47,15 +48,22 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndexingMemoryController;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalSettingsPlugin;
+import org.elasticsearch.test.InternalTestCluster;

 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -71,6 +79,11 @@ import static org.hamcrest.Matchers.nullValue;
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
 public class FlushIT extends ESIntegTestCase {

+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Collections.singletonList(InternalSettingsPlugin.class);
+    }
+
     public void testWaitIfOngoing() throws InterruptedException {
         createIndex("test");
         ensureGreen("test");
@@ -369,4 +382,29 @@ public class FlushIT extends ESIntegTestCase {
         assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
         assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
     }
+
+    public void testFlushOnInactive() throws Exception {
+        final String indexName = "flush_on_inactive";
+        List<String> dataNodes = internalCluster().startDataOnlyNodes(2, Settings.builder()
+            .put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), randomTimeValue(10, 1000, "ms")).build());
+        assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), randomTimeValue(50, 200, "ms"))
+            .put("index.routing.allocation.include._name", String.join(",", dataNodes))
+            .build()));
+        ensureGreen(indexName);
+        int numDocs = randomIntBetween(1, 10);
+        for (int i = 0; i < numDocs; i++) {
+            client().prepareIndex(indexName, "_doc").setSource("f", "v").get();
+        }
+        if (randomBoolean()) {
+            internalCluster().restartNode(randomFrom(dataNodes), new InternalTestCluster.RestartCallback());
+            ensureGreen(indexName);
+        }
+        assertBusy(() -> {
+            for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getShards()) {
+                assertThat(shardStats.getStats().getTranslog().getUncommittedOperations(), equalTo(0));
+            }
+        }, 30, TimeUnit.SECONDS);
+    }
 }
InternalTestCluster.java
@@ -2142,9 +2142,13 @@ public final class InternalTestCluster extends TestCluster {
     }

     public List<String> startDataOnlyNodes(int numNodes) {
+        return startDataOnlyNodes(numNodes, Settings.EMPTY);
+    }
+
+    public List<String> startDataOnlyNodes(int numNodes, Settings settings) {
         return startNodes(
             numNodes,
-            Settings.builder().put(Settings.EMPTY).put(Node.NODE_MASTER_SETTING.getKey(), false)
+            Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false)
                 .put(Node.NODE_DATA_SETTING.getKey(), true).build());
     }

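The new startDataOnlyNodes(int, Settings) overload delegates to startNodes with the caller's settings merged in, while the existing single-argument variant now forwards Settings.EMPTY, so existing callers are unaffected. It exists so that testFlushOnInactive above can start data nodes with a short SHARD_INACTIVE_TIME_SETTING, letting the inactive-flush path fire within the test's assertBusy window.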