Add auto create action (#56122)

Backport of #55858 to 7.x branch.

Currently the TransportBulkAction detects whether an index is missing and
then decides whether it should be auto created. The coordination of the
index creation also happens in the TransportBulkAction on the coordinating node.

This change adds a new transport action that the TransportBulkAction delegates to
if missing indices need to be created. The reasons for this change:

* Auto creation of data streams can't occur on the coordinating node.
Based on the index template (v2) either a regular index or a data stream should be created.
However if the coordinating node is slow in processing cluster state updates then it may be
unaware of the existence of certain index templates, which then can load to the
TransportBulkAction creating an index instead of a data stream. Therefor the coordination of
creating an index or data stream should occur on the master node. See #55377

* From a security perspective it is useful to know whether index creation originates from the
create index api or from auto creating a new index via the bulk or index api. For example
a user would be allowed to auto create an index, but not to use the create index api. The
auto create action will allow security to distinguish these two different patterns of
index creation.
This change adds the following new transport actions:

AutoCreateAction, the TransportBulkAction redirects to this action and this action will actually create the index (instead of the TransportCreateIndexAction). Later via #55377, can improve the AutoCreateAction to also determine whether an index or data stream should be created.

The create_index index privilege is also modified, so that if this permission is granted then a user is also allowed to auto create indices. This change does not yet add an auto_create index privilege. A future change can introduce this new index privilege or modify an existing index / write index privilege.

Relates to #53100
This commit is contained in:
Martijn van Groningen 2020-05-04 19:10:09 +02:00 committed by GitHub
parent 6b5cf1b031
commit 6d03081560
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 224 additions and 17 deletions

View File

@ -28,6 +28,10 @@ import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclu
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
@ -597,6 +601,7 @@ public class ActionModule extends AbstractModule {
actions.register(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
actions.register(NodesReloadSecureSettingsAction.INSTANCE, TransportNodesReloadSecureSettingsAction.class);
actions.register(AutoCreateAction.INSTANCE, AutoCreateAction.TransportAction.class);
//Indexed scripts
actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);

View File

@ -0,0 +1,85 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.create;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
/**
* Api that auto creates an index that originate from requests that write into an index that doesn't yet exist.
*/
public final class AutoCreateAction extends ActionType<CreateIndexResponse> {
public static final AutoCreateAction INSTANCE = new AutoCreateAction();
public static final String NAME = "indices:admin/auto_create";
private AutoCreateAction() {
super(NAME, CreateIndexResponse::new);
}
public static final class TransportAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {
private final MetadataCreateIndexService createIndexService;
@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService createIndexService) {
super(NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new, indexNameExpressionResolver);
this.createIndexService = createIndexService;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected CreateIndexResponse read(StreamInput in) throws IOException {
return new CreateIndexResponse(in);
}
@Override
protected void masterOperation(CreateIndexRequest request,
ClusterState state,
ActionListener<CreateIndexResponse> listener) throws Exception {
TransportCreateIndexAction.innerCreateIndex(request, listener, indexNameExpressionResolver, createIndexService);
}
@Override
protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index());
}
}
}

View File

