Recovery: failed to properly ack translog ops during wait on mapping changes
During the second phase of recovery, replayed transaction log entries may need to wait on mapping changes that have not yet propagated to the target node. We correctly replay such operations at a later stage, but we acknowledge the replay request before the work has actually been performed. This change defers the acknowledgement until the batch has been fully applied, and adds trace logging for replayed operations. Example failure: http://build-us-00.elastic.co/job/es_feature_two_phase_pub/859/ Closes #13535
Parent: 5098dcaf96 · Commit: a758eec78a
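To make the ordering problem concrete, here is a minimal, self-contained Java sketch of the buggy and the fixed request handling. All types in it (Channel, TranslogReplaySketch, scheduleRetryOnMappingUpdate) are illustrative stand-ins, not Elasticsearch APIs; only the names performBatchRecovery and operationProcessed mirror names that appear in the diff below.

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    /** Stand-in for the transport channel; hypothetical, for illustration only. */
    interface Channel {
        void sendResponse();           // acknowledge the replay request
        void sendFailure(Exception e); // report a failure instead
    }

    class TranslogReplaySketch {
        final AtomicInteger replayedOps = new AtomicInteger();

        // Buggy shape: the ack is sent unconditionally, even when the batch was
        // deferred because a mapping update has not yet arrived.
        void handleBuggy(List<Runnable> ops, Channel channel) {
            try {
                performBatchRecovery(ops);
            } catch (Exception e) {
                scheduleRetryOnMappingUpdate(ops, channel);
            }
            channel.sendResponse(); // BUG: acked before the work is really done
        }

        // Fixed shape (cf. the RecoveryTarget hunks below): ack only after the
        // batch has been fully applied; a deferred batch acks when it completes.
        void handleFixed(List<Runnable> ops, Channel channel) {
            try {
                performBatchRecovery(ops);
                channel.sendResponse();
            } catch (Exception e) {
                scheduleRetryOnMappingUpdate(ops, channel); // ack happens there
            }
        }

        void performBatchRecovery(List<Runnable> ops) {
            for (Runnable op : ops) {
                op.run();
                operationProcessed(); // per-operation hook, cf. the IndexShard override below
            }
        }

        void operationProcessed() {
            replayedOps.incrementAndGet(); // e.g. update recovery statistics
        }

        void scheduleRetryOnMappingUpdate(List<Runnable> ops, Channel channel) {
            // In the real code the batch is replayed once the mapping change has
            // propagated, and only then is the response sent on the channel.
        }
    }

The fix is purely about ordering: the acknowledgement moves inside the try block, so a batch that is still waiting on a mapping change is never reported as applied.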
@@ -1350,7 +1350,8 @@ public class IndexShard extends AbstractIndexShardComponent {
     }

     private final EngineConfig newEngineConfig(TranslogConfig translogConfig, QueryCachingPolicy cachingPolicy) {
-        final TranslogRecoveryPerformer translogRecoveryPerformer = new TranslogRecoveryPerformer(shardId, mapperService, queryParserService, indexAliasesService, indexCache) {
+        final TranslogRecoveryPerformer translogRecoveryPerformer = new TranslogRecoveryPerformer(shardId, mapperService, queryParserService,
+                indexAliasesService, indexCache, logger) {
             @Override
             protected void operationProcessed() {
                 assert recoveryState != null;
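Only the constructor call changes in this hunk: it is split across two lines and gains the logger argument required by the performer's new ESLogger field. The operationProcessed() override shown as context is the per-operation hook invoked during replay; its body is truncated in this view, but the assert on recoveryState indicates it updates the shard's recovery state for each replayed operation.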
@@ -27,6 +27,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.lucene.search.Queries;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
@@ -34,12 +35,7 @@ import org.elasticsearch.index.aliases.IndexAliasesService;
 import org.elasticsearch.index.cache.IndexCache;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
-import org.elasticsearch.index.mapper.DocumentMapperForType;
-import org.elasticsearch.index.mapper.MapperException;
-import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.mapper.MapperUtils;
-import org.elasticsearch.index.mapper.Mapping;
-import org.elasticsearch.index.mapper.Uid;
+import org.elasticsearch.index.mapper.*;
 import org.elasticsearch.index.query.IndexQueryParserService;
 import org.elasticsearch.index.query.ParsedQuery;
 import org.elasticsearch.index.query.QueryParsingException;
@@ -60,15 +56,18 @@ public class TranslogRecoveryPerformer {
     private final IndexQueryParserService queryParserService;
     private final IndexAliasesService indexAliasesService;
     private final IndexCache indexCache;
+    private final ESLogger logger;
     private final Map<String, Mapping> recoveredTypes = new HashMap<>();
     private final ShardId shardId;

-    protected TranslogRecoveryPerformer(ShardId shardId, MapperService mapperService, IndexQueryParserService queryParserService, IndexAliasesService indexAliasesService, IndexCache indexCache) {
+    protected TranslogRecoveryPerformer(ShardId shardId, MapperService mapperService, IndexQueryParserService queryParserService,
+                                        IndexAliasesService indexAliasesService, IndexCache indexCache, ESLogger logger) {
         this.shardId = shardId;
         this.mapperService = mapperService;
         this.queryParserService = queryParserService;
         this.indexAliasesService = indexAliasesService;
         this.indexCache = indexCache;
+        this.logger = logger;
     }

     protected DocumentMapperForType docMapper(String type) {
@@ -153,6 +152,9 @@ public class TranslogRecoveryPerformer {
                         .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
                         create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false);
                 maybeAddMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate(), engineCreate.id(), allowMappingUpdates);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("[translog] recover [create] op of [{}][{}]", create.type(), create.id());
+                }
                 engine.create(engineCreate);
                 break;
             case SAVE:
@@ -161,11 +163,17 @@ public class TranslogRecoveryPerformer {
                         .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
                         index.version(), index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true);
                 maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("[translog] recover [index] op of [{}][{}]", index.type(), index.id());
+                }
                 engine.index(engineIndex);
                 break;
             case DELETE:
                 Translog.Delete delete = (Translog.Delete) operation;
                 Uid uid = Uid.createUid(delete.uid().text());
+                if (logger.isTraceEnabled()) {
+                    logger.trace("[translog] recover [delete] op of [{}][{}]", uid.type(), uid.id());
+                }
                 engine.delete(new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.version(),
                         delete.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, System.nanoTime(), false));
                 break;
@@ -304,6 +304,7 @@ public class RecoveryTarget extends AbstractComponent {
             assert recoveryStatus.indexShard().recoveryState() == recoveryStatus.state();
             try {
                 recoveryStatus.indexShard().performBatchRecovery(request.operations());
+                channel.sendResponse(TransportResponse.Empty.INSTANCE);
             } catch (TranslogRecoveryPerformer.BatchOperationException exception) {
                 MapperException mapperException = (MapperException) ExceptionsHelper.unwrap(exception, MapperException.class);
                 if (mapperException == null) {
@@ -346,8 +347,6 @@ public class RecoveryTarget extends AbstractComponent {
                     });
                 }
             }
-            channel.sendResponse(TransportResponse.Empty.INSTANCE);
-
         }
     }

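Taken together, these two RecoveryTarget hunks move the acknowledgement into the try block: the response is now sent only after performBatchRecovery has completed, rather than unconditionally at the end of the handler, which was exactly the premature ack described above.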
@@ -42,13 +42,13 @@ import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.bwcompat.OldIndexBackwardsCompatibilityIT;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Base64;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.io.FileSystemUtils;
+import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.settings.Settings;
@@ -256,7 +256,7 @@ public class InternalEngineTests extends ESTestCase {
             public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
                 // we don't need to notify anybody in this test
             }
-        }, new TranslogHandler(shardId.index().getName()), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(new HashSet<>(Arrays.asList(wrappers))), translogConfig);
+        }, new TranslogHandler(shardId.index().getName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(new HashSet<>(Arrays.asList(wrappers))), translogConfig);
         try {
             config.setCreate(Lucene.indexExists(store.directory()) == false);
         } catch (IOException e) {
@@ -1979,8 +1979,8 @@ public class InternalEngineTests extends ESTestCase {

         public final AtomicInteger recoveredOps = new AtomicInteger(0);

-        public TranslogHandler(String indexName) {
-            super(new ShardId("test", 0), null, null, null, null);
+        public TranslogHandler(String indexName, ESLogger logger) {
+            super(new ShardId("test", 0), null, null, null, null, logger);
             Settings settings = Settings.settingsBuilder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
             RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder("test");
             Index index = new Index(indexName);
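On the test side, TranslogHandler now accepts the ESLogger needed by the new trace statements and forwards it to the TranslogRecoveryPerformer super-constructor; its recoveredOps counter presumably backs assertions on how many operations were replayed. The remaining hunks, all in DateHistogramIT, are import and whitespace cleanups with no behavioral change; where a before/after pair below looks identical, the difference was whitespace only, which this page extraction does not preserve.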
@@ -51,8 +51,7 @@ import java.util.concurrent.TimeUnit;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.*;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
 import static org.hamcrest.Matchers.*;
 import static org.hamcrest.core.IsNull.notNullValue;

@@ -100,9 +99,9 @@ public class DateHistogramIT extends ESIntegTestCase {
         assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", "value", "type=integer"));
         List<IndexRequestBuilder> builders = new ArrayList<>();
         for (int i = 0; i < 2; i++) {
-            builders.add(client().prepareIndex("empty_bucket_idx", "type", ""+i).setSource(jsonBuilder()
+            builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource(jsonBuilder()
                     .startObject()
-                    .field("value", i*2)
+                    .field("value", i * 2)
                     .endObject()));
         }
         builders.addAll(Arrays.asList(
@@ -167,9 +166,9 @@ public class DateHistogramIT extends ESIntegTestCase {

     @Test
     public void singleValuedField_WithTimeZone() throws Exception {
-        SearchResponse response = client().prepareSearch("idx")
-                .addAggregation(dateHistogram("histo").field("date").interval(DateHistogramInterval.DAY).minDocCount(1).timeZone("+01:00")).execute()
-                .actionGet();
+        SearchResponse response = client().prepareSearch("idx")
+                .addAggregation(dateHistogram("histo").field("date").interval(DateHistogramInterval.DAY).minDocCount(1).timeZone("+01:00")).execute()
+                .actionGet();
         DateTimeZone tz = DateTimeZone.forID("+01:00");
         assertSearchResponse(response);

@@ -252,7 +251,7 @@ public class DateHistogramIT extends ESIntegTestCase {
                 .addAggregation(dateHistogram("histo")
                         .field("date")
                         .interval(DateHistogramInterval.MONTH)
-                        .order(Histogram.Order.KEY_DESC))
+                        .order(Histogram.Order.KEY_DESC))
                 .execute().actionGet();

         assertSearchResponse(response);
@@ -275,7 +274,7 @@ public class DateHistogramIT extends ESIntegTestCase {
                 .addAggregation(dateHistogram("histo")
                         .field("date")
                         .interval(DateHistogramInterval.MONTH)
-                        .order(Histogram.Order.COUNT_ASC))
+                        .order(Histogram.Order.COUNT_ASC))
                 .execute().actionGet();

         assertSearchResponse(response);
@@ -298,7 +297,7 @@ public class DateHistogramIT extends ESIntegTestCase {
                 .addAggregation(dateHistogram("histo")
                         .field("date")
                         .interval(DateHistogramInterval.MONTH)
-                        .order(Histogram.Order.COUNT_DESC))
+                        .order(Histogram.Order.COUNT_DESC))
                 .execute().actionGet();

         assertSearchResponse(response);
@@ -319,7 +318,7 @@ public class DateHistogramIT extends ESIntegTestCase {
     public void singleValuedField_WithSubAggregation() throws Exception {
         SearchResponse response = client().prepareSearch("idx")
                 .addAggregation(dateHistogram("histo").field("date").interval(DateHistogramInterval.MONTH)
-                        .subAggregation(sum("sum").field("value")))
+                        .subAggregation(sum("sum").field("value")))
                 .execute().actionGet();

         assertSearchResponse(response);
@@ -425,7 +424,7 @@ public class DateHistogramIT extends ESIntegTestCase {
                 .addAggregation(dateHistogram("histo")
                         .field("date")
                         .interval(DateHistogramInterval.MONTH)
-                        .order(Histogram.Order.aggregation("sum", true))
+                        .order(Histogram.Order.aggregation("sum", true))
                         .subAggregation(max("sum").field("value")))
                 .execute().actionGet();

@@ -449,7 +448,7 @@ public class DateHistogramIT extends ESIntegTestCase {
                 .addAggregation(dateHistogram("histo")
                         .field("date")
                         .interval(DateHistogramInterval.MONTH)
-                        .order(Histogram.Order.aggregation("sum", false))
+                        .order(Histogram.Order.aggregation("sum", false))
                         .subAggregation(max("sum").field("value")))
                 .execute().actionGet();

@@ -473,7 +472,7 @@ public class DateHistogramIT extends ESIntegTestCase {
                 .addAggregation(dateHistogram("histo")
                         .field("date")
                         .interval(DateHistogramInterval.MONTH)
-                        .order(Histogram.Order.aggregation("stats", "sum", true))
+                        .order(Histogram.Order.aggregation("stats", "sum", true))
                         .subAggregation(stats("stats").field("value")))
                 .execute().actionGet();

@@ -497,7 +496,7 @@ public class DateHistogramIT extends ESIntegTestCase {
                 .addAggregation(dateHistogram("histo")
                         .field("date")
                         .interval(DateHistogramInterval.MONTH)
-                        .order(Histogram.Order.aggregation("stats", "sum", false))
+                        .order(Histogram.Order.aggregation("stats", "sum", false))
                         .subAggregation(stats("stats").field("value")))
                 .execute().actionGet();

@@ -520,8 +519,8 @@ public class DateHistogramIT extends ESIntegTestCase {
         SearchResponse response = client().prepareSearch("idx")
                 .addAggregation(dateHistogram("histo")
                         .field("date")
-                        .script(new Script("new DateTime(_value).plusMonths(1).getMillis()"))
-                        .interval(DateHistogramInterval.MONTH)).execute().actionGet();
+                        .script(new Script("new DateTime(_value).plusMonths(1).getMillis()"))
+                        .interval(DateHistogramInterval.MONTH)).execute().actionGet();

         assertSearchResponse(response);

@@ -614,7 +613,7 @@ public class DateHistogramIT extends ESIntegTestCase {
                 .addAggregation(dateHistogram("histo")
                         .field("dates")
                         .interval(DateHistogramInterval.MONTH)
-                        .order(Histogram.Order.COUNT_DESC))
+                        .order(Histogram.Order.COUNT_DESC))
                 .execute().actionGet();

         assertSearchResponse(response);
@@ -646,7 +645,7 @@ public class DateHistogramIT extends ESIntegTestCase {

     /**
      * The script will change to document date values to the following:
-     *
+     * <p/>
      * doc 1: [ Feb 2, Mar 3]
      * doc 2: [ Mar 2, Apr 3]
      * doc 3: [ Mar 15, Apr 16]
@@ -659,8 +658,8 @@ public class DateHistogramIT extends ESIntegTestCase {
         SearchResponse response = client().prepareSearch("idx")
                 .addAggregation(dateHistogram("histo")
                         .field("dates")
-                        .script(new Script("new DateTime(_value, DateTimeZone.UTC).plusMonths(1).getMillis()"))
-                        .interval(DateHistogramInterval.MONTH)).execute().actionGet();
+                        .script(new Script("new DateTime(_value, DateTimeZone.UTC).plusMonths(1).getMillis()"))
+                        .interval(DateHistogramInterval.MONTH)).execute().actionGet();

         assertSearchResponse(response);

@@ -701,22 +700,21 @@ public class DateHistogramIT extends ESIntegTestCase {

     /**
      * The script will change to document date values to the following:
-     *
+     * <p/>
      * doc 1: [ Feb 2, Mar 3]
      * doc 2: [ Mar 2, Apr 3]
      * doc 3: [ Mar 15, Apr 16]
      * doc 4: [ Apr 2, May 3]
      * doc 5: [ Apr 15, May 16]
      * doc 6: [ Apr 23, May 24]
-     *
      */
     @Test
     public void multiValuedField_WithValueScript_WithInheritedSubAggregator() throws Exception {
         SearchResponse response = client().prepareSearch("idx")
                 .addAggregation(dateHistogram("histo")
                         .field("dates")
-                        .script(new Script("new DateTime((long)_value, DateTimeZone.UTC).plusMonths(1).getMillis()"))
-                        .interval(DateHistogramInterval.MONTH).subAggregation(max("max"))).execute().actionGet();
+                        .script(new Script("new DateTime((long)_value, DateTimeZone.UTC).plusMonths(1).getMillis()"))
+                        .interval(DateHistogramInterval.MONTH).subAggregation(max("max"))).execute().actionGet();

         assertSearchResponse(response);

@@ -815,8 +813,8 @@ public class DateHistogramIT extends ESIntegTestCase {
     public void script_SingleValue_WithSubAggregator_Inherited() throws Exception {
         SearchResponse response = client().prepareSearch("idx")
                 .addAggregation(dateHistogram("histo")
-                        .script(new Script("doc['date'].value")).interval(DateHistogramInterval.MONTH)
-                        .subAggregation(max("max"))).execute().actionGet();
+                        .script(new Script("doc['date'].value")).interval(DateHistogramInterval.MONTH)
+                        .subAggregation(max("max"))).execute().actionGet();

         assertSearchResponse(response);

@@ -915,8 +913,8 @@ public class DateHistogramIT extends ESIntegTestCase {
     public void script_MultiValued_WithAggregatorInherited() throws Exception {
         SearchResponse response = client().prepareSearch("idx")
                 .addAggregation(dateHistogram("histo")
-                        .script(new Script("doc['dates'].values")).interval(DateHistogramInterval.MONTH)
-                        .subAggregation(max("max"))).execute().actionGet();
+                        .script(new Script("doc['dates'].values")).interval(DateHistogramInterval.MONTH)
+                        .subAggregation(max("max"))).execute().actionGet();

         assertSearchResponse(response);

@@ -1154,7 +1152,7 @@ public class DateHistogramIT extends ESIntegTestCase {
                         .field("date")
                         .interval(DateHistogramInterval.days(interval))
                         .minDocCount(0)
-                        // when explicitly specifying a format, the extended bounds should be defined by the same format
+                        // when explicitly specifying a format, the extended bounds should be defined by the same format
                         .extendedBounds(format(boundsMin, pattern), format(boundsMax, pattern))
                         .format(pattern))
                 .execute().actionGet();
@@ -1232,7 +1230,7 @@ public class DateHistogramIT extends ESIntegTestCase {
                 .addAggregation(
                         dateHistogram("histo").field("date").interval(DateHistogramInterval.hours(1)).timeZone(timezone.getID()).minDocCount(0)
                                 .extendedBounds("now/d", "now/d+23h")
-                ).execute().actionGet();
+                ).execute().actionGet();
         assertSearchResponse(response);

         assertThat("Expected 24 buckets for one day aggregation with hourly interval", response.getHits().totalHits(), equalTo(2l));
@@ -1246,7 +1244,7 @@ public class DateHistogramIT extends ESIntegTestCase {
         for (int i = 0; i < buckets.size(); i++) {
             Histogram.Bucket bucket = buckets.get(i);
             assertThat(bucket, notNullValue());
-            assertThat("Bucket " + i +" had wrong key", (DateTime) bucket.getKey(), equalTo(new DateTime(timeZoneStartToday.getMillis() + (i * 60 * 60 * 1000), DateTimeZone.UTC)));
+            assertThat("Bucket " + i + " had wrong key", (DateTime) bucket.getKey(), equalTo(new DateTime(timeZoneStartToday.getMillis() + (i * 60 * 60 * 1000), DateTimeZone.UTC)));
             if (i == 0 || i == 12) {
                 assertThat(bucket.getDocCount(), equalTo(1l));
             } else {
@@ -1274,7 +1272,7 @@ public class DateHistogramIT extends ESIntegTestCase {
-                        .interval(DateHistogramInterval.DAY))
+                        .interval(DateHistogramInterval.DAY))
                 .execute().actionGet();

         assertThat(response.getHits().getTotalHits(), equalTo(5l));
         assertSearchHits(response, "0", "1", "2", "3", "4");

         Histogram histo = response.getAggregations().get("date_histo");
         List<? extends Histogram.Bucket> buckets = histo.getBuckets();
@@ -1349,7 +1347,7 @@ public class DateHistogramIT extends ESIntegTestCase {
         ensureSearchable("test8209");
         SearchResponse response = client().prepareSearch("test8209")
                 .addAggregation(dateHistogram("histo").field("d").interval(DateHistogramInterval.MONTH).timeZone("CET")
-                        .minDocCount(0))
+                        .minDocCount(0))
                 .execute().actionGet();
         assertSearchResponse(response);
         Histogram histo = response.getAggregations().get("histo");