`_flush` should block by default (#20597)

This commit changes the default behavior of `_flush` to block if other flushes are ongoing.
This also removes the use of `FlushNotAllowedException` and instead simply return immediately
by skipping the flush. Users should be aware if they set this option that the flush might or might
not flush everything to disk ie. no transactional behavior of some sort.

Closes #20569
This commit is contained in:
Simon Willnauer 2016-09-21 14:20:24 +02:00 committed by GitHub
parent 6dc03ecb10
commit 0151974500
24 changed files with 75 additions and 95 deletions

View File

@ -633,8 +633,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.repositories.RepositoryMissingException::new, 107),
DOCUMENT_SOURCE_MISSING_EXCEPTION(org.elasticsearch.index.engine.DocumentSourceMissingException.class,
org.elasticsearch.index.engine.DocumentSourceMissingException::new, 109),
FLUSH_NOT_ALLOWED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.FlushNotAllowedEngineException.class,
org.elasticsearch.index.engine.FlushNotAllowedEngineException::new, 110),
// 110 used to be FlushNotAllowedEngineException
NO_CLASS_SETTINGS_EXCEPTION(org.elasticsearch.common.settings.NoClassSettingsException.class,
org.elasticsearch.common.settings.NoClassSettingsException::new, 111),
BIND_TRANSPORT_EXCEPTION(org.elasticsearch.transport.BindTransportException.class,

View File

@ -40,7 +40,7 @@ import java.io.IOException;
public class FlushRequest extends BroadcastRequest<FlushRequest> {
private boolean force = false;
private boolean waitIfOngoing = false;
private boolean waitIfOngoing = true;
/**
* Constructs a new flush request against one or more indices. If nothing is provided, all indices will
@ -61,6 +61,7 @@ public class FlushRequest extends BroadcastRequest<FlushRequest> {
/**
* if set to <tt>true</tt> the flush will block
* if a another flush operation is already running until the flush can be performed.
* The default is <code>true</code>
*/
public FlushRequest waitIfOngoing(boolean waitIfOngoing) {
this.waitIfOngoing = waitIfOngoing;

View File

@ -1105,8 +1105,6 @@ public abstract class Engine implements Closeable {
logger.debug("flushing shard on close - this might take some time to sync files to disk");
try {
flush(); // TODO we might force a flush in the future since we have the write lock already even though recoveries are running.
} catch (FlushNotAllowedEngineException ex) {
logger.debug("flush not allowed during flushAndClose - skipping");
} catch (EngineClosedException ex) {
logger.debug("engine already closed - skipping flushAndClose");
}
@ -1233,4 +1231,11 @@ public abstract class Engine implements Closeable {
* This operation will close the engine if the recovery fails.
*/
public abstract Engine recoverFromTranslog() throws IOException;
/**
* Returns <code>true</code> iff this engine is currently recovering from translog.
*/
public boolean isRecovering() {
return false;
}
}

View File

@ -1,45 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
/**
*
*/
public class FlushNotAllowedEngineException extends EngineException {
public FlushNotAllowedEngineException(ShardId shardId, String msg) {
super(shardId, msg);
}
public FlushNotAllowedEngineException(StreamInput in) throws IOException{
super(in);
}
@Override
public RestStatus status() {
return RestStatus.SERVICE_UNAVAILABLE;
}
}

View File

@ -116,7 +116,7 @@ public class InternalEngine extends Engine {
// incoming indexing ops to a single thread:
private final AtomicInteger throttleRequestCount = new AtomicInteger();
private final EngineConfig.OpenMode openMode;
private final AtomicBoolean allowCommits = new AtomicBoolean(true);
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
@ -163,8 +163,9 @@ public class InternalEngine extends Engine {
manager = createSearcherManager();
this.searcherManager = manager;
this.versionMap.setManager(searcherManager);
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
// don't allow commits until we are done with recovering
allowCommits.compareAndSet(true, openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
if (engineConfig.getRefreshListeners() != null) {
searcherManager.addListener(engineConfig.getRefreshListeners());
}
@ -190,14 +191,14 @@ public class InternalEngine extends Engine {
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
throw new IllegalStateException("Can't recover from translog with open mode: " + openMode);
}
if (allowCommits.get()) {
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
try {
recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer());
} catch (Exception e) {
try {
allowCommits.set(false); // just play safe and never allow commits on this
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
failEngine("failed to recover from translog", e);
} catch (Exception inner) {
e.addSuppressed(inner);
@ -221,8 +222,8 @@ public class InternalEngine extends Engine {
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
assert allowCommits.get() == false : "commits are allowed but shouldn't";
allowCommits.set(true); // we are good - now we can commit
assert pendingTranslogRecovery.get(): "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration());
@ -765,7 +766,7 @@ public class InternalEngine extends Engine {
flushLock.lock();
logger.trace("acquired flush lock after blocking");
} else {
throw new FlushNotAllowedEngineException(shardId, "already flushing...");
return new CommitId(lastCommittedSegmentInfos.getId());
}
} else {
logger.trace("acquired flush lock immediately");
@ -1287,8 +1288,8 @@ public class InternalEngine extends Engine {
// if we are in this stage we have to prevent flushes from this
// engine otherwise we might loose documents if the flush succeeds
// and the translog recover fails we we "commit" the translog on flush.
if (allowCommits.get() == false) {
throw new FlushNotAllowedEngineException(shardId, "flushes are disabled - pending translog recovery");
if (pendingTranslogRecovery.get()) {
throw new IllegalStateException(shardId.toString() + " flushes are disabled - pending translog recovery");
}
}
@ -1349,4 +1350,9 @@ public class InternalEngine extends Engine {
boolean indexWriterHasDeletions() {
return indexWriter.hasDeletions();
}
@Override
public boolean isRecovering() {
return pendingTranslogRecovery.get();
}
}

View File

@ -730,7 +730,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) {
verifyStartedOrRecovering();
logger.trace("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId);
return getEngine().syncFlush(syncId, expectedCommitId);
Engine engine = getEngine();
if (engine.isRecovering()) {
throw new IllegalIndexShardStateException(shardId(), state, "syncFlush is only allowed if the engine is not recovery" +
" from translog");
}
return engine.syncFlush(syncId, expectedCommitId);
}
public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException {
@ -741,11 +746,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
// we allows flush while recovering, since we allow for operations to happen
// while recovering, and we want to keep the translog at bay (up to deletes, which
// we don't gc).
// we don't gc). Yet, we don't use flush internally to clear deletes and flush the indexwriter since
// we use #writeIndexingBuffer for this now.
verifyStartedOrRecovering();
Engine engine = getEngine();
if (engine.isRecovering()) {
throw new IllegalIndexShardStateException(shardId(), state, "flush is only allowed if the engine is not recovery" +
" from translog");
}
long time = System.nanoTime();
Engine.CommitId commitId = getEngine().flush(force, waitIfOngoing);
Engine.CommitId commitId = engine.flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
return commitId;
@ -1165,7 +1175,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
boolean wasActive = active.getAndSet(false);
if (wasActive) {
logger.debug("shard is now inactive");
try {
indexEventListener.onShardInactive(this);
} catch (Exception e) {
logger.warn("failed to notify index event listener", e);
}
}
}
}

View File

@ -31,7 +31,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
@ -386,7 +385,7 @@ public class IndexingMemoryController extends AbstractComponent implements Index
protected void checkIdle(IndexShard shard, long inactiveTimeNS) {
try {
shard.checkIdle(inactiveTimeNS);
} catch (EngineClosedException | FlushNotAllowedEngineException e) {
} catch (EngineClosedException e) {
logger.trace("ignore exception while checking if shard {} is inactive", e, shard.shardId());
}
}

View File

@ -757,7 +757,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(107, org.elasticsearch.repositories.RepositoryMissingException.class);
ids.put(108, null);
ids.put(109, org.elasticsearch.index.engine.DocumentSourceMissingException.class);
ids.put(110, org.elasticsearch.index.engine.FlushNotAllowedEngineException.class);
ids.put(110, null); // FlushNotAllowedEngineException was removed in 5.0
ids.put(111, org.elasticsearch.common.settings.NoClassSettingsException.class);
ids.put(112, org.elasticsearch.transport.BindTransportException.class);
ids.put(113, org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException.class);

View File

@ -49,7 +49,7 @@ public class FlushBlocksIT extends ESIntegTestCase {
for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE)) {
try {
enableIndexBlock("test", blockSetting);
FlushResponse response = client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute().actionGet();
FlushResponse response = client().admin().indices().prepareFlush("test").execute().actionGet();
assertNoFailures(response);
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
} finally {

View File

@ -54,7 +54,7 @@ public class IndicesSegmentsRequestTests extends ESSingleNodeTestCase {
String id = Integer.toString(j);
client().prepareIndex("test", "type1", id).setSource("text", "sometext").get();
}
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).get();
client().admin().indices().prepareFlush("test").get();
}
public void testBasic() {

View File

@ -213,7 +213,7 @@ public class IndicesShardStoreRequestIT extends ESIntegTestCase {
builders[i] = client().prepareIndex(index, "type").setSource("field", "value");
}
indexRandom(true, builders);
client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet();
client().admin().indices().prepareFlush().setForce(true).execute().actionGet();
}
private static final class IndexNodePredicate implements Predicate<Settings> {

View File

@ -417,7 +417,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
logger.info("Running Cluster Health");
ensureGreen();
client().admin().indices().prepareForceMerge("test").setMaxNumSegments(100).get(); // just wait for merges
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();
client().admin().indices().prepareFlush().setForce(true).get();
boolean useSyncIds = randomBoolean();
if (useSyncIds == false) {

View File

@ -80,7 +80,7 @@ public class ReusePeerRecoverySharedTest {
client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("30s").get();
// just wait for merges
client().admin().indices().prepareForceMerge("test").setMaxNumSegments(100).get();
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();
client().admin().indices().prepareFlush().setForce(true).get();
if (useSyncIds == false) {
logger.info("--> disabling allocation while the cluster is shut down");

View File

@ -142,7 +142,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("foo", "doc", ""+i).setSource("foo", "bar").get();
}
assertNoFailures(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
assertNoFailures(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()

View File

@ -586,6 +586,7 @@ public class InternalEngineTests extends ESTestCase {
engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
assertTrue(engine.isRecovering());
engine.recoverFromTranslog();
Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
assertThat(counter.get(), equalTo(2));
@ -594,13 +595,16 @@ public class InternalEngineTests extends ESTestCase {
}
public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
assertFalse(engine.isRecovering());
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("1"), doc));
engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
expectThrows(FlushNotAllowedEngineException.class, () -> engine.flush(true, true));
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
assertTrue(engine.isRecovering());
engine.recoverFromTranslog();
assertFalse(engine.isRecovering());
doc = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("2"), doc));
engine.flush();
@ -2114,6 +2118,7 @@ public class InternalEngineTests extends ESTestCase {
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(0)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG))){
assertFalse(engine.isRecovering());
engine.index(firstIndexRequest);
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
@ -2126,6 +2131,7 @@ public class InternalEngineTests extends ESTestCase {
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
assertTrue(engine.isRecovering());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
if (i == 0) {
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));

View File

@ -159,7 +159,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
@ -262,7 +262,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
@ -408,7 +408,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
@ -491,7 +491,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
@ -546,7 +546,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);

View File

@ -62,7 +62,7 @@ public class FlushIT extends ESIntegTestCase {
final CountDownLatch latch = new CountDownLatch(10);
final CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
for (int j = 0; j < 10; j++) {
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute(new ActionListener<FlushResponse>() {
client().admin().indices().prepareFlush("test").execute(new ActionListener<FlushResponse>() {
@Override
public void onResponse(FlushResponse flushResponse) {
try {

View File

@ -348,7 +348,7 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
}
indexRandom(true, builder);
if (randomBoolean()) {
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).setForce(true).execute().get();
client().admin().indices().prepareFlush("test").setForce(true).execute().get();
}
client().admin().indices().prepareClose("test").execute().get();

View File

@ -111,7 +111,7 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
}
ensureGreen();
// ensure we have flushed segments and make them a big one via optimize
client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).get();
client().admin().indices().prepareFlush().setForce(true).get();
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).setFlush(true).get();
final CountDownLatch latch = new CountDownLatch(1);

View File

@ -67,7 +67,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
String id = Integer.toString(i);
client().prepareIndex(indexName, "type1", id).setSource("text", "sometext").get();
}
client().admin().indices().prepareFlush(indexName).setWaitIfOngoing(true).get();
client().admin().indices().prepareFlush(indexName).get();
logger.info("--> create first snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()

View File

@ -99,7 +99,7 @@ public class SearchWithRandomIOExceptionsIT extends ESIntegTestCase {
client().prepareIndex("test", "type", "init" + i).setSource("test", "init").get();
}
client().admin().indices().prepareRefresh("test").execute().get();
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute().get();
client().admin().indices().prepareFlush("test").execute().get();
client().admin().indices().prepareClose("test").execute().get();
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_SETTING.getKey(), exceptionRate)

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.test.ESIntegTestCase;
@ -617,11 +616,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
}
if (threadRandom.nextInt(100) == 7) {
logger.trace("--> {}: TEST: now flush at {}", threadID, System.nanoTime() - startTime);
try {
flush();
} catch (FlushNotAllowedEngineException fnaee) {
// OK
}
logger.trace("--> {}: TEST: flush done at {}", threadID, System.nanoTime() - startTime);
}
}

View File

@ -18,7 +18,7 @@
},
"wait_if_ongoing": {
"type" : "boolean",
"description" : "If set to true the flush operation will block until the flush can be executed if another flush operation is already executing. The default is false and will cause an exception to be thrown on the shard level if another flush operation is already running."
"description" : "If set to true the flush operation will block until the flush can be executed if another flush operation is already executing. The default is true. If set to false the flush will be skipped iff if another flush operation is already running."
},
"ignore_unavailable": {
"type" : "boolean",

View File

@ -1204,7 +1204,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
*/
protected final FlushResponse flush(String... indices) {
waitForRelocation();
FlushResponse actionGet = client().admin().indices().prepareFlush(indices).setWaitIfOngoing(true).execute().actionGet();
FlushResponse actionGet = client().admin().indices().prepareFlush(indices).execute().actionGet();
for (ShardOperationFailedException failure : actionGet.getShardFailures()) {
assertThat("unexpected flush failure " + failure.reason(), failure.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}