@ -70,14 +70,20 @@ public class TransportCreateIndexAction extends TransportMasterNodeAction<Create
@Override
protected void masterOperation(final CreateIndexRequest request, final ClusterState state,
final ActionListener<CreateIndexResponse> listener) {
String cause = request.cause();
if (cause.length() == 0) {
cause = "api";
if (request.cause().length() == 0) {
request.cause("api");
}
innerCreateIndex(request, listener, indexNameExpressionResolver, createIndexService);
}
static void innerCreateIndex(CreateIndexRequest request,
ActionListener<CreateIndexResponse> listener,
IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService createIndexService) {
final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
final CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index())
new CreateIndexClusterStateUpdateRequest(request.cause(), indexName, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases())

View File

@ -33,6 +33,7 @@ import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexRequest;
@ -241,7 +242,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
} else {
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
for (String index : autoCreateIndices) {
createIndex(index, bulkRequest.preferV2Templates(), bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
createIndex(index, bulkRequest.preferV2Templates(), bulkRequest.timeout(), minNodeVersion,
new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
@ -385,13 +387,21 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
return autoCreateIndex.shouldAutoCreate(index, state);
}
void createIndex(String index, Boolean preferV2Templates, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
void createIndex(String index,
Boolean preferV2Templates,
TimeValue timeout,
Version minNodeVersion,
ActionListener<CreateIndexResponse> listener) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(index);
createIndexRequest.cause("auto(bulk api)");
createIndexRequest.masterNodeTimeout(timeout);
createIndexRequest.preferV2Templates(preferV2Templates);
client.admin().indices().create(createIndexRequest, listener);
if (minNodeVersion.onOrAfter(Version.V_7_8_0)) {
client.execute(AutoCreateAction.INSTANCE, createIndexRequest, listener);
} else {
client.admin().indices().create(createIndexRequest, listener);
}
}
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocWriteRequest<?> request,

View File

@ -38,6 +38,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -111,7 +112,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
when(clusterService.state()).thenReturn(state);
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(state.getNodes()).thenReturn(discoveryNodes);
when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.CURRENT);
when(discoveryNodes.getMinNodeVersion()).thenReturn(VersionUtils.randomCompatibleVersion(random(), Version.CURRENT));
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(clusterService.localNode()).thenReturn(localNode);
when(localNode.isIngestNode()).thenReturn(randomBoolean());
@ -138,7 +139,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
@Override
void createIndex(String index, Boolean preferV2Templates,
TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
TimeValue timeout, Version minNodeVersion, ActionListener<CreateIndexResponse> listener) {
// If we try to create an index just immediately assume it worked
listener.onResponse(new CreateIndexResponse(true, true, index) {});
}

View File

@ -54,6 +54,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
@ -157,7 +158,8 @@ public class TransportBulkActionIngestTests extends ESTestCase {
@Override
void createIndex(String index, Boolean preferV2Templates,
TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
TimeValue timeout, Version minNodeVersion,
ActionListener<CreateIndexResponse> listener) {
indexCreated = true;
listener.onResponse(null);
}
@ -192,7 +194,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
ImmutableOpenMap<String, DiscoveryNode> ingestNodes = ImmutableOpenMap.<String, DiscoveryNode>builder(2)
.fPut("node1", remoteNode1).fPut("node2", remoteNode2).build();
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
when(nodes.getMinNodeVersion()).thenReturn(Version.CURRENT);
when(nodes.getMinNodeVersion()).thenReturn(VersionUtils.randomCompatibleVersion(random(), Version.CURRENT));
ClusterState state = mock(ClusterState.class);
when(state.getNodes()).thenReturn(nodes);
Metadata metadata = Metadata.builder().indices(ImmutableOpenMap.<String, IndexMetadata>builder()

View File

@ -32,6 +32,8 @@ import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -40,6 +42,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -80,7 +83,7 @@ public class TransportBulkActionTests extends ESTestCase {
@Override
void createIndex(String index, Boolean preferV2Templates,
TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
TimeValue timeout, Version minNodeVersion, ActionListener<CreateIndexResponse> listener) {
indexCreated = true;
listener.onResponse(null);
}
@ -90,7 +93,9 @@ public class TransportBulkActionTests extends ESTestCase {
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
clusterService = createClusterService(threadPool);
DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
DiscoveryNodeRole.BUILT_IN_ROLES, VersionUtils.randomCompatibleVersion(random(), Version.CURRENT));
clusterService = createClusterService(threadPool, discoveryNode);
CapturingTransport capturingTransport = new CapturingTransport();
transportService = capturingTransport.createTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk;
import org.apache.lucene.util.Constants;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
@ -32,6 +33,8 @@ import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
@ -41,6 +44,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -82,7 +86,9 @@ public class TransportBulkActionTookTests extends ESTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
clusterService = createClusterService(threadPool);
DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
DiscoveryNodeRole.BUILT_IN_ROLES, VersionUtils.randomCompatibleVersion(random(), Version.CURRENT));
clusterService = createClusterService(threadPool, discoveryNode);
}
@After

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.close.CloseIndexAction;
import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction;
@ -60,7 +61,7 @@ public final class IndexPrivilege extends Privilege {
private static final Automaton MONITOR_AUTOMATON = patterns("indices:monitor/*");
private static final Automaton MANAGE_AUTOMATON =
unionAndMinimize(Arrays.asList(MONITOR_AUTOMATON, patterns("indices:admin/*")));
private static final Automaton CREATE_INDEX_AUTOMATON = patterns(CreateIndexAction.NAME);
private static final Automaton CREATE_INDEX_AUTOMATON = patterns(CreateIndexAction.NAME, AutoCreateAction.NAME);
private static final Automaton DELETE_INDEX_AUTOMATON = patterns(DeleteIndexAction.NAME);
private static final Automaton VIEW_METADATA_AUTOMATON = patterns(GetAliasesAction.NAME, AliasesExistAction.NAME,
GetIndexAction.NAME, IndicesExistsAction.NAME, GetFieldMappingsAction.NAME + "*", GetMappingsAction.NAME,

View File

@ -9,7 +9,7 @@ minimal:
# non-ML indices
- names: [ 'airline-data', 'index-*', 'unavailable-data', 'utopia' ]
privileges:
- indices:admin/create
- create_index
- indices:admin/refresh
- read
- index

View File

@ -36,7 +36,8 @@ public class WriteActionsTests extends SecurityIntegTestCase {
" cluster: [ ALL ]\n" +
" indices:\n" +
" - names: 'missing'\n" +
" privileges: [ 'indices:admin/create', 'indices:admin/delete' ]\n" +
" privileges: [ 'indices:admin/create', 'indices:admin/auto_create', " +
"'indices:admin/delete' ]\n" +
" - names: ['/index.*/']\n" +
" privileges: [ manage ]\n" +
" - names: ['/test.*/']\n" +

View File

@ -0,0 +1,85 @@
---
setup:
- skip:
features: headers
- do:
cluster.health:
wait_for_status: yellow
- do:
security.put_role:
name: "append_logs"
body: >
{
"indices": [
{ "names": ["logs-foobar" ], "privileges": ["create_doc", "create_index"] },
{ "names": ["logs-*" ], "privileges": ["create_doc"] }
]
}
- do:
security.put_user:
username: "test_user"
body: >
{
"password" : "x-pack-test-password",
"roles" : [ "append_logs" ],
"full_name" : "user with mixed privileges to multiple indices"
}
---
teardown:
- do:
security.delete_user:
username: "test_user"
ignore: 404
- do:
security.delete_role:
name: "append_logs"
ignore: 404
---
"Test auto index creation":
# Only auto creation of logs-foobar index works.
- do:
headers: { Authorization: "Basic dGVzdF91c2VyOngtcGFjay10ZXN0LXBhc3N3b3Jk" } # test_user
bulk:
body:
- '{"create": {"_index": "logs-foobar"}}'
- '{}'
- '{"create": {"_index": "logs-barbaz"}}'
- '{}'
- match: { errors: true }
- match: { items.0.create.status: 201 }
- match: { items.1.create.status: 403 }
- do: # superuser
indices.refresh:
index: "_all"
- do: # superuser
search:
rest_total_hits_as_int: true
index: "logs-*"
- match: { hits.total: 1 }
# Create the logs-barbaz with the superuser
- do: # superuser
indices.create:
index: logs-barbaz
body: {}
# Ensure that just appending data via both indices work now that the indices have been auto created
- do:
headers: { Authorization: "Basic dGVzdF91c2VyOngtcGFjay10ZXN0LXBhc3N3b3Jk" } # test_user
bulk:
body:
- '{"create": {"_index": "logs-foobar"}}'
- '{}'
- '{"create": {"_index": "logs-barbaz"}}'
- '{}'
- match: { errors: false }
- match: { items.0.create.status: 201 }
- match: { items.1.create.status: 201 }