Single instance of the IndexNameExpressionResolver (#52604)
This commit modifies the codebase so that our production code uses a single instance of the IndexNameExpressionResolver class. This change is being made in preparation for allowing name expression resolution to be augmented by a plugin. In order to remove some instances of IndexNameExpressionResolver, the single instance is added as a parameter of Plugin#createComponents and PersistentTaskPlugin#getPersistentTasksExecutor. Backport of #52596
This commit is contained in:
parent
ed957f35a9
commit
f3f6ff97ee
|
@ -114,6 +114,7 @@ import org.apache.lucene.analysis.util.ElisionFilter;
|
|||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
|
@ -162,7 +163,8 @@ public class CommonAnalysisPlugin extends Plugin implements AnalysisPlugin, Scri
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
this.scriptService.set(scriptService);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ public class PredicateTokenScriptFilterTests extends ESTokenStreamTestCase {
|
|||
};
|
||||
|
||||
CommonAnalysisPlugin plugin = new CommonAnalysisPlugin();
|
||||
plugin.createComponents(null, null, null, null, scriptService, null, null, null, null);
|
||||
plugin.createComponents(null, null, null, null, scriptService, null, null, null, null, null);
|
||||
AnalysisModule module
|
||||
= new AnalysisModule(TestEnvironment.newEnvironment(settings), Collections.singletonList(plugin));
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ public class ScriptedConditionTokenFilterTests extends ESTokenStreamTestCase {
|
|||
};
|
||||
|
||||
CommonAnalysisPlugin plugin = new CommonAnalysisPlugin();
|
||||
plugin.createComponents(null, null, null, null, scriptService, null, null, null, null);
|
||||
plugin.createComponents(null, null, null, null, scriptService, null, null, null, null, null);
|
||||
AnalysisModule module
|
||||
= new AnalysisModule(TestEnvironment.newEnvironment(settings), Collections.singletonList(plugin));
|
||||
|
||||
|
|
|
@ -84,7 +84,8 @@ public class ReindexPlugin extends Plugin implements ActionPlugin {
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
return Collections.singletonList(new ReindexSslConfig(environment.settings(), environment, resourceWatcherService));
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.action.support.ActionFilter;
|
|||
import org.elasticsearch.action.support.ActionFilterChain;
|
||||
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
|
@ -151,7 +152,8 @@ public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase {
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
testFilter.set(new ReindexFromRemoteWithAuthTests.TestFilter(threadPool));
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.Build;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -88,7 +89,8 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
|
|||
final NamedXContentRegistry xContentRegistry,
|
||||
final Environment environment,
|
||||
final NodeEnvironment nodeEnvironment,
|
||||
final NamedWriteableRegistry namedWriteableRegistry) {
|
||||
final NamedWriteableRegistry namedWriteableRegistry,
|
||||
final IndexNameExpressionResolver expressionResolver) {
|
||||
if (enabled) {
|
||||
/*
|
||||
* Since we have set the service type to notify, by default systemd will wait up to sixty seconds for the process to send the
|
||||
|
|
|
@ -61,28 +61,28 @@ public class SystemdPluginTests extends ESTestCase {
|
|||
|
||||
public void testIsEnabled() {
|
||||
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.TRUE.toString());
|
||||
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
|
||||
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null, null);
|
||||
assertTrue(plugin.isEnabled());
|
||||
assertNotNull(plugin.extender);
|
||||
}
|
||||
|
||||
public void testIsNotPackageDistribution() {
|
||||
final SystemdPlugin plugin = new SystemdPlugin(false, randomNonPackageBuildType, Boolean.TRUE.toString());
|
||||
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
|
||||
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null, null);
|
||||
assertFalse(plugin.isEnabled());
|
||||
assertNull(plugin.extender);
|
||||
}
|
||||
|
||||
public void testIsImplicitlyNotEnabled() {
|
||||
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, null);
|
||||
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
|
||||
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null, null);
|
||||
assertFalse(plugin.isEnabled());
|
||||
assertNull(plugin.extender);
|
||||
}
|
||||
|
||||
public void testIsExplicitlyNotEnabled() {
|
||||
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.FALSE.toString());
|
||||
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
|
||||
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null, null);
|
||||
assertFalse(plugin.isEnabled());
|
||||
assertNull(plugin.extender);
|
||||
}
|
||||
|
@ -181,7 +181,7 @@ public class SystemdPluginTests extends ESTestCase {
|
|||
}
|
||||
|
||||
};
|
||||
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
|
||||
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null, null);
|
||||
if (Boolean.TRUE.toString().equals(esSDNotify)) {
|
||||
assertNotNull(plugin.extender);
|
||||
} else {
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
|
@ -299,7 +300,8 @@ public class ContextAndHeaderTransportIT extends HttpSmokeTestCase {
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
loggingFilter.set(new LoggingFilter(threadPool));
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.lucene.util.Constants;
|
|||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
import org.elasticsearch.common.TriFunction;
|
||||
|
@ -129,6 +130,7 @@ public final class IndexModule {
|
|||
private final SetOnce<BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> forceQueryCacheProvider = new SetOnce<>();
|
||||
private final List<SearchOperationListener> searchOperationListeners = new ArrayList<>();
|
||||
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
|
||||
private final IndexNameExpressionResolver expressionResolver;
|
||||
private final AtomicBoolean frozen = new AtomicBoolean(false);
|
||||
private final BooleanSupplier allowExpensiveQueries;
|
||||
|
||||
|
@ -146,7 +148,8 @@ public final class IndexModule {
|
|||
final AnalysisRegistry analysisRegistry,
|
||||
final EngineFactory engineFactory,
|
||||
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
|
||||
final BooleanSupplier allowExpensiveQueries) {
|
||||
final BooleanSupplier allowExpensiveQueries,
|
||||
final IndexNameExpressionResolver expressionResolver) {
|
||||
this.indexSettings = indexSettings;
|
||||
this.analysisRegistry = analysisRegistry;
|
||||
this.engineFactory = Objects.requireNonNull(engineFactory);
|
||||
|
@ -154,6 +157,7 @@ public final class IndexModule {
|
|||
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
|
||||
this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
|
||||
this.allowExpensiveQueries = allowExpensiveQueries;
|
||||
this.expressionResolver = expressionResolver;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -427,7 +431,7 @@ public final class IndexModule {
|
|||
new SimilarityService(indexSettings, scriptService, similarities), shardStoreDeleter, indexAnalyzers,
|
||||
engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache,
|
||||
directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners,
|
||||
indexOperationListeners, namedWriteableRegistry, idFieldDataEnabled, allowExpensiveQueries);
|
||||
indexOperationListeners, namedWriteableRegistry, idFieldDataEnabled, allowExpensiveQueries, expressionResolver);
|
||||
success = true;
|
||||
return indexService;
|
||||
} finally {
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.Assertions;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
|
@ -143,6 +144,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
|||
private final ClusterService clusterService;
|
||||
private final Client client;
|
||||
private final CircuitBreakerService circuitBreakerService;
|
||||
private final IndexNameExpressionResolver expressionResolver;
|
||||
private Supplier<Sort> indexSortSupplier;
|
||||
|
||||
public IndexService(
|
||||
|
@ -169,7 +171,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
|||
List<IndexingOperationListener> indexingOperationListeners,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
BooleanSupplier idFieldDataEnabled,
|
||||
BooleanSupplier allowExpensiveQueries) {
|
||||
BooleanSupplier allowExpensiveQueries,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
super(indexSettings);
|
||||
this.allowExpensiveQueries = allowExpensiveQueries;
|
||||
this.indexSettings = indexSettings;
|
||||
|
@ -177,6 +180,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
|||
this.similarityService = similarityService;
|
||||
this.namedWriteableRegistry = namedWriteableRegistry;
|
||||
this.circuitBreakerService = circuitBreakerService;
|
||||
this.expressionResolver = expressionResolver;
|
||||
if (needsMapperService(indexSettings, indexCreationContext)) {
|
||||
assert indexAnalyzers != null;
|
||||
this.mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
|
||||
|
@ -569,7 +573,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
|||
* {@link IndexReader}-specific optimizations, such as rewriting containing range queries.
|
||||
*/
|
||||
public QueryShardContext newQueryShardContext(int shardId, IndexSearcher searcher, LongSupplier nowInMillis, String clusterAlias) {
|
||||
SearchIndexNameMatcher indexNameMatcher = new SearchIndexNameMatcher(index().getName(), clusterAlias, clusterService);
|
||||
final SearchIndexNameMatcher indexNameMatcher =
|
||||
new SearchIndexNameMatcher(index().getName(), clusterAlias, clusterService, expressionResolver);
|
||||
return new QueryShardContext(
|
||||
shardId, indexSettings, bigArrays, indexCache.bitsetFilterCache(), indexFieldData::getForField, mapperService(),
|
||||
similarityService(), scriptService, xContentRegistry, namedWriteableRegistry, client, searcher, nowInMillis, clusterAlias,
|
||||
|
|
|
@ -46,11 +46,12 @@ public class SearchIndexNameMatcher implements Predicate<String> {
|
|||
*/
|
||||
public SearchIndexNameMatcher(String indexName,
|
||||
String clusterAlias,
|
||||
ClusterService clusterService) {
|
||||
ClusterService clusterService,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
this.indexName = indexName;
|
||||
this.clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) ? null : clusterAlias;
|
||||
this.clusterService = clusterService;
|
||||
this.expressionResolver = new IndexNameExpressionResolver();
|
||||
this.expressionResolver = expressionResolver;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -598,7 +598,7 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
indexCreationContext);
|
||||
|
||||
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings),
|
||||
directoryFactories, () -> allowExpensiveQueries);
|
||||
directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver);
|
||||
for (IndexingOperationListener operationListener : indexingOperationListeners) {
|
||||
indexModule.addIndexOperationListener(operationListener);
|
||||
}
|
||||
|
@ -668,7 +668,7 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
public synchronized MapperService createIndexMapperService(IndexMetaData indexMetaData) throws IOException {
|
||||
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopedSettings);
|
||||
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings),
|
||||
directoryFactories, () -> allowExpensiveQueries);
|
||||
directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver);
|
||||
pluginsService.onIndexModule(indexModule);
|
||||
return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService);
|
||||
}
|
||||
|
|
|
@ -470,7 +470,7 @@ public class Node implements Closeable {
|
|||
Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
|
||||
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
|
||||
scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,
|
||||
namedWriteableRegistry).stream())
|
||||
namedWriteableRegistry, clusterModule.getIndexNameExpressionResolver()).stream())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
|
||||
|
@ -538,7 +538,8 @@ public class Node implements Closeable {
|
|||
|
||||
final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
|
||||
.filterPlugins(PersistentTaskPlugin.class).stream()
|
||||
.map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule))
|
||||
.map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule,
|
||||
clusterModule.getIndexNameExpressionResolver()))
|
||||
.flatMap(List::stream)
|
||||
.collect(toList());
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.elasticsearch.plugins;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.SettingsModule;
|
||||
import org.elasticsearch.persistent.PersistentTasksExecutor;
|
||||
|
@ -38,7 +39,8 @@ public interface PersistentTaskPlugin {
|
|||
default List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
Client client,
|
||||
SettingsModule settingsModule) {
|
||||
SettingsModule settingsModule,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.plugins;
|
|||
import org.elasticsearch.bootstrap.BootstrapCheck;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
||||
|
@ -114,11 +115,13 @@ public abstract class Plugin implements Closeable {
|
|||
* @param environment the environment for path and setting configurations
|
||||
* @param nodeEnvironment the node environment used coordinate access to the data paths
|
||||
* @param namedWriteableRegistry the registry for {@link NamedWriteable} object parsing
|
||||
* @param indexNameExpressionResolver A service that resolves expression to index and alias names
|
||||
*/
|
||||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.action.get.GetRequest;
|
|||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -100,7 +101,8 @@ public class AsyncIngestProcessorIT extends ESSingleNodeTestCase {
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
this.threadPool = threadPool;
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexGraveyard;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
|
@ -214,15 +215,16 @@ public class ClusterStateIT extends ESIntegTestCase {
|
|||
|
||||
@Override
|
||||
public Collection<Object> createComponents(
|
||||
final Client client,
|
||||
final ClusterService clusterService,
|
||||
final ThreadPool threadPool,
|
||||
final ResourceWatcherService resourceWatcherService,
|
||||
final ScriptService scriptService,
|
||||
final NamedXContentRegistry xContentRegistry,
|
||||
final Environment environment,
|
||||
final NodeEnvironment nodeEnvironment,
|
||||
final NamedWriteableRegistry namedWriteableRegistry) {
|
||||
final Client client,
|
||||
final ClusterService clusterService,
|
||||
final ThreadPool threadPool,
|
||||
final ResourceWatcherService resourceWatcherService,
|
||||
final ScriptService scriptService,
|
||||
final NamedXContentRegistry xContentRegistry,
|
||||
final Environment environment,
|
||||
final NodeEnvironment nodeEnvironment,
|
||||
final NamedWriteableRegistry namedWriteableRegistry,
|
||||
final IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
clusterService.addListener(event -> {
|
||||
final ClusterState state = event.state();
|
||||
if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.action.support.IndicesOptions;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
|
@ -381,7 +382,8 @@ public class SimpleClusterStateIT extends ESIntegTestCase {
|
|||
final NamedXContentRegistry xContentRegistry,
|
||||
final Environment environment,
|
||||
final NodeEnvironment nodeEnvironment,
|
||||
final NamedWriteableRegistry namedWriteableRegistry) {
|
||||
final NamedWriteableRegistry namedWriteableRegistry,
|
||||
final IndexNameExpressionResolver expressionResolver) {
|
||||
clusterService.addListener(event -> {
|
||||
final ClusterState state = event.state();
|
||||
if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
|
||||
|
|
|
@ -70,12 +70,13 @@ public class TemplateUpgradeServiceIT extends ESIntegTestCase {
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
clusterService.getClusterSettings().addSettingsUpdateConsumer(UPDATE_TEMPLATE_DUMMY_SETTING, integer -> {
|
||||
logger.debug("the template dummy setting was updated to {}", integer);
|
||||
});
|
||||
return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry,
|
||||
environment, nodeEnvironment, namedWriteableRegistry);
|
||||
environment, nodeEnvironment, namedWriteableRegistry, expressionResolver);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.action.ingest.GetPipelineResponse;
|
|||
import org.elasticsearch.action.ingest.PutPipelineRequest;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -243,7 +244,8 @@ public class FinalPipelineIT extends ESIntegTestCase {
|
|||
final NamedXContentRegistry xContentRegistry,
|
||||
final Environment environment,
|
||||
final NodeEnvironment nodeEnvironment,
|
||||
final NamedWriteableRegistry namedWriteableRegistry) {
|
||||
final NamedWriteableRegistry namedWriteableRegistry,
|
||||
final IndexNameExpressionResolver expressionResolver) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.lucene.store.Directory;
|
|||
import org.apache.lucene.util.SetOnce.AlreadySetException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
|
@ -167,7 +168,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
public void testWrapperIsBound() throws IOException {
|
||||
final MockEngineFactory engineFactory = new MockEngineFactory(AssertingDirectoryReader.class);
|
||||
IndexModule module = new IndexModule(
|
||||
indexSettings, emptyAnalysisRegistry, engineFactory, Collections.emptyMap(), () -> true);
|
||||
indexSettings, emptyAnalysisRegistry, engineFactory, Collections.emptyMap(), () -> true, new IndexNameExpressionResolver());
|
||||
module.setReaderWrapper(s -> new Wrapper());
|
||||
|
||||
IndexService indexService = newIndexService(module);
|
||||
|
@ -187,8 +188,8 @@ public class IndexModuleTests extends ESTestCase {
|
|||
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
|
||||
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = singletonMap(
|
||||
"foo_store", new FooFunction());
|
||||
final IndexModule module = new IndexModule(
|
||||
indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), indexStoreFactories, () -> true);
|
||||
final IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), indexStoreFactories,
|
||||
() -> true, new IndexNameExpressionResolver());
|
||||
|
||||
final IndexService indexService = newIndexService(module);
|
||||
assertThat(indexService.getDirectoryFactory(), instanceOf(FooFunction.class));
|
||||
|
@ -485,8 +486,8 @@ public class IndexModuleTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private static IndexModule createIndexModule(IndexSettings indexSettings, AnalysisRegistry emptyAnalysisRegistry) {
|
||||
return new IndexModule(
|
||||
indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap(), () -> true);
|
||||
return new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap(), () -> true,
|
||||
new IndexNameExpressionResolver());
|
||||
}
|
||||
|
||||
class CustomQueryCache implements QueryCache {
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterName;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -48,8 +49,8 @@ public class SearchIndexNameMatcherTests extends ESTestCase {
|
|||
ClusterService clusterService = mock(ClusterService.class);
|
||||
when(clusterService.state()).thenReturn(state);
|
||||
|
||||
matcher = new SearchIndexNameMatcher("index1", "", clusterService);
|
||||
remoteMatcher = new SearchIndexNameMatcher("index1", "cluster", clusterService);
|
||||
matcher = new SearchIndexNameMatcher("index1", "", clusterService, new IndexNameExpressionResolver());
|
||||
remoteMatcher = new SearchIndexNameMatcher("index1", "cluster", clusterService, new IndexNameExpressionResolver());
|
||||
}
|
||||
|
||||
private static IndexMetaData.Builder indexBuilder(String index) {
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
|
@ -94,7 +95,8 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
|
|||
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
Client client,
|
||||
SettingsModule settingsModule) {
|
||||
SettingsModule settingsModule,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
return Collections.singletonList(new TestPersistentTasksExecutor(clusterService));
|
||||
}
|
||||
|
||||
|
|
|
@ -175,7 +175,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
final NamedXContentRegistry xContentRegistry,
|
||||
final Environment environment,
|
||||
final NodeEnvironment nodeEnvironment,
|
||||
final NamedWriteableRegistry namedWriteableRegistry) {
|
||||
final NamedWriteableRegistry namedWriteableRegistry,
|
||||
final IndexNameExpressionResolver expressionResolver) {
|
||||
this.client = client;
|
||||
if (enabled == false) {
|
||||
return emptyList();
|
||||
|
@ -204,7 +205,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
Client client,
|
||||
SettingsModule settingsModule) {
|
||||
SettingsModule settingsModule,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, settingsModule));
|
||||
}
|
||||
|
||||
|
|
|
@ -253,7 +253,8 @@ public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin,
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
List<Object> components = new ArrayList<>();
|
||||
|
||||
final SSLService sslService = new SSLService(environment);
|
||||
|
|
|
@ -118,7 +118,8 @@ public final class AnomalyDetectorsIndex {
|
|||
* Creates the .ml-state-000001 index (if necessary)
|
||||
* Creates the .ml-state-write alias for the .ml-state-000001 index (if necessary)
|
||||
*/
|
||||
public static void createStateIndexAndAliasIfNecessary(Client client, ClusterState state, final ActionListener<Boolean> finalListener) {
|
||||
public static void createStateIndexAndAliasIfNecessary(Client client, ClusterState state, IndexNameExpressionResolver resolver,
|
||||
final ActionListener<Boolean> finalListener) {
|
||||
|
||||
if (state.getMetaData().getAliasAndIndexLookup().containsKey(jobStateIndexWriteAlias())) {
|
||||
finalListener.onResponse(false);
|
||||
|
@ -143,8 +144,7 @@ public final class AnomalyDetectorsIndex {
|
|||
finalListener::onFailure
|
||||
);
|
||||
|
||||
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
|
||||
String[] stateIndices = indexNameExpressionResolver.concreteIndexNames(state,
|
||||
String[] stateIndices = resolver.concreteIndexNames(state,
|
||||
IndicesOptions.lenientExpandOpen(),
|
||||
jobStateIndexPattern());
|
||||
if (stateIndices.length > 0) {
|
||||
|
|
|
@ -151,14 +151,15 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
List<Object> components = new ArrayList<>();
|
||||
components.addAll(super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService,
|
||||
xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry));
|
||||
xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry, expressionResolver));
|
||||
|
||||
filterPlugins(Plugin.class).stream().forEach(p ->
|
||||
components.addAll(p.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService,
|
||||
xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry))
|
||||
xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry, expressionResolver))
|
||||
);
|
||||
return components;
|
||||
}
|
||||
|
@ -409,9 +410,10 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
|
|||
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
Client client,
|
||||
SettingsModule settingsModule) {
|
||||
SettingsModule settingsModule,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
return filterPlugins(PersistentTaskPlugin.class).stream()
|
||||
.map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule))
|
||||
.map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule, expressionResolver))
|
||||
.flatMap(List::stream)
|
||||
.collect(toList());
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterName;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -102,7 +103,7 @@ public class AnomalyDetectorsIndexTests extends ESTestCase {
|
|||
|
||||
public void testCreateStateIndexAndAliasIfNecessary_CleanState() {
|
||||
ClusterState clusterState = createClusterState(Collections.emptyMap());
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, finalListener);
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, new IndexNameExpressionResolver(), finalListener);
|
||||
|
||||
InOrder inOrder = inOrder(indicesAdminClient, finalListener);
|
||||
inOrder.verify(indicesAdminClient).prepareCreate(INITIAL_ML_STATE);
|
||||
|
@ -116,7 +117,7 @@ public class AnomalyDetectorsIndexTests extends ESTestCase {
|
|||
|
||||
private void assertNoClientInteractionsWhenWriteAliasAlreadyExists(String indexName) {
|
||||
ClusterState clusterState = createClusterState(Collections.singletonMap(indexName, createIndexMetaDataWithAlias(indexName)));
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, finalListener);
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, new IndexNameExpressionResolver(), finalListener);
|
||||
|
||||
verify(finalListener).onResponse(false);
|
||||
}
|
||||
|
@ -141,7 +142,7 @@ public class AnomalyDetectorsIndexTests extends ESTestCase {
|
|||
ClusterState clusterState =
|
||||
createClusterState(
|
||||
existingIndexNames.stream().collect(toMap(Function.identity(), AnomalyDetectorsIndexTests::createIndexMetaData)));
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, finalListener);
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, new IndexNameExpressionResolver(), finalListener);
|
||||
|
||||
InOrder inOrder = inOrder(indicesAdminClient, finalListener);
|
||||
inOrder.verify(indicesAdminClient).prepareAliases();
|
||||
|
|
|
@ -198,7 +198,8 @@ public class EnrichPlugin extends Plugin implements SystemIndexPlugin, IngestPlu
|
|||
NamedXContentRegistry xContentRegistry,
|
||||
Environment environment,
|
||||
NodeEnvironment nodeEnvironment,
|
||||
NamedWriteableRegistry namedWriteableRegistry
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver
|
||||
) {
|
||||
if (enabled == false || transportClientMode) {
|
||||
return emptyList();
|
||||
|
|
|
@ -61,7 +61,7 @@ public class TransportExecuteEnrichPolicyAction extends TransportMasterNodeActio
|
|||
client,
|
||||
transportService.getTaskManager(),
|
||||
threadPool,
|
||||
new IndexNameExpressionResolver(),
|
||||
indexNameExpressionResolver,
|
||||
enrichPolicyLocks,
|
||||
System::currentTimeMillis
|
||||
);
|
||||
|
|
|
@ -72,7 +72,8 @@ public class EqlPlugin extends Plugin implements ActionPlugin {
|
|||
@Override
|
||||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry,
|
||||
Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
|
||||
return createComponents(client, clusterService.getClusterName().value(), namedWriteableRegistry);
|
||||
}
|
||||
|
|
|
@ -188,7 +188,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
if (transportClientMode) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
|
@ -59,7 +60,8 @@ public class UpdateSettingsStepTests extends ESSingleNodeTestCase {
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
return Collections.singletonList(service);
|
||||
}
|
||||
|
||||
|
|
|
@ -511,7 +511,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
if (enabled == false || transportClientMode) {
|
||||
// special holder for @link(MachineLearningFeatureSetUsage) which needs access to job manager, empty if ML is disabled
|
||||
return Collections.singletonList(new JobManagerHolder());
|
||||
|
@ -525,7 +526,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
|
|||
this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);
|
||||
OriginSettingClient originSettingClient = new OriginSettingClient(client, ClientHelper.ML_ORIGIN);
|
||||
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
|
||||
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
|
||||
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings, indexNameExpressionResolver);
|
||||
JobResultsPersister jobResultsPersister =
|
||||
new JobResultsPersister(originSettingClient, resultsPersisterService, anomalyDetectionAuditor);
|
||||
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client,
|
||||
|
@ -600,7 +601,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
|
|||
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
|
||||
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool,
|
||||
xContentRegistry, anomalyDetectionAuditor, clusterService, jobManager, jobResultsProvider, jobResultsPersister,
|
||||
jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, nativeStorageProvider);
|
||||
jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, nativeStorageProvider, indexNameExpressionResolver);
|
||||
this.autodetectProcessManager.set(autodetectProcessManager);
|
||||
DatafeedJobBuilder datafeedJobBuilder =
|
||||
new DatafeedJobBuilder(
|
||||
|
@ -635,8 +636,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
|
|||
threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory);
|
||||
DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client, xContentRegistry);
|
||||
assert client instanceof NodeClient;
|
||||
DataFrameAnalyticsManager dataFrameAnalyticsManager = new DataFrameAnalyticsManager(
|
||||
(NodeClient) client, dataFrameAnalyticsConfigProvider, analyticsProcessManager, dataFrameAnalyticsAuditor);
|
||||
DataFrameAnalyticsManager dataFrameAnalyticsManager = new DataFrameAnalyticsManager((NodeClient) client,
|
||||
dataFrameAnalyticsConfigProvider, analyticsProcessManager, dataFrameAnalyticsAuditor, indexNameExpressionResolver);
|
||||
this.dataFrameAnalyticsManager.set(dataFrameAnalyticsManager);
|
||||
|
||||
// Components shared by anomaly detection and data frame analytics
|
||||
|
@ -646,7 +647,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
|
|||
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
|
||||
autodetectProcessManager, memoryTracker);
|
||||
MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool,
|
||||
new MlConfigMigrator(settings, client, clusterService), clusterService);
|
||||
new MlConfigMigrator(settings, client, clusterService, indexNameExpressionResolver), clusterService);
|
||||
|
||||
// this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it
|
||||
final InvalidLicenseEnforcer enforcer =
|
||||
|
@ -682,20 +683,22 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
|
|||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
Client client,
|
||||
SettingsModule settingsModule) {
|
||||
SettingsModule settingsModule,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
if (enabled == false || transportClientMode) {
|
||||
return emptyList();
|
||||
}
|
||||
|
||||
return Arrays.asList(
|
||||
new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get(),
|
||||
memoryTracker.get(), client),
|
||||
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get()),
|
||||
memoryTracker.get(), client, expressionResolver),
|
||||
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get(), expressionResolver),
|
||||
new TransportStartDataFrameAnalyticsAction.TaskExecutor(settings, client, clusterService, dataFrameAnalyticsManager.get(),
|
||||
dataFrameAnalyticsAuditor.get(), memoryTracker.get())
|
||||
dataFrameAnalyticsAuditor.get(), memoryTracker.get(), expressionResolver)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
|
@ -109,14 +110,17 @@ public class MlConfigMigrator {
|
|||
|
||||
private final Client client;
|
||||
private final ClusterService clusterService;
|
||||
private final IndexNameExpressionResolver expressionResolver;
|
||||
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
|
||||
|
||||
private final AtomicBoolean migrationInProgress;
|
||||
private final AtomicBoolean tookConfigSnapshot;
|
||||
|
||||
public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) {
|
||||
public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
this.client = Objects.requireNonNull(client);
|
||||
this.clusterService = Objects.requireNonNull(clusterService);
|
||||
this.expressionResolver = Objects.requireNonNull(expressionResolver);
|
||||
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
|
||||
this.migrationInProgress = new AtomicBoolean(false);
|
||||
this.tookConfigSnapshot = new AtomicBoolean(false);
|
||||
|
@ -462,7 +466,7 @@ public class MlConfigMigrator {
|
|||
return;
|
||||
}
|
||||
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterService.state(), ActionListener.wrap(
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterService.state(), expressionResolver, ActionListener.wrap(
|
||||
r -> {
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest,
|
||||
ActionListener.<IndexResponse>wrap(
|
||||
|
|
|
@ -139,9 +139,9 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
AnomalyDetectorsIndex.configIndexName()};
|
||||
}
|
||||
|
||||
static List<String> verifyIndicesPrimaryShardsAreActive(String resultsWriteIndex, ClusterState clusterState) {
|
||||
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
|
||||
String[] indices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(),
|
||||
static List<String> verifyIndicesPrimaryShardsAreActive(String resultsWriteIndex, ClusterState clusterState,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
String[] indices = expressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(),
|
||||
indicesOfInterest(resultsWriteIndex));
|
||||
List<String> unavailableIndices = new ArrayList<>(indices.length);
|
||||
for (String index : indices) {
|
||||
|
@ -345,6 +345,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
private final AutodetectProcessManager autodetectProcessManager;
|
||||
private final MlMemoryTracker memoryTracker;
|
||||
private final Client client;
|
||||
private final IndexNameExpressionResolver expressionResolver;
|
||||
|
||||
private volatile int maxConcurrentJobAllocations;
|
||||
private volatile int maxMachineMemoryPercent;
|
||||
|
@ -354,11 +355,12 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
|
||||
public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService,
|
||||
AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker,
|
||||
Client client) {
|
||||
Client client, IndexNameExpressionResolver expressionResolver) {
|
||||
super(MlTasks.JOB_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME);
|
||||
this.autodetectProcessManager = Objects.requireNonNull(autodetectProcessManager);
|
||||
this.memoryTracker = Objects.requireNonNull(memoryTracker);
|
||||
this.client = Objects.requireNonNull(client);
|
||||
this.expressionResolver = Objects.requireNonNull(expressionResolver);
|
||||
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
|
||||
this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
|
||||
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
|
||||
|
@ -389,7 +391,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
|
||||
String jobId = params.getJobId();
|
||||
String resultsWriteAlias = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
|
||||
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsWriteAlias, clusterState);
|
||||
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsWriteAlias, clusterState, expressionResolver);
|
||||
if (unavailableIndices.size() != 0) {
|
||||
String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
|
||||
String.join(",", unavailableIndices) + "]";
|
||||
|
|
|
@ -117,7 +117,7 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
|
|||
);
|
||||
|
||||
// 1. Verify/Create the state index and its alias exists
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, state, createStateIndexListener);
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, state, indexNameExpressionResolver, createStateIndexListener);
|
||||
}
|
||||
|
||||
private void getModelSnapshot(RevertModelSnapshotAction.Request request, JobResultsProvider provider, Consumer<ModelSnapshot> handler,
|
||||
|
|
|
@ -503,8 +503,9 @@ public class TransportStartDataFrameAnalyticsAction
|
|||
);
|
||||
}
|
||||
|
||||
static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, String... indexNames) {
|
||||
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
|
||||
static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState,
|
||||
IndexNameExpressionResolver resolver,
|
||||
String... indexNames) {
|
||||
String[] concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), indexNames);
|
||||
List<String> unavailableIndices = new ArrayList<>(concreteIndices.length);
|
||||
for (String index : concreteIndices) {
|
||||
|
@ -523,6 +524,7 @@ public class TransportStartDataFrameAnalyticsAction
|
|||
private final DataFrameAnalyticsManager manager;
|
||||
private final DataFrameAnalyticsAuditor auditor;
|
||||
private final MlMemoryTracker memoryTracker;
|
||||
private final IndexNameExpressionResolver resolver;
|
||||
|
||||
private volatile int maxMachineMemoryPercent;
|
||||
private volatile int maxLazyMLNodes;
|
||||
|
@ -530,13 +532,14 @@ public class TransportStartDataFrameAnalyticsAction
|
|||
private volatile ClusterState clusterState;
|
||||
|
||||
public TaskExecutor(Settings settings, Client client, ClusterService clusterService, DataFrameAnalyticsManager manager,
|
||||
DataFrameAnalyticsAuditor auditor, MlMemoryTracker memoryTracker) {
|
||||
DataFrameAnalyticsAuditor auditor, MlMemoryTracker memoryTracker, IndexNameExpressionResolver resolver) {
|
||||
super(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME);
|
||||
this.client = Objects.requireNonNull(client);
|
||||
this.clusterService = Objects.requireNonNull(clusterService);
|
||||
this.manager = Objects.requireNonNull(manager);
|
||||
this.auditor = Objects.requireNonNull(auditor);
|
||||
this.memoryTracker = Objects.requireNonNull(memoryTracker);
|
||||
this.resolver = Objects.requireNonNull(resolver);
|
||||
this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
|
||||
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
|
||||
this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings);
|
||||
|
@ -567,7 +570,8 @@ public class TransportStartDataFrameAnalyticsAction
|
|||
|
||||
String id = params.getId();
|
||||
|
||||
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState, AnomalyDetectorsIndex.configIndexName());
|
||||
List<String> unavailableIndices =
|
||||
verifyIndicesPrimaryShardsAreActive(clusterState, resolver, AnomalyDetectorsIndex.configIndexName());
|
||||
if (unavailableIndices.size() != 0) {
|
||||
String reason = "Not opening data frame analytics job [" + id +
|
||||
"], because not all primary shards are active for the following indices [" + String.join(",", unavailableIndices) + "]";
|
||||
|
|
|
@ -367,10 +367,10 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
|
|||
private final DatafeedManager datafeedManager;
|
||||
private final IndexNameExpressionResolver resolver;
|
||||
|
||||
public StartDatafeedPersistentTasksExecutor(DatafeedManager datafeedManager) {
|
||||
public StartDatafeedPersistentTasksExecutor(DatafeedManager datafeedManager, IndexNameExpressionResolver resolver) {
|
||||
super(MlTasks.DATAFEED_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME);
|
||||
this.datafeedManager = datafeedManager;
|
||||
this.resolver = new IndexNameExpressionResolver();
|
||||
this.resolver = resolver;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
|
|||
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
|
@ -58,13 +59,16 @@ public class DataFrameAnalyticsManager {
|
|||
private final DataFrameAnalyticsConfigProvider configProvider;
|
||||
private final AnalyticsProcessManager processManager;
|
||||
private final DataFrameAnalyticsAuditor auditor;
|
||||
private final IndexNameExpressionResolver expressionResolver;
|
||||
|
||||
public DataFrameAnalyticsManager(NodeClient client, DataFrameAnalyticsConfigProvider configProvider,
|
||||
AnalyticsProcessManager processManager, DataFrameAnalyticsAuditor auditor) {
|
||||
AnalyticsProcessManager processManager, DataFrameAnalyticsAuditor auditor,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
this.client = Objects.requireNonNull(client);
|
||||
this.configProvider = Objects.requireNonNull(configProvider);
|
||||
this.processManager = Objects.requireNonNull(processManager);
|
||||
this.auditor = Objects.requireNonNull(auditor);
|
||||
this.expressionResolver = Objects.requireNonNull(expressionResolver);
|
||||
}
|
||||
|
||||
public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState currentState, ClusterState clusterState) {
|
||||
|
@ -104,7 +108,7 @@ public class DataFrameAnalyticsManager {
|
|||
);
|
||||
|
||||
// Make sure the state index and alias exist
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, stateAliasListener);
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, expressionResolver, stateAliasListener);
|
||||
}
|
||||
|
||||
private void executeStartingJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
|
||||
|
|
|
@ -145,10 +145,12 @@ public class JobResultsProvider {
|
|||
|
||||
private final Client client;
|
||||
private final Settings settings;
|
||||
private final IndexNameExpressionResolver resolver;
|
||||
|
||||
public JobResultsProvider(Client client, Settings settings) {
|
||||
public JobResultsProvider(Client client, Settings settings, IndexNameExpressionResolver resolver) {
|
||||
this.client = Objects.requireNonNull(client);
|
||||
this.settings = settings;
|
||||
this.resolver = resolver;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -265,7 +267,6 @@ public class JobResultsProvider {
|
|||
// Our read/write aliases should point to the concrete index
|
||||
// If the initial index is NOT an alias, either it is already a concrete index, or it does not exist yet
|
||||
if (state.getMetaData().hasAlias(tempIndexName)) {
|
||||
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
|
||||
String[] concreteIndices = resolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), tempIndexName);
|
||||
|
||||
// SHOULD NOT be closed as in typical call flow checkForLeftOverDocuments already verified this
|
||||
|
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
|
@ -98,6 +99,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
|
|||
private final JobResultsProvider jobResultsProvider;
|
||||
private final AutodetectProcessFactory autodetectProcessFactory;
|
||||
private final NormalizerFactory normalizerFactory;
|
||||
private final IndexNameExpressionResolver expressionResolver;
|
||||
|
||||
private final JobResultsPersister jobResultsPersister;
|
||||
private final JobDataCountsPersister jobDataCountsPersister;
|
||||
|
@ -117,7 +119,8 @@ public class AutodetectProcessManager implements ClusterStateListener {
|
|||
NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor, ClusterService clusterService,
|
||||
JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister,
|
||||
JobDataCountsPersister jobDataCountsPersister, AutodetectProcessFactory autodetectProcessFactory,
|
||||
NormalizerFactory normalizerFactory, NativeStorageProvider nativeStorageProvider) {
|
||||
NormalizerFactory normalizerFactory, NativeStorageProvider nativeStorageProvider,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
this.environment = environment;
|
||||
this.client = client;
|
||||
this.threadPool = threadPool;
|
||||
|
@ -125,6 +128,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
|
|||
this.maxAllowedRunningJobs = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(settings);
|
||||
this.autodetectProcessFactory = autodetectProcessFactory;
|
||||
this.normalizerFactory = normalizerFactory;
|
||||
this.expressionResolver = expressionResolver;
|
||||
this.jobManager = jobManager;
|
||||
this.jobResultsProvider = jobResultsProvider;
|
||||
this.jobResultsPersister = jobResultsPersister;
|
||||
|
@ -435,7 +439,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
|
|||
|
||||
// Make sure the state index and alias exist
|
||||
ActionListener<Boolean> resultsMappingUpdateHandler = ActionListener.wrap(
|
||||
ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, stateAliasHandler),
|
||||
ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, expressionResolver, stateAliasHandler),
|
||||
e -> closeHandler.accept(e, true)
|
||||
);
|
||||
|
||||
|
|
|
@ -91,6 +91,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testVerifyIndicesPrimaryShardsAreActive() {
|
||||
final IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
|
||||
MetaData.Builder metaData = MetaData.builder();
|
||||
RoutingTable.Builder routingTable = RoutingTable.builder();
|
||||
addIndices(metaData, routingTable);
|
||||
|
@ -100,7 +101,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|||
csBuilder.metaData(metaData);
|
||||
|
||||
ClusterState cs = csBuilder.build();
|
||||
assertEquals(0, TransportOpenJobAction.verifyIndicesPrimaryShardsAreActive(".ml-anomalies-shared", cs).size());
|
||||
assertEquals(0, TransportOpenJobAction.verifyIndicesPrimaryShardsAreActive(".ml-anomalies-shared", cs, resolver).size());
|
||||
|
||||
metaData = new MetaData.Builder(cs.metaData());
|
||||
routingTable = new RoutingTable.Builder(cs.routingTable());
|
||||
|
@ -121,7 +122,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|||
|
||||
csBuilder.routingTable(routingTable.build());
|
||||
csBuilder.metaData(metaData);
|
||||
List<String> result = TransportOpenJobAction.verifyIndicesPrimaryShardsAreActive(".ml-anomalies-shared", csBuilder.build());
|
||||
List<String> result =
|
||||
TransportOpenJobAction.verifyIndicesPrimaryShardsAreActive(".ml-anomalies-shared", csBuilder.build(), resolver);
|
||||
assertEquals(1, result.size());
|
||||
assertEquals(indexToRemove, result.get(0));
|
||||
}
|
||||
|
@ -153,7 +155,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|||
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
|
||||
|
||||
TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor(
|
||||
Settings.EMPTY, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class));
|
||||
Settings.EMPTY, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class),
|
||||
new IndexNameExpressionResolver());
|
||||
|
||||
OpenJobAction.JobParams params = new OpenJobAction.JobParams("missing_job_field");
|
||||
assertEquals(TransportOpenJobAction.AWAITING_MIGRATION, executor.getAssignment(params, mock(ClusterState.class)));
|
||||
|
@ -178,7 +181,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|||
csBuilder.routingTable(routingTable.build());
|
||||
|
||||
TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor(
|
||||
settings, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class));
|
||||
settings, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class),
|
||||
new IndexNameExpressionResolver());
|
||||
|
||||
OpenJobAction.JobParams params = new OpenJobAction.JobParams("unavailable_index_with_lazy_node");
|
||||
params.setJob(mock(Job.class));
|
||||
|
@ -204,7 +208,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|||
csBuilder.routingTable(routingTable.build());
|
||||
|
||||
TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor(
|
||||
settings, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class));
|
||||
settings, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class),
|
||||
new IndexNameExpressionResolver());
|
||||
|
||||
Job job = mock(Job.class);
|
||||
when(job.allowLazyOpen()).thenReturn(true);
|
||||
|
|
|
@ -9,6 +9,7 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
|
@ -58,7 +59,9 @@ public class TransportStartDataFrameAnalyticsActionTests extends ESTestCase {
|
|||
csBuilder.metaData(metaData);
|
||||
|
||||
ClusterState cs = csBuilder.build();
|
||||
assertThat(TransportStartDataFrameAnalyticsAction.verifyIndicesPrimaryShardsAreActive(cs, indexName), empty());
|
||||
assertThat(
|
||||
TransportStartDataFrameAnalyticsAction.verifyIndicesPrimaryShardsAreActive(cs, new IndexNameExpressionResolver(), indexName),
|
||||
empty());
|
||||
|
||||
metaData = new MetaData.Builder(cs.metaData());
|
||||
routingTable = new RoutingTable.Builder(cs.routingTable());
|
||||
|
@ -76,7 +79,8 @@ public class TransportStartDataFrameAnalyticsActionTests extends ESTestCase {
|
|||
|
||||
csBuilder.routingTable(routingTable.build());
|
||||
csBuilder.metaData(metaData);
|
||||
List<String> result = TransportStartDataFrameAnalyticsAction.verifyIndicesPrimaryShardsAreActive(csBuilder.build(), indexName);
|
||||
List<String> result = TransportStartDataFrameAnalyticsAction.verifyIndicesPrimaryShardsAreActive(csBuilder.build(),
|
||||
new IndexNameExpressionResolver(), indexName);
|
||||
assertThat(result, contains(indexName));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.integration;
|
|||
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.OriginSettingClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.routing.OperationRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
|
||||
|
@ -101,7 +102,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
|
|||
Settings.Builder builder = Settings.builder()
|
||||
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
|
||||
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client(), "test_node");
|
||||
jobResultsProvider = new JobResultsProvider(client(), builder.build());
|
||||
jobResultsProvider = new JobResultsProvider(client(), builder.build(), new IndexNameExpressionResolver());
|
||||
renormalizer = mock(Renormalizer.class);
|
||||
process = mock(AutodetectProcess.class);
|
||||
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
package org.elasticsearch.xpack.ml.integration;
|
||||
|
||||
import org.elasticsearch.client.OriginSettingClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.routing.OperationRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
|
||||
import org.elasticsearch.cluster.service.ClusterApplierService;
|
||||
|
@ -60,7 +61,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
|
|||
|
||||
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
|
||||
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
|
||||
jobResultsProvider = new JobResultsProvider(client(), settings);
|
||||
jobResultsProvider = new JobResultsProvider(client(), settings, new IndexNameExpressionResolver());
|
||||
jobResultsPersister = new JobResultsPersister(
|
||||
originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node"));
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import org.elasticsearch.action.index.IndexRequest;
|
|||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.client.OriginSettingClient;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.routing.OperationRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
|
@ -107,7 +108,7 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
|
|||
public void createComponents() throws Exception {
|
||||
Settings.Builder builder = Settings.builder()
|
||||
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
|
||||
jobProvider = new JobResultsProvider(client(), builder.build());
|
||||
jobProvider = new JobResultsProvider(client(), builder.build(), new IndexNameExpressionResolver());
|
||||
ThreadPool tp = mock(ThreadPool.class);
|
||||
ClusterSettings clusterSettings = new ClusterSettings(builder.build(),
|
||||
new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS,
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.AliasOrIndex;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
|
@ -72,6 +73,7 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
public class MlConfigMigratorIT extends MlSingleNodeTestCase {
|
||||
|
||||
private final IndexNameExpressionResolver expressionResolver = new IndexNameExpressionResolver();
|
||||
private ClusterService clusterService;
|
||||
|
||||
@Before
|
||||
|
@ -103,7 +105,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
|
|||
// put a job representing a previously migrated job
|
||||
blockingCall(actionListener -> jobConfigProvider.putJob(migratedJob, actionListener), indexResponseHolder, exceptionHolder);
|
||||
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService, expressionResolver);
|
||||
|
||||
AtomicReference<Set<String>> failedIdsHolder = new AtomicReference<>();
|
||||
Job foo = buildJobBuilder("foo").build();
|
||||
|
@ -166,7 +168,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
|
|||
AtomicReference<Boolean> responseHolder = new AtomicReference<>();
|
||||
|
||||
// do the migration
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService, expressionResolver);
|
||||
// the first time this is called mlmetadata will be snap-shotted
|
||||
blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener),
|
||||
responseHolder, exceptionHolder);
|
||||
|
@ -218,7 +220,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
|
|||
|
||||
// index a doc with the same Id as the config snapshot
|
||||
PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client(), clusterService.state(), future);
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client(), clusterService.state(), expressionResolver, future);
|
||||
future.actionGet();
|
||||
|
||||
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).id("ml-config")
|
||||
|
@ -238,7 +240,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
|
|||
AtomicReference<Boolean> responseHolder = new AtomicReference<>();
|
||||
|
||||
// do the migration
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService, expressionResolver);
|
||||
// writing the snapshot should fail because the doc already exists
|
||||
// in which case the migration should continue
|
||||
blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener),
|
||||
|
@ -293,7 +295,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
|
|||
AtomicReference<Boolean> responseHolder = new AtomicReference<>();
|
||||
|
||||
// do the migration
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService, expressionResolver);
|
||||
blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener),
|
||||
responseHolder, exceptionHolder);
|
||||
|
||||
|
@ -331,7 +333,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
|
|||
AtomicReference<Boolean> responseHolder = new AtomicReference<>();
|
||||
|
||||
// do the migration
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService, expressionResolver);
|
||||
blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener),
|
||||
responseHolder, exceptionHolder);
|
||||
|
||||
|
@ -364,7 +366,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
|
|||
AtomicReference<Boolean> responseHolder = new AtomicReference<>();
|
||||
|
||||
// do the migration
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(settings, client(), clusterService);
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(settings, client(), clusterService, expressionResolver);
|
||||
blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener),
|
||||
responseHolder, exceptionHolder);
|
||||
|
||||
|
@ -436,7 +438,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
|
|||
|
||||
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
|
||||
AtomicReference<Boolean> responseHolder = new AtomicReference<>();
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
|
||||
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService, expressionResolver);
|
||||
|
||||
// if the cluster state has a job config and the index does not
|
||||
// exist it should be created
|
||||
|
|
|
@ -19,6 +19,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
|
|||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.document.DocumentField;
|
||||
|
@ -893,7 +894,7 @@ public class JobResultsProviderTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private JobResultsProvider createProvider(Client client) {
|
||||
return new JobResultsProvider(client, Settings.EMPTY);
|
||||
return new JobResultsProvider(client, Settings.EMPTY, new IndexNameExpressionResolver());
|
||||
}
|
||||
|
||||
private static SearchResponse createSearchResponse(List<Map<String, Object>> source) throws IOException {
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasOrIndex;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
|
@ -689,7 +690,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
private AutodetectProcessManager createManager(Settings settings) {
|
||||
return new AutodetectProcessManager(environment, settings,
|
||||
client, threadPool, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService, jobManager, jobResultsProvider,
|
||||
jobResultsPersister, jobDataCountsPersister, autodetectFactory, normalizerFactory, nativeStorageProvider);
|
||||
jobResultsPersister, jobDataCountsPersister, autodetectFactory, normalizerFactory, nativeStorageProvider,
|
||||
new IndexNameExpressionResolver());
|
||||
}
|
||||
private AutodetectProcessManager createSpyManagerAndCallProcessData(String jobId) {
|
||||
AutodetectProcessManager manager = createSpyManager();
|
||||
|
|
|
@ -125,7 +125,8 @@ public class Monitoring extends Plugin implements ActionPlugin, ReloadablePlugin
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
if (enabled == false) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
@ -141,7 +142,8 @@ public class Monitoring extends Plugin implements ActionPlugin, ReloadablePlugin
|
|||
|
||||
Set<Collector> collectors = new HashSet<>();
|
||||
collectors.add(new IndexStatsCollector(clusterService, getLicenseState(), client));
|
||||
collectors.add(new ClusterStatsCollector(settings, clusterService, getLicenseState(), client, getLicenseService()));
|
||||
collectors.add(
|
||||
new ClusterStatsCollector(settings, clusterService, getLicenseState(), client, getLicenseService(), expressionResolver));
|
||||
collectors.add(new ShardsCollector(clusterService, getLicenseState()));
|
||||
collectors.add(new NodeStatsCollector(clusterService, getLicenseState(), client));
|
||||
collectors.add(new IndexRecoveryCollector(clusterService, getLicenseState(), client));
|
||||
|
|
|
@ -64,16 +64,8 @@ public class ClusterStatsCollector extends Collector {
|
|||
final ClusterService clusterService,
|
||||
final XPackLicenseState licenseState,
|
||||
final Client client,
|
||||
final LicenseService licenseService) {
|
||||
this(settings, clusterService, licenseState, client, licenseService, new IndexNameExpressionResolver());
|
||||
}
|
||||
|
||||
ClusterStatsCollector(final Settings settings,
|
||||
final ClusterService clusterService,
|
||||
final XPackLicenseState licenseState,
|
||||
final Client client,
|
||||
final LicenseService licenseService,
|
||||
final IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
final LicenseService licenseService,
|
||||
final IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(ClusterStatsMonitoringDoc.TYPE, clusterService, CLUSTER_STATS_TIMEOUT, licenseState);
|
||||
this.settings = settings;
|
||||
this.client = client;
|
||||
|
|
|
@ -67,14 +67,16 @@ public class ClusterStatsCollectorTests extends BaseCollectorTestCase {
|
|||
|
||||
public void testShouldCollectReturnsFalseIfNotMaster() {
|
||||
final ClusterStatsCollector collector =
|
||||
new ClusterStatsCollector(Settings.EMPTY, clusterService, licenseState, client, licenseService);
|
||||
new ClusterStatsCollector(Settings.EMPTY, clusterService, licenseState, client, licenseService,
|
||||
new IndexNameExpressionResolver());
|
||||
|
||||
assertThat(collector.shouldCollect(false), is(false));
|
||||
}
|
||||
|
||||
public void testShouldCollectReturnsTrue() {
|
||||
final ClusterStatsCollector collector =
|
||||
new ClusterStatsCollector(Settings.EMPTY, clusterService, licenseState, client, licenseService);
|
||||
new ClusterStatsCollector(Settings.EMPTY, clusterService, licenseState, client, licenseService,
|
||||
new IndexNameExpressionResolver());
|
||||
|
||||
assertThat(collector.shouldCollect(true), is(true));
|
||||
}
|
||||
|
|
|
@ -109,7 +109,8 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
return emptyList();
|
||||
}
|
||||
|
||||
|
@ -181,7 +182,8 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
|
|||
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
Client client,
|
||||
SettingsModule settingsModule) {
|
||||
SettingsModule settingsModule,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
if (enabled == false || transportClientMode ) {
|
||||
return emptyList();
|
||||
}
|
||||
|
|
|
@ -31,19 +31,20 @@ public class TransportGetRollupIndexCapsAction extends HandledTransportAction<Ge
|
|||
GetRollupIndexCapsAction.Response> {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
private final IndexNameExpressionResolver resolver;
|
||||
|
||||
@Inject
|
||||
public TransportGetRollupIndexCapsAction(TransportService transportService, ClusterService clusterService,
|
||||
ActionFilters actionFilters) {
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(GetRollupIndexCapsAction.NAME, transportService, actionFilters, GetRollupIndexCapsAction.Request::new);
|
||||
this.clusterService = clusterService;
|
||||
this.resolver = indexNameExpressionResolver;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, GetRollupIndexCapsAction.Request request,
|
||||
ActionListener<GetRollupIndexCapsAction.Response> listener) {
|
||||
|
||||
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
|
||||
String[] indices = resolver.concreteIndexNames(clusterService.state(),
|
||||
request.indicesOptions(), request.indices());
|
||||
Map<String, RollableIndexCaps> allCaps = getCapsByRollupIndex(Arrays.asList(indices),
|
||||
|
|
|
@ -78,18 +78,20 @@ public class TransportRollupSearchAction extends TransportAction<SearchRequest,
|
|||
private final BigArrays bigArrays;
|
||||
private final ScriptService scriptService;
|
||||
private final ClusterService clusterService;
|
||||
private final IndexNameExpressionResolver resolver;
|
||||
private static final Logger logger = LogManager.getLogger(RollupSearchAction.class);
|
||||
|
||||
@Inject
|
||||
public TransportRollupSearchAction(TransportService transportService,
|
||||
ActionFilters actionFilters, Client client, NamedWriteableRegistry registry, BigArrays bigArrays,
|
||||
ScriptService scriptService, ClusterService clusterService) {
|
||||
ScriptService scriptService, ClusterService clusterService, IndexNameExpressionResolver resolver) {
|
||||
super(RollupSearchAction.NAME, actionFilters, transportService.getTaskManager());
|
||||
this.client = client;
|
||||
this.registry = registry;
|
||||
this.bigArrays = bigArrays;
|
||||
this.scriptService = scriptService;
|
||||
this.clusterService = clusterService;
|
||||
this.resolver = resolver;
|
||||
|
||||
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, false, true, SearchRequest::new,
|
||||
new TransportRollupSearchAction.TransportHandler());
|
||||
|
@ -97,7 +99,6 @@ public class TransportRollupSearchAction extends TransportAction<SearchRequest,
|
|||
|
||||
@Override
|
||||
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
|
||||
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
|
||||
String[] indices = resolver.concreteIndexNames(clusterService.state(), request.indicesOptions(), request.indices());
|
||||
RollupSearchContext rollupSearchContext = separateIndices(indices, clusterService.state().getMetaData().indices());
|
||||
|
||||
|
|
|
@ -384,9 +384,11 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
try {
|
||||
return createComponents(client, threadPool, clusterService, resourceWatcherService, scriptService, xContentRegistry);
|
||||
return createComponents(client, threadPool, clusterService, resourceWatcherService, scriptService, xContentRegistry,
|
||||
expressionResolver);
|
||||
} catch (final Exception e) {
|
||||
throw new IllegalStateException("security initialization failed", e);
|
||||
}
|
||||
|
@ -395,7 +397,8 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
|
|||
// pkg private for testing - tests want to pass in their set of extensions hence we are not using the extension service directly
|
||||
Collection<Object> createComponents(Client client, ThreadPool threadPool, ClusterService clusterService,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry) throws Exception {
|
||||
NamedXContentRegistry xContentRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) throws Exception {
|
||||
if (enabled == false) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
@ -505,7 +508,7 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
|
|||
|
||||
final AuthorizationService authzService = new AuthorizationService(settings, allRolesStore, clusterService,
|
||||
auditTrailService, failureHandler, threadPool, anonymousUser, getAuthorizationEngine(), requestInterceptors,
|
||||
getLicenseState());
|
||||
getLicenseState(), expressionResolver);
|
||||
|
||||
components.add(nativeRolesStore); // used by roles actions
|
||||
components.add(reservedRolesStore); // used by roles actions
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.action.index.IndexAction;
|
|||
import org.elasticsearch.action.support.GroupedActionListener;
|
||||
import org.elasticsearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest;
|
||||
import org.elasticsearch.action.update.UpdateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
|
@ -115,10 +116,11 @@ public class AuthorizationService {
|
|||
public AuthorizationService(Settings settings, CompositeRolesStore rolesStore, ClusterService clusterService,
|
||||
AuditTrailService auditTrail, AuthenticationFailureHandler authcFailureHandler,
|
||||
ThreadPool threadPool, AnonymousUser anonymousUser, @Nullable AuthorizationEngine authorizationEngine,
|
||||
Set<RequestInterceptor> requestInterceptors, XPackLicenseState licenseState) {
|
||||
Set<RequestInterceptor> requestInterceptors, XPackLicenseState licenseState,
|
||||
IndexNameExpressionResolver resolver) {
|
||||
this.clusterService = clusterService;
|
||||
this.auditTrail = auditTrail;
|
||||
this.indicesAndAliasesResolver = new IndicesAndAliasesResolver(settings, clusterService);
|
||||
this.indicesAndAliasesResolver = new IndicesAndAliasesResolver(settings, clusterService, resolver);
|
||||
this.authcFailureHandler = authcFailureHandler;
|
||||
this.threadContext = threadPool.getThreadContext();
|
||||
this.anonymousUser = anonymousUser;
|
||||
|
|
|
@ -55,8 +55,8 @@ class IndicesAndAliasesResolver {
|
|||
private final IndexNameExpressionResolver nameExpressionResolver;
|
||||
private final RemoteClusterResolver remoteClusterResolver;
|
||||
|
||||
IndicesAndAliasesResolver(Settings settings, ClusterService clusterService) {
|
||||
this.nameExpressionResolver = new IndexNameExpressionResolver();
|
||||
IndicesAndAliasesResolver(Settings settings, ClusterService clusterService, IndexNameExpressionResolver resolver) {
|
||||
this.nameExpressionResolver = resolver;
|
||||
this.remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings());
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
|
@ -133,7 +134,7 @@ public class SecurityTests extends ESTestCase {
|
|||
when(client.threadPool()).thenReturn(threadPool);
|
||||
when(client.settings()).thenReturn(settings);
|
||||
return security.createComponents(client, threadPool, clusterService, mock(ResourceWatcherService.class), mock(ScriptService.class),
|
||||
xContentRegistry());
|
||||
xContentRegistry(), new IndexNameExpressionResolver());
|
||||
}
|
||||
|
||||
private static <T> T findComponent(Class<T> type, Collection<Object> components) {
|
||||
|
|
|
@ -74,6 +74,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.AliasOrIndex;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
|
@ -240,7 +241,7 @@ public class AuthorizationServiceTests extends ESTestCase {
|
|||
roleMap.put(ReservedRolesStore.SUPERUSER_ROLE_DESCRIPTOR.getName(), ReservedRolesStore.SUPERUSER_ROLE_DESCRIPTOR);
|
||||
authorizationService = new AuthorizationService(settings, rolesStore, clusterService,
|
||||
auditTrail, new DefaultAuthenticationFailureHandler(Collections.emptyMap()), threadPool, new AnonymousUser(settings), null,
|
||||
Collections.emptySet(), new XPackLicenseState(settings));
|
||||
Collections.emptySet(), new XPackLicenseState(settings), new IndexNameExpressionResolver());
|
||||
}
|
||||
|
||||
private void authorize(Authentication authentication, String action, TransportRequest request) {
|
||||
|
@ -681,7 +682,7 @@ public class AuthorizationServiceTests extends ESTestCase {
|
|||
final AnonymousUser anonymousUser = new AnonymousUser(settings);
|
||||
authorizationService = new AuthorizationService(settings, rolesStore, clusterService, auditTrail,
|
||||
new DefaultAuthenticationFailureHandler(Collections.emptyMap()), threadPool, anonymousUser, null, Collections.emptySet(),
|
||||
new XPackLicenseState(settings));
|
||||
new XPackLicenseState(settings), new IndexNameExpressionResolver());
|
||||
|
||||
RoleDescriptor role = new RoleDescriptor("a_all", null,
|
||||
new IndicesPrivileges[] { IndicesPrivileges.builder().indices("a").privileges("all").build() }, null);
|
||||
|
@ -709,7 +710,7 @@ public class AuthorizationServiceTests extends ESTestCase {
|
|||
final Authentication authentication = createAuthentication(new AnonymousUser(settings));
|
||||
authorizationService = new AuthorizationService(settings, rolesStore, clusterService, auditTrail,
|
||||
new DefaultAuthenticationFailureHandler(Collections.emptyMap()), threadPool, new AnonymousUser(settings), null,
|
||||
Collections.emptySet(), new XPackLicenseState(settings));
|
||||
Collections.emptySet(), new XPackLicenseState(settings), new IndexNameExpressionResolver());
|
||||
|
||||
RoleDescriptor role = new RoleDescriptor("a_all", null,
|
||||
new IndicesPrivileges[]{IndicesPrivileges.builder().indices("a").privileges("all").build()}, null);
|
||||
|
@ -1451,7 +1452,7 @@ public class AuthorizationServiceTests extends ESTestCase {
|
|||
when(licenseState.isAuthorizationEngineAllowed()).thenReturn(true);
|
||||
authorizationService = new AuthorizationService(Settings.EMPTY, rolesStore, clusterService,
|
||||
auditTrail, new DefaultAuthenticationFailureHandler(Collections.emptyMap()), threadPool, new AnonymousUser(Settings.EMPTY),
|
||||
engine, Collections.emptySet(), licenseState);
|
||||
engine, Collections.emptySet(), licenseState, new IndexNameExpressionResolver());
|
||||
Authentication authentication;
|
||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||
authentication = createAuthentication(new User("test user", "a_all"));
|
||||
|
|
|
@ -199,7 +199,7 @@ public class IndicesAndAliasesResolverTests extends ESTestCase {
|
|||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
defaultIndicesResolver = new IndicesAndAliasesResolver(settings, clusterService);
|
||||
defaultIndicesResolver = new IndicesAndAliasesResolver(settings, clusterService, new IndexNameExpressionResolver());
|
||||
}
|
||||
|
||||
public void testDashIndicesAreAllowedInShardLevelRequests() {
|
||||
|
|
|
@ -88,7 +88,8 @@ public class SqlPlugin extends Plugin implements ActionPlugin {
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
|
||||
return createComponents(client, clusterService.getClusterName().value(), namedWriteableRegistry);
|
||||
}
|
||||
|
|
|
@ -246,7 +246,8 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
|
|||
NamedXContentRegistry xContentRegistry,
|
||||
Environment environment,
|
||||
NodeEnvironment nodeEnvironment,
|
||||
NamedWriteableRegistry namedWriteableRegistry
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver
|
||||
) {
|
||||
if (enabled == false || transportClientMode) {
|
||||
return emptyList();
|
||||
|
@ -293,7 +294,8 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
|
|||
ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
Client client,
|
||||
SettingsModule settingsModule
|
||||
SettingsModule settingsModule,
|
||||
IndexNameExpressionResolver expressionResolver
|
||||
) {
|
||||
if (enabled == false || transportClientMode) {
|
||||
return emptyList();
|
||||
|
@ -302,9 +304,8 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
|
|||
// the transform services should have been created
|
||||
assert transformServices.get() != null;
|
||||
|
||||
return Collections.singletonList(
|
||||
new TransformPersistentTasksExecutor(client, transformServices.get(), threadPool, clusterService, settingsModule.getSettings())
|
||||
);
|
||||
return Collections.singletonList(new TransformPersistentTasksExecutor(client, transformServices.get(), threadPool, clusterService,
|
||||
settingsModule.getSettings(), expressionResolver));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -62,6 +62,7 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
|
|||
private final TransformServices transformServices;
|
||||
private final ThreadPool threadPool;
|
||||
private final ClusterService clusterService;
|
||||
private final IndexNameExpressionResolver resolver;
|
||||
private final TransformAuditor auditor;
|
||||
private volatile int numFailureRetries;
|
||||
|
||||
|
@ -70,13 +71,15 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
|
|||
TransformServices transformServices,
|
||||
ThreadPool threadPool,
|
||||
ClusterService clusterService,
|
||||
Settings settings
|
||||
Settings settings,
|
||||
IndexNameExpressionResolver resolver
|
||||
) {
|
||||
super(TransformField.TASK_NAME, Transform.TASK_THREAD_POOL_NAME);
|
||||
this.client = client;
|
||||
this.transformServices = transformServices;
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
this.resolver = resolver;
|
||||
this.auditor = transformServices.getAuditor();
|
||||
this.numFailureRetries = Transform.NUM_FAILURE_RETRIES_SETTING.get(settings);
|
||||
clusterService.getClusterSettings().addSettingsUpdateConsumer(Transform.NUM_FAILURE_RETRIES_SETTING, this::setNumFailureRetries);
|
||||
|
@ -84,7 +87,7 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
|
|||
|
||||
@Override
|
||||
public PersistentTasksCustomMetaData.Assignment getAssignment(TransformTaskParams params, ClusterState clusterState) {
|
||||
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState);
|
||||
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState, resolver);
|
||||
if (unavailableIndices.size() != 0) {
|
||||
String reason = "Not starting transform ["
|
||||
+ params.getId()
|
||||
|
@ -113,8 +116,7 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
|
|||
return discoveryNode == null ? NO_NODE_FOUND : new PersistentTasksCustomMetaData.Assignment(discoveryNode.getId(), "");
|
||||
}
|
||||
|
||||
static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState) {
|
||||
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
|
||||
static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, IndexNameExpressionResolver resolver) {
|
||||
String[] indices = resolver.concreteIndexNames(
|
||||
clusterState,
|
||||
IndicesOptions.lenientExpandOpen(),
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
||||
|
@ -148,7 +149,8 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
|
|||
transformServices,
|
||||
mock(ThreadPool.class),
|
||||
clusterService,
|
||||
Settings.EMPTY
|
||||
Settings.EMPTY,
|
||||
new IndexNameExpressionResolver()
|
||||
);
|
||||
|
||||
assertThat(
|
||||
|
@ -234,7 +236,8 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
|
|||
transformServices,
|
||||
mock(ThreadPool.class),
|
||||
clusterService,
|
||||
Settings.EMPTY
|
||||
Settings.EMPTY,
|
||||
new IndexNameExpressionResolver()
|
||||
);
|
||||
|
||||
// old-data-node-1 prevents assignment
|
||||
|
@ -279,7 +282,8 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
|
|||
csBuilder.metaData(metaData);
|
||||
|
||||
ClusterState cs = csBuilder.build();
|
||||
assertEquals(0, TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(cs).size());
|
||||
assertEquals(0,
|
||||
TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(cs, new IndexNameExpressionResolver()).size());
|
||||
|
||||
metaData = new MetaData.Builder(cs.metaData());
|
||||
routingTable = new RoutingTable.Builder(cs.routingTable());
|
||||
|
@ -303,7 +307,8 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
|
|||
|
||||
csBuilder.routingTable(routingTable.build());
|
||||
csBuilder.metaData(metaData);
|
||||
List<String> result = TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(csBuilder.build());
|
||||
List<String> result =
|
||||
TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(csBuilder.build(), new IndexNameExpressionResolver());
|
||||
assertEquals(1, result.size());
|
||||
assertEquals(indexToRemove, result.get(0));
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
|
||||
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
|
@ -102,7 +103,8 @@ public class VotingOnlyNodePlugin extends Plugin implements DiscoveryPlugin, Net
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
this.threadPool.set(threadPool);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
|
|
@ -253,7 +253,8 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin,
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver expressionResolver) {
|
||||
if (enabled == false) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.watcher;
|
||||
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.TestEnvironment;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
|
@ -76,12 +77,12 @@ public class WatcherPluginTests extends ESTestCase {
|
|||
AnalysisRegistry registry = new AnalysisRegistry(TestEnvironment.newEnvironment(settings), emptyMap(), emptyMap(), emptyMap(),
|
||||
emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap());
|
||||
IndexModule indexModule = new IndexModule(indexSettings, registry, new InternalEngineFactory(), Collections.emptyMap(),
|
||||
() -> true);
|
||||
() -> true, new IndexNameExpressionResolver());
|
||||
// this will trip an assertion if the watcher indexing operation listener is null (which it is) but we try to add it
|
||||
watcher.onIndexModule(indexModule);
|
||||
|
||||
// also no component creation if not enabled
|
||||
assertThat(watcher.createComponents(null, null, null, null, null, null, null, null, null), hasSize(0));
|
||||
assertThat(watcher.createComponents(null, null, null, null, null, null, null, null, null, null), hasSize(0));
|
||||
|
||||
watcher.close();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue