Index templates: Made template filtering generic and extensible via plugins
Added the ability to register template filters that are being applied when a new index is created. The default filter that checks whether the template pattern matches the index name always runs first, additional filters can also be registered so that templates can be filtered out based on custom logic. Took the chance to add the handy source(Object... source) method to PutIndexTemplateRequest and corresponding builder Closes #7459 Closes #7454
This commit is contained in:
parent
e4b7395026
commit
92ae3c84fe
|
@ -27,6 +27,7 @@ import org.elasticsearch.cluster.block.ClusterBlock;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.transport.TransportMessage;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -38,8 +39,9 @@ import static com.google.common.collect.Maps.newHashMap;
|
|||
*/
|
||||
public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequest<CreateIndexClusterStateUpdateRequest> {
|
||||
|
||||
final String cause;
|
||||
final String index;
|
||||
private final TransportMessage originalMessage;
|
||||
private final String cause;
|
||||
private final String index;
|
||||
|
||||
private IndexMetaData.State state = IndexMetaData.State.OPEN;
|
||||
|
||||
|
@ -54,7 +56,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
|
|||
private final Set<ClusterBlock> blocks = Sets.newHashSet();
|
||||
|
||||
|
||||
CreateIndexClusterStateUpdateRequest(String cause, String index) {
|
||||
CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index) {
|
||||
this.originalMessage = originalMessage;
|
||||
this.cause = cause;
|
||||
this.index = index;
|
||||
}
|
||||
|
@ -89,6 +92,10 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
|
|||
return this;
|
||||
}
|
||||
|
||||
public TransportMessage originalMessage() {
|
||||
return originalMessage;
|
||||
}
|
||||
|
||||
public String cause() {
|
||||
return cause;
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi
|
|||
cause = "api";
|
||||
}
|
||||
|
||||
CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(cause, request.index())
|
||||
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, request.index())
|
||||
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
|
||||
.settings(request.settings()).mappings(request.mappings())
|
||||
.aliases(request.aliases()).customs(request.customs());
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -240,6 +241,15 @@ public class PutIndexTemplateRequest extends MasterNodeOperationRequest<PutIndex
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A specialized simplified mapping source method, takes the form of simple properties definition:
|
||||
* ("field1", "type=string,store=true").
|
||||
*/
|
||||
public PutIndexTemplateRequest mapping(String type, Object... source) {
|
||||
mapping(type, PutMappingRequest.buildFromSimplifiedDef(type, source));
|
||||
return this;
|
||||
}
|
||||
|
||||
Map<String, String> mappings() {
|
||||
return this.mappings;
|
||||
}
|
||||
|
|
|
@ -109,6 +109,15 @@ public class PutIndexTemplateRequestBuilder extends MasterNodeOperationRequestBu
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* A specialized simplified mapping source method, takes the form of simple properties definition:
|
||||
* ("field1", "type=string,store=true").
|
||||
*/
|
||||
public PutIndexTemplateRequestBuilder addMapping(String type, Object... source) {
|
||||
request.mapping(type, source);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the aliases that will be associated with the index when it gets created
|
||||
*/
|
||||
|
|
|
@ -20,7 +20,9 @@
|
|||
package org.elasticsearch.cluster;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.elasticsearch.cluster.action.index.*;
|
||||
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
|
||||
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
|
||||
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.*;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
|
@ -32,9 +34,13 @@ import org.elasticsearch.cluster.settings.ClusterDynamicSettingsModule;
|
|||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.inject.SpawnModules;
|
||||
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.settings.IndexDynamicSettingsModule;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
@ -42,10 +48,16 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
|
|||
|
||||
private final Settings settings;
|
||||
|
||||
private Set<Class<? extends IndexTemplateFilter>> indexTemplateFilters = new HashSet<>();
|
||||
|
||||
public ClusterModule(Settings settings) {
|
||||
this.settings = settings;
|
||||
}
|
||||
|
||||
public void registerIndexTemplateFilter(Class<? extends IndexTemplateFilter> indexTemplateFilter) {
|
||||
indexTemplateFilters.add(indexTemplateFilter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterable<? extends Module> spawnModules() {
|
||||
return ImmutableList.of(new AllocationModule(settings),
|
||||
|
@ -76,5 +88,10 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
|
|||
bind(MappingUpdatedAction.class).asEagerSingleton();
|
||||
|
||||
bind(ClusterInfoService.class).to(InternalClusterInfoService.class).asEagerSingleton();
|
||||
|
||||
Multibinder<IndexTemplateFilter> mbinder = Multibinder.newSetBinder(binder(), IndexTemplateFilter.class);
|
||||
for (Class<? extends IndexTemplateFilter> indexTemplateFilter : indexTemplateFilters) {
|
||||
mbinder.addBinding().to(indexTemplateFilter);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.metadata;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
|
||||
|
||||
/**
|
||||
* Enables filtering the index templates that will be applied for an index, per create index request.
|
||||
*/
|
||||
public interface IndexTemplateFilter {
|
||||
|
||||
/**
|
||||
* @return {@code true} if the given template should be applied on the newly created index,
|
||||
* {@code false} otherwise.
|
||||
*/
|
||||
boolean apply(CreateIndexClusterStateUpdateRequest request, IndexTemplateMetaData template);
|
||||
|
||||
static class Compound implements IndexTemplateFilter {
|
||||
|
||||
private IndexTemplateFilter[] filters;
|
||||
|
||||
Compound(IndexTemplateFilter... filters) {
|
||||
this.filters = filters;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean apply(CreateIndexClusterStateUpdateRequest request, IndexTemplateMetaData template) {
|
||||
for (IndexTemplateFilter filter : filters) {
|
||||
if (!filter.apply(request, template)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -72,10 +72,7 @@ import java.io.File;
|
|||
import java.io.FileInputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -88,6 +85,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde
|
|||
public class MetaDataCreateIndexService extends AbstractComponent {
|
||||
|
||||
public final static int MAX_INDEX_NAME_BYTES = 100;
|
||||
private static final DefaultIndexTemplateFilter DEFAULT_INDEX_TEMPLATE_FILTER = new DefaultIndexTemplateFilter();
|
||||
|
||||
private final Environment environment;
|
||||
private final ThreadPool threadPool;
|
||||
|
@ -98,11 +96,12 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
private final Version version;
|
||||
private final String riverIndexName;
|
||||
private final AliasValidator aliasValidator;
|
||||
private final IndexTemplateFilter indexTemplateFilter;
|
||||
|
||||
@Inject
|
||||
public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService,
|
||||
AllocationService allocationService, MetaDataService metaDataService, Version version, @RiverIndexName String riverIndexName,
|
||||
AliasValidator aliasValidator) {
|
||||
AliasValidator aliasValidator, Set<IndexTemplateFilter> indexTemplateFilters) {
|
||||
super(settings);
|
||||
this.environment = environment;
|
||||
this.threadPool = threadPool;
|
||||
|
@ -113,18 +112,21 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
this.version = version;
|
||||
this.riverIndexName = riverIndexName;
|
||||
this.aliasValidator = aliasValidator;
|
||||
|
||||
if (indexTemplateFilters.isEmpty()) {
|
||||
this.indexTemplateFilter = DEFAULT_INDEX_TEMPLATE_FILTER;
|
||||
} else {
|
||||
IndexTemplateFilter[] templateFilters = new IndexTemplateFilter[indexTemplateFilters.size() + 1];
|
||||
templateFilters[0] = DEFAULT_INDEX_TEMPLATE_FILTER;
|
||||
int i = 1;
|
||||
for (IndexTemplateFilter indexTemplateFilter : indexTemplateFilters) {
|
||||
templateFilters[i++] = indexTemplateFilter;
|
||||
}
|
||||
this.indexTemplateFilter = new IndexTemplateFilter.Compound(templateFilters);
|
||||
}
|
||||
}
|
||||
|
||||
public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
|
||||
ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
|
||||
for (Map.Entry<String, String> entry : request.settings().getAsMap().entrySet()) {
|
||||
if (!entry.getKey().startsWith("index.")) {
|
||||
updatedSettingsBuilder.put("index." + entry.getKey(), entry.getValue());
|
||||
} else {
|
||||
updatedSettingsBuilder.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
request.settings(updatedSettingsBuilder.build());
|
||||
|
||||
// we lock here, and not within the cluster service callback since we don't want to
|
||||
// block the whole cluster state handling
|
||||
|
@ -192,6 +194,17 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
}
|
||||
|
||||
private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) {
|
||||
|
||||
ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
|
||||
for (Map.Entry<String, String> entry : request.settings().getAsMap().entrySet()) {
|
||||
if (!entry.getKey().startsWith("index.")) {
|
||||
updatedSettingsBuilder.put("index." + entry.getKey(), entry.getValue());
|
||||
} else {
|
||||
updatedSettingsBuilder.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
request.settings(updatedSettingsBuilder.build());
|
||||
|
||||
clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
|
||||
|
||||
@Override
|
||||
|
@ -230,7 +243,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
|
||||
// we only find a template when its an API call (a new index)
|
||||
// find templates, highest order are better matching
|
||||
List<IndexTemplateMetaData> templates = findTemplates(request, currentState);
|
||||
List<IndexTemplateMetaData> templates = findTemplates(request, currentState, indexTemplateFilter);
|
||||
|
||||
Map<String, Custom> customs = Maps.newHashMap();
|
||||
|
||||
|
@ -486,11 +499,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private List<IndexTemplateMetaData> findTemplates(CreateIndexClusterStateUpdateRequest request, ClusterState state) {
|
||||
private List<IndexTemplateMetaData> findTemplates(CreateIndexClusterStateUpdateRequest request, ClusterState state, IndexTemplateFilter indexTemplateFilter) {
|
||||
List<IndexTemplateMetaData> templates = Lists.newArrayList();
|
||||
for (ObjectCursor<IndexTemplateMetaData> cursor : state.metaData().templates().values()) {
|
||||
IndexTemplateMetaData template = cursor.value;
|
||||
if (Regex.simpleMatch(template.template(), request.index())) {
|
||||
if (indexTemplateFilter.apply(request, template)) {
|
||||
templates.add(template);
|
||||
}
|
||||
}
|
||||
|
@ -506,7 +519,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
byte[] templatesData = Streams.copyToByteArray(templatesFile);
|
||||
parser = XContentHelper.createParser(templatesData, 0, templatesData.length);
|
||||
IndexTemplateMetaData template = IndexTemplateMetaData.Builder.fromXContent(parser);
|
||||
if (Regex.simpleMatch(template.template(), request.index())) {
|
||||
if (indexTemplateFilter.apply(request, template)) {
|
||||
templates.add(template);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -530,4 +543,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) throws ElasticsearchException {
|
||||
validateIndexName(request.index(), state);
|
||||
}
|
||||
|
||||
private static class DefaultIndexTemplateFilter implements IndexTemplateFilter {
|
||||
@Override
|
||||
public boolean apply(CreateIndexClusterStateUpdateRequest request, IndexTemplateMetaData template) {
|
||||
return Regex.simpleMatch(template.template(), request.index());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices.template;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
|
||||
import org.elasticsearch.cluster.ClusterModule;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateFilter;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugins.AbstractPlugin;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.core.IsNull.notNullValue;
|
||||
|
||||
@ClusterScope(scope = Scope.SUITE)
|
||||
public class IndexTemplateFilteringTests extends ElasticsearchIntegrationTest{
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return ImmutableSettings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put("plugin.types", TestPlugin.class.getName())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTemplateFiltering() throws Exception {
|
||||
client().admin().indices().preparePutTemplate("template1")
|
||||
.setTemplate("test*")
|
||||
.addMapping("type1", "field1", "type=string").get();
|
||||
|
||||
client().admin().indices().preparePutTemplate("template2")
|
||||
.setTemplate("test*")
|
||||
.addMapping("type2", "field2", "type=string").get();
|
||||
|
||||
client().admin().indices().preparePutTemplate("template3")
|
||||
.setTemplate("no_match")
|
||||
.addMapping("type3", "field3", "type=string").get();
|
||||
|
||||
assertAcked(prepareCreate("test").putHeader("header_test", "header_value"));
|
||||
|
||||
GetMappingsResponse response = client().admin().indices().prepareGetMappings("test").get();
|
||||
assertThat(response, notNullValue());
|
||||
ImmutableOpenMap<String, MappingMetaData> metadata = response.getMappings().get("test");
|
||||
assertThat(metadata.size(), is(1));
|
||||
assertThat(metadata.get("type2"), notNullValue());
|
||||
}
|
||||
|
||||
|
||||
public static class TestFilter implements IndexTemplateFilter {
|
||||
@Override
|
||||
public boolean apply(CreateIndexClusterStateUpdateRequest request, IndexTemplateMetaData template) {
|
||||
//make sure that no_match template is filtered out before the custom filters as it doesn't match the index name
|
||||
return (template.name().equals("template2") || template.name().equals("no_match")) && request.originalMessage().getHeader("header_test").equals("header_value");
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestPlugin extends AbstractPlugin {
|
||||
@Override
|
||||
public String name() {
|
||||
return "test-plugin";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "";
|
||||
}
|
||||
|
||||
public void onModule(ClusterModule module) {
|
||||
module.registerIndexTemplateFilter(TestFilter.class);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue