Closed shard should never open new engine (#47186)
We should not open new engines if a shard is closed. We break this assumption in #45263 where we stop verifying the shard state before creating an engine but only before swapping the engine reference. We can fail to snapshot the store metadata or checkIndex a closed shard if there's some IndexWriter holding the index lock. Closes #47060
This commit is contained in:
parent
9a42e71dd9
commit
ff6c121eb9
|
@ -1192,12 +1192,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
synchronized (engineMutex) {
|
synchronized (engineMutex) {
|
||||||
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
|
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
|
||||||
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
|
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
|
||||||
synchronized (mutex) {
|
|
||||||
final Engine engine = getEngineOrNull();
|
final Engine engine = getEngineOrNull();
|
||||||
if (engine != null) {
|
if (engine != null) {
|
||||||
indexCommit = engine.acquireLastIndexCommit(false);
|
indexCommit = engine.acquireLastIndexCommit(false);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (indexCommit == null) {
|
if (indexCommit == null) {
|
||||||
return store.getMetadata(null, true);
|
return store.getMetadata(null, true);
|
||||||
}
|
}
|
||||||
|
@ -1320,9 +1318,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close(String reason, boolean flushEngine) throws IOException {
|
public void close(String reason, boolean flushEngine) throws IOException {
|
||||||
synchronized (mutex) {
|
synchronized (engineMutex) {
|
||||||
try {
|
try {
|
||||||
|
synchronized (mutex) {
|
||||||
changeState(IndexShardState.CLOSED, reason);
|
changeState(IndexShardState.CLOSED, reason);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
final Engine engine = this.currentEngineReference.getAndSet(null);
|
final Engine engine = this.currentEngineReference.getAndSet(null);
|
||||||
try {
|
try {
|
||||||
|
@ -1377,6 +1377,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
* This is the first operation after the local checkpoint of the safe commit if exists.
|
* This is the first operation after the local checkpoint of the safe commit if exists.
|
||||||
*/
|
*/
|
||||||
public long recoverLocallyUpToGlobalCheckpoint() {
|
public long recoverLocallyUpToGlobalCheckpoint() {
|
||||||
|
assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
|
||||||
if (state != IndexShardState.RECOVERING) {
|
if (state != IndexShardState.RECOVERING) {
|
||||||
throw new IndexShardNotRecoveringException(shardId, state);
|
throw new IndexShardNotRecoveringException(shardId, state);
|
||||||
}
|
}
|
||||||
|
@ -1428,7 +1429,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
|
getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
|
||||||
logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
|
logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
|
||||||
} finally {
|
} finally {
|
||||||
synchronized (mutex) {
|
synchronized (engineMutex) {
|
||||||
IOUtils.close(currentEngineReference.getAndSet(null));
|
IOUtils.close(currentEngineReference.getAndSet(null));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1603,23 +1604,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
|
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
|
||||||
+ "] but got " + getRetentionLeases();
|
+ "] but got " + getRetentionLeases();
|
||||||
synchronized (engineMutex) {
|
synchronized (engineMutex) {
|
||||||
|
assert currentEngineReference.get() == null : "engine is running";
|
||||||
|
verifyNotClosed();
|
||||||
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
|
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
|
||||||
final Engine newEngine = engineFactory.newReadWriteEngine(config);
|
final Engine newEngine = engineFactory.newReadWriteEngine(config);
|
||||||
synchronized (mutex) {
|
|
||||||
try {
|
|
||||||
verifyNotClosed();
|
|
||||||
assert currentEngineReference.get() == null : "engine is running";
|
|
||||||
onNewEngine(newEngine);
|
onNewEngine(newEngine);
|
||||||
currentEngineReference.set(newEngine);
|
currentEngineReference.set(newEngine);
|
||||||
// We set active because we are now writing operations to the engine; this way,
|
// We set active because we are now writing operations to the engine; this way,
|
||||||
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
|
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
|
||||||
active.set(true);
|
active.set(true);
|
||||||
} finally {
|
|
||||||
if (currentEngineReference.get() != newEngine) {
|
|
||||||
newEngine.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
|
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
|
||||||
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
|
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
|
||||||
|
@ -1650,7 +1643,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
* called if recovery has to be restarted after network error / delay **
|
* called if recovery has to be restarted after network error / delay **
|
||||||
*/
|
*/
|
||||||
public void performRecoveryRestart() throws IOException {
|
public void performRecoveryRestart() throws IOException {
|
||||||
synchronized (mutex) {
|
assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
|
||||||
|
synchronized (engineMutex) {
|
||||||
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
|
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
|
||||||
IOUtils.close(currentEngineReference.getAndSet(null));
|
IOUtils.close(currentEngineReference.getAndSet(null));
|
||||||
resetRecoveryStage();
|
resetRecoveryStage();
|
||||||
|
@ -3325,7 +3319,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
|
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
|
||||||
*/
|
*/
|
||||||
void resetEngineToGlobalCheckpoint() throws IOException {
|
void resetEngineToGlobalCheckpoint() throws IOException {
|
||||||
assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex";
|
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
|
||||||
assert getActiveOperationsCount() == OPERATIONS_BLOCKED
|
assert getActiveOperationsCount() == OPERATIONS_BLOCKED
|
||||||
: "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
|
: "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
|
||||||
sync(); // persist the global checkpoint to disk
|
sync(); // persist the global checkpoint to disk
|
||||||
|
@ -3338,6 +3332,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
|
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
|
||||||
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
|
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
|
||||||
synchronized (engineMutex) {
|
synchronized (engineMutex) {
|
||||||
|
verifyNotClosed();
|
||||||
// we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
|
// we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
|
||||||
// acquireXXXCommit and close works.
|
// acquireXXXCommit and close works.
|
||||||
final Engine readOnlyEngine =
|
final Engine readOnlyEngine =
|
||||||
|
@ -3365,7 +3360,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
assert Thread.holdsLock(mutex);
|
assert Thread.holdsLock(engineMutex);
|
||||||
|
|
||||||
Engine newEngine = newEngineReference.get();
|
Engine newEngine = newEngineReference.get();
|
||||||
if (newEngine == currentEngineReference.get()) {
|
if (newEngine == currentEngineReference.get()) {
|
||||||
|
@ -3375,28 +3370,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
IOUtils.close(super::close, newEngine);
|
IOUtils.close(super::close, newEngine);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
synchronized (mutex) {
|
|
||||||
try {
|
|
||||||
verifyNotClosed();
|
|
||||||
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
|
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
|
||||||
} finally {
|
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
|
||||||
if (currentEngineReference.get() != readOnlyEngine) {
|
onNewEngine(newEngineReference.get());
|
||||||
readOnlyEngine.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
final Engine newReadWriteEngine = engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker));
|
|
||||||
synchronized (mutex) {
|
|
||||||
try {
|
|
||||||
verifyNotClosed();
|
|
||||||
newEngineReference.set(newReadWriteEngine);
|
|
||||||
onNewEngine(newReadWriteEngine);
|
|
||||||
} finally {
|
|
||||||
if (newEngineReference.get() != newReadWriteEngine) {
|
|
||||||
newReadWriteEngine.close(); // shard was closed
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
|
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
|
||||||
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
|
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
|
||||||
|
@ -3404,7 +3380,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
});
|
});
|
||||||
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
|
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
|
||||||
newEngineReference.get().refresh("reset_engine");
|
newEngineReference.get().refresh("reset_engine");
|
||||||
synchronized (mutex) {
|
synchronized (engineMutex) {
|
||||||
verifyNotClosed();
|
verifyNotClosed();
|
||||||
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
|
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
|
||||||
// We set active because we are now writing operations to the engine; this way,
|
// We set active because we are now writing operations to the engine; this way,
|
||||||
|
|
|
@ -134,6 +134,7 @@ import java.util.Objects;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -537,15 +538,21 @@ public abstract class ESTestCase extends LuceneTestCase {
|
||||||
// TODO: can we do this cleaner???
|
// TODO: can we do this cleaner???
|
||||||
|
|
||||||
/** MockFSDirectoryService sets this: */
|
/** MockFSDirectoryService sets this: */
|
||||||
public static boolean checkIndexFailed;
|
public static final List<Exception> checkIndexFailures = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public final void resetCheckIndexStatus() throws Exception {
|
public final void resetCheckIndexStatus() throws Exception {
|
||||||
checkIndexFailed = false;
|
checkIndexFailures.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void ensureCheckIndexPassed() {
|
public final void ensureCheckIndexPassed() {
|
||||||
assertFalse("at least one shard failed CheckIndex", checkIndexFailed);
|
if (checkIndexFailures.isEmpty() == false) {
|
||||||
|
final AssertionError e = new AssertionError("at least one shard failed CheckIndex");
|
||||||
|
for (Exception failure : checkIndexFailures) {
|
||||||
|
e.addSuppressed(failure);
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// -----------------------------------------------------------------
|
// -----------------------------------------------------------------
|
||||||
|
|
|
@ -84,17 +84,19 @@ public class MockFSDirectoryFactory implements IndexStorePlugin.DirectoryFactory
|
||||||
CheckIndex.Status status = store.checkIndex(out);
|
CheckIndex.Status status = store.checkIndex(out);
|
||||||
out.flush();
|
out.flush();
|
||||||
if (!status.clean) {
|
if (!status.clean) {
|
||||||
ESTestCase.checkIndexFailed = true;
|
IOException failure = new IOException("failed to check index for shard " + shardId +
|
||||||
logger.warn("check index [failure] index files={}\n{}", Arrays.toString(dir.listAll()), os.bytes().utf8ToString());
|
";index files [" + Arrays.toString(dir.listAll()) + "] os [" + os.bytes().utf8ToString() + "]");
|
||||||
throw new IOException("index check failure");
|
ESTestCase.checkIndexFailures.add(failure);
|
||||||
|
throw failure;
|
||||||
} else {
|
} else {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("check index [success]\n{}", os.bytes().utf8ToString());
|
logger.debug("check index [success]\n{}", os.bytes().utf8ToString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (LockObtainFailedException e) {
|
} catch (LockObtainFailedException e) {
|
||||||
ESTestCase.checkIndexFailed = true;
|
IllegalStateException failure = new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
|
||||||
throw new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
|
ESTestCase.checkIndexFailures.add(failure);
|
||||||
|
throw failure;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("failed to check index", e);
|
logger.warn("failed to check index", e);
|
||||||
|
|
Loading…
Reference in New Issue