Block too many concurrent mapping updates (#51038)
Ensures that there are not too many concurrent dynamic mapping updates going out from the data nodes to the master. Closes #50670
This commit is contained in:
parent
b4a631277a
commit
dc47b380c8
|
@ -32,11 +32,14 @@ import org.elasticsearch.common.settings.Setting.Property;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
|
||||
import org.elasticsearch.common.util.concurrent.RunOnce;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.Mapping;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
/**
|
||||
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
|
||||
* in the cluster state meta data (and broadcast to all members).
|
||||
|
@ -47,19 +50,30 @@ public class MappingUpdatedAction {
|
|||
Setting.positiveTimeSetting("indices.mapping.dynamic_timeout", TimeValue.timeValueSeconds(30),
|
||||
Property.Dynamic, Property.NodeScope);
|
||||
|
||||
public static final Setting<Integer> INDICES_MAX_IN_FLIGHT_UPDATES_SETTING =
|
||||
Setting.intSetting("indices.mapping.max_in_flight_updates", 10, 1, 1000,
|
||||
Property.Dynamic, Property.NodeScope);
|
||||
|
||||
private IndicesAdminClient client;
|
||||
private volatile TimeValue dynamicMappingUpdateTimeout;
|
||||
private final AdjustableSemaphore semaphore;
|
||||
|
||||
@Inject
|
||||
public MappingUpdatedAction(Settings settings, ClusterSettings clusterSettings) {
|
||||
this.dynamicMappingUpdateTimeout = INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.get(settings);
|
||||
this.semaphore = new AdjustableSemaphore(INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.get(settings), true);
|
||||
clusterSettings.addSettingsUpdateConsumer(INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING, this::setDynamicMappingUpdateTimeout);
|
||||
clusterSettings.addSettingsUpdateConsumer(INDICES_MAX_IN_FLIGHT_UPDATES_SETTING, this::setMaxInFlightUpdates);
|
||||
}
|
||||
|
||||
private void setDynamicMappingUpdateTimeout(TimeValue dynamicMappingUpdateTimeout) {
|
||||
this.dynamicMappingUpdateTimeout = dynamicMappingUpdateTimeout;
|
||||
}
|
||||
|
||||
private void setMaxInFlightUpdates(int maxInFlightUpdates) {
|
||||
semaphore.setMaxPermits(maxInFlightUpdates);
|
||||
}
|
||||
|
||||
public void setClient(Client client) {
|
||||
this.client = client.admin().indices();
|
||||
}
|
||||
|
@ -74,6 +88,33 @@ public class MappingUpdatedAction {
|
|||
if (type.equals(MapperService.DEFAULT_MAPPING)) {
|
||||
throw new IllegalArgumentException("_default_ mapping should not be updated");
|
||||
}
|
||||
|
||||
final RunOnce release = new RunOnce(() -> semaphore.release());
|
||||
try {
|
||||
semaphore.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
listener.onFailure(e);
|
||||
return;
|
||||
}
|
||||
boolean successFullySent = false;
|
||||
try {
|
||||
sendUpdateMapping(index, type, mappingUpdate, ActionListener.runBefore(listener, release::run));
|
||||
successFullySent = true;
|
||||
} finally {
|
||||
if (successFullySent == false) {
|
||||
release.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// used by tests
|
||||
int blockedThreads() {
|
||||
return semaphore.getQueueLength();
|
||||
}
|
||||
|
||||
// can be overridden by tests
|
||||
protected void sendUpdateMapping(Index index, String type, Mapping mappingUpdate, ActionListener<Void> listener) {
|
||||
client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON)
|
||||
.setMasterNodeTimeout(dynamicMappingUpdateTimeout).setTimeout(TimeValue.ZERO)
|
||||
.execute(new ActionListener<AcknowledgedResponse>() {
|
||||
|
@ -101,4 +142,30 @@ public class MappingUpdatedAction {
|
|||
}
|
||||
return new UncategorizedExecutionException("Failed execution", root);
|
||||
}
|
||||
|
||||
static class AdjustableSemaphore extends Semaphore {
|
||||
|
||||
private final Object maxPermitsMutex = new Object();
|
||||
private int maxPermits;
|
||||
|
||||
AdjustableSemaphore(int maxPermits, boolean fair) {
|
||||
super(maxPermits, fair);
|
||||
this.maxPermits = maxPermits;
|
||||
}
|
||||
|
||||
void setMaxPermits(int permits) {
|
||||
synchronized (maxPermitsMutex) {
|
||||
final int diff = Math.subtractExact(permits, maxPermits);
|
||||
if (diff > 0) {
|
||||
// add permits
|
||||
release(diff);
|
||||
} else if (diff < 0) {
|
||||
// remove permits
|
||||
reducePermits(Math.negateExact(diff));
|
||||
}
|
||||
|
||||
maxPermits = permits;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -215,6 +215,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING,
|
||||
IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING,
|
||||
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
|
||||
MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING,
|
||||
MetaData.SETTING_READ_ONLY_SETTING,
|
||||
MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
|
||||
MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.cluster.action.index;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.action.index.MappingUpdatedAction.AdjustableSemaphore;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.mapper.Mapping;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
public class MappingUpdatedActionTests extends ESTestCase {
|
||||
|
||||
public void testAdjustableSemaphore() {
|
||||
AdjustableSemaphore sem = new AdjustableSemaphore(1, randomBoolean());
|
||||
assertEquals(1, sem.availablePermits());
|
||||
assertTrue(sem.tryAcquire());
|
||||
assertEquals(0, sem.availablePermits());
|
||||
assertFalse(sem.tryAcquire());
|
||||
assertEquals(0, sem.availablePermits());
|
||||
|
||||
// increase the number of max permits to 2
|
||||
sem.setMaxPermits(2);
|
||||
assertEquals(1, sem.availablePermits());
|
||||
assertTrue(sem.tryAcquire());
|
||||
assertEquals(0, sem.availablePermits());
|
||||
|
||||
// release all current permits
|
||||
sem.release();
|
||||
assertEquals(1, sem.availablePermits());
|
||||
sem.release();
|
||||
assertEquals(2, sem.availablePermits());
|
||||
|
||||
// reduce number of max permits to 1
|
||||
sem.setMaxPermits(1);
|
||||
assertEquals(1, sem.availablePermits());
|
||||
// set back to 2
|
||||
sem.setMaxPermits(2);
|
||||
assertEquals(2, sem.availablePermits());
|
||||
|
||||
// take both permits and reduce max permits
|
||||
assertTrue(sem.tryAcquire());
|
||||
assertTrue(sem.tryAcquire());
|
||||
assertEquals(0, sem.availablePermits());
|
||||
assertFalse(sem.tryAcquire());
|
||||
sem.setMaxPermits(1);
|
||||
assertEquals(-1, sem.availablePermits());
|
||||
assertFalse(sem.tryAcquire());
|
||||
|
||||
// release one permit
|
||||
sem.release();
|
||||
assertEquals(0, sem.availablePermits());
|
||||
assertFalse(sem.tryAcquire());
|
||||
|
||||
// release second permit
|
||||
sem.release();
|
||||
assertEquals(1, sem.availablePermits());
|
||||
assertTrue(sem.tryAcquire());
|
||||
}
|
||||
|
||||
public void testMappingUpdatedActionBlocks() throws Exception {
|
||||
List<ActionListener<Void>> inFlightListeners = new CopyOnWriteArrayList<>();
|
||||
final MappingUpdatedAction mua = new MappingUpdatedAction(Settings.builder()
|
||||
.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(), 1).build(),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) {
|
||||
|
||||
@Override
|
||||
protected void sendUpdateMapping(Index index, String type, Mapping mappingUpdate, ActionListener<Void> listener) {
|
||||
inFlightListeners.add(listener);
|
||||
}
|
||||
};
|
||||
|
||||
PlainActionFuture<Void> fut1 = new PlainActionFuture<>();
|
||||
mua.updateMappingOnMaster(null, "test", null, fut1);
|
||||
assertEquals(1, inFlightListeners.size());
|
||||
assertEquals(0, mua.blockedThreads());
|
||||
|
||||
PlainActionFuture<Void> fut2 = new PlainActionFuture<>();
|
||||
Thread thread = new Thread(() -> {
|
||||
mua.updateMappingOnMaster(null, "test", null, fut2); // blocked
|
||||
});
|
||||
thread.start();
|
||||
assertBusy(() -> assertEquals(1, mua.blockedThreads()));
|
||||
|
||||
assertEquals(1, inFlightListeners.size());
|
||||
assertFalse(fut1.isDone());
|
||||
inFlightListeners.remove(0).onResponse(null);
|
||||
assertTrue(fut1.isDone());
|
||||
|
||||
thread.join();
|
||||
assertEquals(0, mua.blockedThreads());
|
||||
assertEquals(1, inFlightListeners.size());
|
||||
assertFalse(fut2.isDone());
|
||||
inFlightListeners.remove(0).onResponse(null);
|
||||
assertTrue(fut2.isDone());
|
||||
}
|
||||
}
|
|
@ -888,6 +888,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath())
|
||||
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
|
||||
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY))
|
||||
.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(), 1000) // o.w. some tests might block
|
||||
.build());
|
||||
}
|
||||
|
||||
|
|
|
@ -491,6 +491,8 @@ public final class InternalTestCluster extends TestCluster {
|
|||
if (random.nextBoolean()) {
|
||||
builder.put(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.getKey(),
|
||||
timeValueSeconds(RandomNumbers.randomIntBetween(random, 10, 30)).getStringRep());
|
||||
builder.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(),
|
||||
RandomNumbers.randomIntBetween(random, 1, 10));
|
||||
}
|
||||
|
||||
// turning on the real memory circuit breaker leads to spurious test failures. As have no full control over heap usage, we
|
||||
|
|
Loading…
Reference in New Issue