Index creation and setting update may not return deprecation logging (#22702)
Those services validate their setting before submitting an AckedClusterStateUpdateTask to the cluster state service. An acked cluster state may be completed by a networking thread when the last acks as received. As such it needs special care to make sure that thread context headers are handled correctly.
This commit is contained in:
parent
fc4dc5ef21
commit
5d806bf93e
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.support;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Restores the given {@link org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext}
|
||||
* once the listener is invoked
|
||||
*/
|
||||
public final class ContextPreservingActionListener<R> implements ActionListener<R> {
|
||||
|
||||
private final ActionListener<R> delegate;
|
||||
private final Supplier<ThreadContext.StoredContext> context;
|
||||
|
||||
public ContextPreservingActionListener(Supplier<ThreadContext.StoredContext> contextSupplier, ActionListener<R> delegate) {
|
||||
this.delegate = delegate;
|
||||
this.context = contextSupplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(R r) {
|
||||
try (ThreadContext.StoredContext ignore = context.get()) {
|
||||
delegate.onResponse(r);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try (ThreadContext.StoredContext ignore = context.get()) {
|
||||
delegate.onFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.cluster.metadata;
|
|||
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.apache.lucene.util.CollectionUtil;
|
||||
|
@ -33,6 +32,7 @@ import org.elasticsearch.action.admin.indices.alias.Alias;
|
|||
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.ActiveShardsObserver;
|
||||
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
|
||||
|
@ -67,7 +67,6 @@ import org.elasticsearch.index.Index;
|
|||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||
import org.elasticsearch.index.mapper.MapperParsingException;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.MapperService.MergeReason;
|
||||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
|
@ -116,6 +115,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
private final IndexScopedSettings indexScopedSettings;
|
||||
private final ActiveShardsObserver activeShardsObserver;
|
||||
private final NamedXContentRegistry xContentRegistry;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
@Inject
|
||||
public MetaDataCreateIndexService(Settings settings, ClusterService clusterService,
|
||||
|
@ -131,6 +131,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
this.env = env;
|
||||
this.indexScopedSettings = indexScopedSettings;
|
||||
this.activeShardsObserver = new ActiveShardsObserver(settings, clusterService, threadPool);
|
||||
this.threadPool = threadPool;
|
||||
this.xContentRegistry = xContentRegistry;
|
||||
}
|
||||
|
||||
|
@ -221,7 +222,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
request.settings(updatedSettingsBuilder.build());
|
||||
|
||||
clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
|
||||
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
|
||||
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, wrapPreservingContext(listener)) {
|
||||
@Override
|
||||
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
|
||||
return new ClusterStateUpdateResponse(acknowledged);
|
||||
|
@ -473,6 +474,10 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
private ContextPreservingActionListener<ClusterStateUpdateResponse> wrapPreservingContext(ActionListener<ClusterStateUpdateResponse> listener) {
|
||||
return new ContextPreservingActionListener<>(threadPool.getThreadContext().newRestorableContext(false), listener);
|
||||
}
|
||||
|
||||
private List<IndexTemplateMetaData> findTemplates(CreateIndexClusterStateUpdateRequest request, ClusterState state) throws IOException {
|
||||
List<IndexTemplateMetaData> templateMetadata = new ArrayList<>();
|
||||
for (ObjectCursor<IndexTemplateMetaData> cursor : state.metaData().templates().values()) {
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
|
||||
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeSettingsClusterStateUpdateRequest;
|
||||
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -44,6 +45,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -65,12 +67,14 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
|
|||
|
||||
private final IndexScopedSettings indexScopedSettings;
|
||||
private final IndicesService indicesService;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
@Inject
|
||||
public MetaDataUpdateSettingsService(Settings settings, ClusterService clusterService, AllocationService allocationService,
|
||||
IndexScopedSettings indexScopedSettings, IndicesService indicesService) {
|
||||
IndexScopedSettings indexScopedSettings, IndicesService indicesService, ThreadPool threadPool) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService.addListener(this);
|
||||
this.allocationService = allocationService;
|
||||
this.indexScopedSettings = indexScopedSettings;
|
||||
|
@ -180,7 +184,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
|
|||
final boolean preserveExisting = request.isPreserveExisting();
|
||||
|
||||
clusterService.submitStateUpdateTask("update-settings",
|
||||
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
|
||||
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, wrapPreservingContext(listener)) {
|
||||
|
||||
@Override
|
||||
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
|
||||
|
@ -290,6 +294,10 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
|
|||
});
|
||||
}
|
||||
|
||||
private ContextPreservingActionListener<ClusterStateUpdateResponse> wrapPreservingContext(ActionListener<ClusterStateUpdateResponse> listener) {
|
||||
return new ContextPreservingActionListener<>(threadPool.getThreadContext().newRestorableContext(false), listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the cluster block only iff the setting exists in the given settings
|
||||
*/
|
||||
|
@ -308,9 +316,8 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
|
|||
|
||||
|
||||
public void upgradeIndexSettings(final UpgradeSettingsClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
|
||||
|
||||
|
||||
clusterService.submitStateUpdateTask("update-index-compatibility-versions", new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
|
||||
clusterService.submitStateUpdateTask("update-index-compatibility-versions",
|
||||
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, wrapPreservingContext(listener)) {
|
||||
|
||||
@Override
|
||||
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
|
||||
|
|
|
@ -0,0 +1,141 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.support;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class ContextPreservingActionListenerTests extends ESTestCase {
|
||||
|
||||
public void testOriginalContextIsPreservedAfterOnResponse() throws IOException {
|
||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
||||
final boolean nonEmptyContext = randomBoolean();
|
||||
if (nonEmptyContext) {
|
||||
threadContext.putHeader("not empty", "value");
|
||||
}
|
||||
ContextPreservingActionListener<Void> actionListener;
|
||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||
threadContext.putHeader("foo", "bar");
|
||||
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true),
|
||||
new ActionListener<Void>() {
|
||||
@Override
|
||||
public void onResponse(Void aVoid) {
|
||||
assertEquals("bar", threadContext.getHeader("foo"));
|
||||
assertNull(threadContext.getHeader("not empty"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new RuntimeException("onFailure shouldn't be called", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
assertNull(threadContext.getHeader("foo"));
|
||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||
|
||||
actionListener.onResponse(null);
|
||||
|
||||
assertNull(threadContext.getHeader("foo"));
|
||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testOriginalContextIsPreservedAfterOnFailure() throws Exception {
|
||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
||||
final boolean nonEmptyContext = randomBoolean();
|
||||
if (nonEmptyContext) {
|
||||
threadContext.putHeader("not empty", "value");
|
||||
}
|
||||
ContextPreservingActionListener<Void> actionListener;
|
||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||
threadContext.putHeader("foo", "bar");
|
||||
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true),
|
||||
new ActionListener<Void>() {
|
||||
@Override
|
||||
public void onResponse(Void aVoid) {
|
||||
throw new RuntimeException("onResponse shouldn't be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
assertEquals("bar", threadContext.getHeader("foo"));
|
||||
assertNull(threadContext.getHeader("not empty"));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
assertNull(threadContext.getHeader("foo"));
|
||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||
|
||||
actionListener.onFailure(null);
|
||||
|
||||
assertNull(threadContext.getHeader("foo"));
|
||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testOriginalContextIsWhenListenerThrows() throws Exception {
|
||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
||||
final boolean nonEmptyContext = randomBoolean();
|
||||
if (nonEmptyContext) {
|
||||
threadContext.putHeader("not empty", "value");
|
||||
}
|
||||
ContextPreservingActionListener<Void> actionListener;
|
||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||
threadContext.putHeader("foo", "bar");
|
||||
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true),
|
||||
new ActionListener<Void>() {
|
||||
@Override
|
||||
public void onResponse(Void aVoid) {
|
||||
assertEquals("bar", threadContext.getHeader("foo"));
|
||||
assertNull(threadContext.getHeader("not empty"));
|
||||
throw new RuntimeException("onResponse called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
assertEquals("bar", threadContext.getHeader("foo"));
|
||||
assertNull(threadContext.getHeader("not empty"));
|
||||
throw new RuntimeException("onFailure called");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
assertNull(threadContext.getHeader("foo"));
|
||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||
|
||||
RuntimeException e = expectThrows(RuntimeException.class, () -> actionListener.onResponse(null));
|
||||
assertEquals("onResponse called", e.getMessage());
|
||||
|
||||
assertNull(threadContext.getHeader("foo"));
|
||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||
|
||||
e = expectThrows(RuntimeException.class, () -> actionListener.onFailure(null));
|
||||
assertEquals("onFailure called", e.getMessage());
|
||||
|
||||
assertNull(threadContext.getHeader("foo"));
|
||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -360,8 +360,8 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
public void testClosedIndexOnReroute() throws InterruptedException {
|
||||
final String index = "test";
|
||||
// no replicas in oder to skip the replication part
|
||||
setState(clusterService, new ClusterStateChanges(xContentRegistry()).closeIndices(state(index, true, ShardRoutingState.UNASSIGNED),
|
||||
new CloseIndexRequest(index)));
|
||||
setState(clusterService, new ClusterStateChanges(xContentRegistry(), threadPool).closeIndices(state(index, true,
|
||||
ShardRoutingState.UNASSIGNED), new CloseIndexRequest(index)));
|
||||
logger.debug("--> using initial state:\n{}", clusterService.state());
|
||||
Request request = new Request(new ShardId("test", "_na_", 0)).timeout("1ms");
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
|
|
|
@ -114,7 +114,7 @@ public class ClusterStateChanges extends AbstractComponent {
|
|||
private final TransportClusterRerouteAction transportClusterRerouteAction;
|
||||
private final TransportCreateIndexAction transportCreateIndexAction;
|
||||
|
||||
public ClusterStateChanges(NamedXContentRegistry xContentRegistry) {
|
||||
public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool threadPool) {
|
||||
super(Settings.builder().put(PATH_HOME_SETTING.getKey(), "dummy").build());
|
||||
|
||||
allocationService = new AllocationService(settings, new AllocationDeciders(settings,
|
||||
|
@ -130,7 +130,6 @@ public class ClusterStateChanges extends AbstractComponent {
|
|||
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
|
||||
DestructiveOperations destructiveOperations = new DestructiveOperations(settings, clusterSettings);
|
||||
Environment environment = new Environment(settings);
|
||||
ThreadPool threadPool = null; // it's not used
|
||||
Transport transport = null; // it's not used
|
||||
|
||||
// mocks
|
||||
|
@ -170,7 +169,7 @@ public class ClusterStateChanges extends AbstractComponent {
|
|||
metaDataIndexUpgradeService, indicesService);
|
||||
MetaDataDeleteIndexService deleteIndexService = new MetaDataDeleteIndexService(settings, clusterService, allocationService);
|
||||
MetaDataUpdateSettingsService metaDataUpdateSettingsService = new MetaDataUpdateSettingsService(settings, clusterService,
|
||||
allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService);
|
||||
allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, threadPool);
|
||||
MetaDataCreateIndexService createIndexService = new MetaDataCreateIndexService(settings, clusterService, indicesService,
|
||||
allocationService, new AliasValidator(settings), environment,
|
||||
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, threadPool, xContentRegistry);
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.elasticsearch.common.util.set.Sets;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -74,7 +75,21 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndicesClusterStateServiceTestCase {
|
||||
|
||||
private final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry());
|
||||
private ThreadPool threadPool;
|
||||
private ClusterStateChanges cluster;
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
threadPool = new TestThreadPool(getClass().getName());
|
||||
cluster = new ClusterStateChanges(xContentRegistry(), threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
terminate(threadPool);
|
||||
}
|
||||
|
||||
/**
|
||||
* needed due to random usage of {@link IndexMetaData#INDEX_SHADOW_REPLICAS_SETTING}. removed once
|
||||
|
|
|
@ -172,8 +172,8 @@
|
|||
---
|
||||
"Test cat shards with shadow replicas":
|
||||
- skip:
|
||||
version: " - 6.99.99"
|
||||
reason: deprecation was added in 5.2.0 (but this is disable now due to a bug in ThreadContext, Boaz is on it)
|
||||
version: " - 5.1.99"
|
||||
reason: deprecation was added in 5.2.0
|
||||
features: "warnings"
|
||||
|
||||
- do:
|
||||
|
|
Loading…
Reference in New Issue