Set acking timeout to 0 on dynamic mapping update (#31140)

As acking can fail for any reason (unrelated node being too slow, node disconnecting), it should not
be required for acking to succeed in order for index requests with dynamic mapping updates to
successfully complete.

Relates to #30672 and Closes #30844
This commit is contained in:
Yannick Welsch 2019-01-24 11:39:46 +01:00 committed by GitHub
parent b6936e3c1e
commit 64adb5ad5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 56 additions and 6 deletions

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster.action.index;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
@ -67,7 +66,7 @@ public class MappingUpdatedAction {
throw new IllegalArgumentException("_default_ mapping should not be updated");
}
return client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON)
.setMasterNodeTimeout(timeout).setTimeout(timeout);
.setMasterNodeTimeout(timeout).setTimeout(TimeValue.ZERO);
}
/**
@ -84,8 +83,6 @@ public class MappingUpdatedAction {
* been applied to the master node and propagated to data nodes.
*/
public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, TimeValue timeout) {
if (updateMappingRequest(index, type, mappingUpdate, timeout).get().isAcknowledged() == false) {
throw new ElasticsearchTimeoutException("Failed to acknowledge mapping update within [" + timeout + "]");
}
updateMappingRequest(index, type, mappingUpdate, timeout).get();
}
}

View File

@ -66,13 +66,13 @@ public class IndexingMasterFailoverIT extends ESIntegTestCase {
* This retry logic is implemented in TransportMasterNodeAction and tested by the following master failover scenario.
*/
@TestLogging("_root:DEBUG")
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30844")
public void testMasterFailoverDuringIndexingWithMappingChanges() throws Throwable {
logger.info("--> start 4 nodes, 3 master, 1 data");
final Settings sharedSettings = Settings.builder()
.put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
.put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
.put(TestZenDiscovery.USE_ZEN2.getKey(), false)
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)

View File

@ -20,8 +20,10 @@ package org.elasticsearch.cluster.routing;
*/
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
@ -30,6 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
@ -91,6 +94,34 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
}
public void testBulkWeirdScenario() throws Exception {
String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNodes(2);
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get());
ensureGreen();
BulkResponse bulkResponse = client().prepareBulk()
.add(client().prepareIndex().setIndex("test").setType("_doc").setId("1").setSource("field1", "value1"))
.add(client().prepareUpdate().setIndex("test").setType("_doc").setId("1").setDoc("field2", "value2"))
.execute().actionGet();
assertThat(bulkResponse.hasFailures(), equalTo(false));
assertThat(bulkResponse.getItems().length, equalTo(2));
logger.info(Strings.toString(bulkResponse, true, true));
internalCluster().assertSeqNos();
assertThat(bulkResponse.getItems()[0].getResponse().getId(), equalTo("1"));
assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L));
assertThat(bulkResponse.getItems()[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertThat(bulkResponse.getItems()[1].getResponse().getId(), equalTo("1"));
assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(2L));
assertThat(bulkResponse.getItems()[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.UPDATED));
}
private void createStaleReplicaScenario(String master) throws Exception {
client().prepareIndex("test", "type1").setSource(jsonBuilder()
.startObject().field("field", "value1").endObject()).get();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.indices.state;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@ -57,6 +58,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
@ -397,6 +399,24 @@ public class RareClusterStateIT extends ESIntegTestCase {
assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").get().isExists()));
// index another document, this time using dynamic mappings.
// The ack timeout of 0 on dynamic mapping updates makes it possible for the document to be indexed on the primary, even
// if the dynamic mapping update is not applied on the replica yet.
ActionFuture<IndexResponse> dynamicMappingsFut = client().prepareIndex("index", "type", "2").setSource("field2", 42).execute();
// ...and wait for second mapping to be available on master
assertBusy(() -> {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master);
final IndexService indexService = indicesService.indexServiceSafe(index);
assertNotNull(indexService);
final MapperService mapperService = indexService.mapperService();
DocumentMapper mapper = mapperService.documentMapper("type");
assertNotNull(mapper);
assertNotNull(mapper.mappers().getMapper("field2"));
});
assertBusy(() -> assertTrue(client().prepareGet("index", "type", "2").get().isExists()));
// The mappings have not been propagated to the replica yet as a consequence the document count not be indexed
// We wait on purpose to make sure that the document is not indexed because the shard operation is stalled
// and not just because it takes time to replicate the indexing request to the replica
@ -415,6 +435,8 @@ public class RareClusterStateIT extends ESIntegTestCase {
assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
2, docResp.getShardInfo().getTotal()); // both shards should have succeeded
});
assertThat(dynamicMappingsFut.get().getResult(), equalTo(CREATED));
}
}