mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-22 12:56:53 +00:00
Recovery: fix recovered translog ops stat counting when retrying a batch
#11363 introduced retry logic for the case where we have to wait on a mapping update during the translog replay phase of recovery. The retry throws our recovery stats off, as it may count ops twice.
This commit is contained in:
parent
8f2dc10832
commit
10adb71445
@ -1327,7 +1327,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private final EngineConfig newEngineConfig(TranslogConfig translogConfig) {
|
private final EngineConfig newEngineConfig(TranslogConfig translogConfig) {
|
||||||
final TranslogRecoveryPerformer translogRecoveryPerformer = new TranslogRecoveryPerformer(mapperService, mapperAnalyzer, queryParserService, indexAliasesService, indexCache) {
|
final TranslogRecoveryPerformer translogRecoveryPerformer = new TranslogRecoveryPerformer(shardId, mapperService, mapperAnalyzer, queryParserService, indexAliasesService, indexCache) {
|
||||||
@Override
|
@Override
|
||||||
protected void operationProcessed() {
|
protected void operationProcessed() {
|
||||||
assert recoveryState != null;
|
assert recoveryState != null;
|
||||||
|
@ -55,8 +55,10 @@ public class TranslogRecoveryPerformer {
|
|||||||
private final IndexCache indexCache;
|
private final IndexCache indexCache;
|
||||||
private final MapperAnalyzer mapperAnalyzer;
|
private final MapperAnalyzer mapperAnalyzer;
|
||||||
private final Map<String, Mapping> recoveredTypes = new HashMap<>();
|
private final Map<String, Mapping> recoveredTypes = new HashMap<>();
|
||||||
|
private final ShardId shardId;
|
||||||
|
|
||||||
protected TranslogRecoveryPerformer(MapperService mapperService, MapperAnalyzer mapperAnalyzer, IndexQueryParserService queryParserService, IndexAliasesService indexAliasesService, IndexCache indexCache) {
|
protected TranslogRecoveryPerformer(ShardId shardId, MapperService mapperService, MapperAnalyzer mapperAnalyzer, IndexQueryParserService queryParserService, IndexAliasesService indexAliasesService, IndexCache indexCache) {
|
||||||
|
this.shardId = shardId;
|
||||||
this.mapperService = mapperService;
|
this.mapperService = mapperService;
|
||||||
this.queryParserService = queryParserService;
|
this.queryParserService = queryParserService;
|
||||||
this.indexAliasesService = indexAliasesService;
|
this.indexAliasesService = indexAliasesService;
|
||||||
@ -76,13 +78,33 @@ public class TranslogRecoveryPerformer {
|
|||||||
*/
|
*/
|
||||||
int performBatchRecovery(Engine engine, Iterable<Translog.Operation> operations) {
|
int performBatchRecovery(Engine engine, Iterable<Translog.Operation> operations) {
|
||||||
int numOps = 0;
|
int numOps = 0;
|
||||||
for (Translog.Operation operation : operations) {
|
try {
|
||||||
performRecoveryOperation(engine, operation, false);
|
for (Translog.Operation operation : operations) {
|
||||||
numOps++;
|
performRecoveryOperation(engine, operation, false);
|
||||||
|
numOps++;
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
throw new BatchOperationException(shardId, "failed to apply batch translog operation [" + t.getMessage() + "]", numOps, t);
|
||||||
}
|
}
|
||||||
return numOps;
|
return numOps;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class BatchOperationException extends IndexShardException {
|
||||||
|
|
||||||
|
private final int completedOperations;
|
||||||
|
|
||||||
|
public BatchOperationException(ShardId shardId, String msg, int completedOperations, Throwable cause) {
|
||||||
|
super(shardId, msg, cause);
|
||||||
|
this.completedOperations = completedOperations;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/** the number of succesful operations performed before the exception was thrown */
|
||||||
|
public int completedOperations() {
|
||||||
|
return completedOperations;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void maybeAddMappingUpdate(String type, Mapping update, String docId, boolean allowMappingUpdates) {
|
private void maybeAddMappingUpdate(String type, Mapping update, String docId, boolean allowMappingUpdates) {
|
||||||
if (update == null) {
|
if (update == null) {
|
||||||
return;
|
return;
|
||||||
|
@ -26,7 +26,6 @@ import org.elasticsearch.common.Nullable;
|
|||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Streamable;
|
import org.elasticsearch.common.io.stream.Streamable;
|
||||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.ToXContent;
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
@ -506,6 +505,13 @@ public class RecoveryState implements ToXContent, Streamable {
|
|||||||
assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]";
|
assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized void decrementRecoveredOperations(int ops) {
|
||||||
|
recovered -= ops;
|
||||||
|
assert recovered >= 0 : "recovered operations must be non-negative. Because [" + recovered + "] after decrementing [" + ops + "]";
|
||||||
|
assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* returns the total number of translog operations recovered so far
|
* returns the total number of translog operations recovered so far
|
||||||
*/
|
*/
|
||||||
|
@ -47,10 +47,7 @@ import org.elasticsearch.index.IndexShardMissingException;
|
|||||||
import org.elasticsearch.index.engine.RecoveryEngineException;
|
import org.elasticsearch.index.engine.RecoveryEngineException;
|
||||||
import org.elasticsearch.index.mapper.MapperException;
|
import org.elasticsearch.index.mapper.MapperException;
|
||||||
import org.elasticsearch.index.settings.IndexSettings;
|
import org.elasticsearch.index.settings.IndexSettings;
|
||||||
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
import org.elasticsearch.index.shard.*;
|
||||||
import org.elasticsearch.index.shard.IndexShard;
|
|
||||||
import org.elasticsearch.index.shard.IndexShardClosedException;
|
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
|
||||||
import org.elasticsearch.index.store.Store;
|
import org.elasticsearch.index.store.Store;
|
||||||
import org.elasticsearch.indices.IndexMissingException;
|
import org.elasticsearch.indices.IndexMissingException;
|
||||||
import org.elasticsearch.indices.IndicesLifecycle;
|
import org.elasticsearch.indices.IndicesLifecycle;
|
||||||
@ -308,10 +305,14 @@ public class RecoveryTarget extends AbstractComponent {
|
|||||||
assert recoveryStatus.indexShard().recoveryState() == recoveryStatus.state();
|
assert recoveryStatus.indexShard().recoveryState() == recoveryStatus.state();
|
||||||
try {
|
try {
|
||||||
recoveryStatus.indexShard().performBatchRecovery(request.operations());
|
recoveryStatus.indexShard().performBatchRecovery(request.operations());
|
||||||
} catch (MapperException mapperException) {
|
} catch (TranslogRecoveryPerformer.BatchOperationException exception) {
|
||||||
|
if (ExceptionsHelper.unwrapCause(exception) instanceof MapperException == false) {
|
||||||
|
throw exception;
|
||||||
|
}
|
||||||
// in very rare cases a translog replay from primary is processed before a mapping update on this node
|
// in very rare cases a translog replay from primary is processed before a mapping update on this node
|
||||||
// which causes local mapping changes. we want to wait until these mappings are processed.
|
// which causes local mapping changes. we want to wait until these mappings are processed.
|
||||||
logger.trace("delaying recovery due to missing mapping changes", mapperException);
|
logger.trace("delaying recovery due to missing mapping changes (rolling back stats for [{}] ops)", exception, exception.completedOperations());
|
||||||
|
translog.decrementRecoveredOperations(exception.completedOperations());
|
||||||
// we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
|
// we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
|
||||||
// canceled)
|
// canceled)
|
||||||
observer.waitForNextChange(new ClusterStateObserver.Listener() {
|
observer.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||||
|
@ -1820,7 +1820,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
|
|||||||
public final AtomicInteger recoveredOps = new AtomicInteger(0);
|
public final AtomicInteger recoveredOps = new AtomicInteger(0);
|
||||||
|
|
||||||
public TranslogHandler(String indexName) {
|
public TranslogHandler(String indexName) {
|
||||||
super(null, new MapperAnalyzer(null), null, null, null);
|
super(new ShardId("test", 0), null, new MapperAnalyzer(null), null, null, null);
|
||||||
Settings settings = Settings.settingsBuilder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
Settings settings = Settings.settingsBuilder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||||
RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder("test");
|
RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder("test");
|
||||||
Index index = new Index(indexName);
|
Index index = new Index(indexName);
|
||||||
|
@ -389,6 +389,10 @@ public class RecoveryStateTest extends ElasticsearchTestCase {
|
|||||||
for (int j = iterationOps; j > 0; j--) {
|
for (int j = iterationOps; j > 0; j--) {
|
||||||
ops++;
|
ops++;
|
||||||
translog.incrementRecoveredOperations();
|
translog.incrementRecoveredOperations();
|
||||||
|
if (randomBoolean()) {
|
||||||
|
translog.decrementRecoveredOperations(1);
|
||||||
|
translog.incrementRecoveredOperations();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
assertThat(translog.recoveredOperations(), equalTo(ops));
|
assertThat(translog.recoveredOperations(), equalTo(ops));
|
||||||
assertThat(translog.totalOperations(), equalTo(totalOps));
|
assertThat(translog.totalOperations(), equalTo(totalOps));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user