Remove LazyInitializable and friends (elastic/elasticsearch#2558)

This class should have never existed, I fixed all places where we messed aroudn with this
and resolved dependencies or let guice deal with it.

Original commit: elastic/x-pack-elasticsearch@6a42c4153d
This commit is contained in:
Simon Willnauer 2016-06-20 17:25:30 +02:00 committed by GitHub
parent c024dbfc49
commit a54d3bc3d5
32 changed files with 121 additions and 275 deletions

View File

@ -9,7 +9,6 @@ import org.elasticsearch.action.ActionModule;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.marvel.action.MonitoringBulkAction;
@ -20,9 +19,7 @@ import org.elasticsearch.marvel.agent.exporter.ExporterModule;
import org.elasticsearch.marvel.cleaner.CleanerService;
import org.elasticsearch.marvel.client.MonitoringClientModule;
import org.elasticsearch.marvel.rest.action.RestMonitoringBulkAction;
import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.common.init.LazyInitializationModule;
import java.util.ArrayList;
import java.util.Arrays;
@ -80,9 +77,6 @@ public class Monitoring {
CleanerService.class);
}
public void onModule(SettingsModule module) {
}
public void onModule(ActionModule module) {
if (enabled && tribeNode == false) {
module.registerAction(MonitoringBulkAction.INSTANCE, TransportMonitoringBulkAction.class);
@ -95,12 +89,6 @@ public class Monitoring {
}
}
public void onModule(LazyInitializationModule module) {
if (enabled && tribeNode == false) {
module.registerLazyInitializable(MonitoringClientProxy.class);
}
}
public static boolean enabled(Settings settings) {
return MonitoringSettings.ENABLED.get(settings);
}

View File

@ -16,7 +16,7 @@ import org.elasticsearch.marvel.agent.exporter.ExportException;
import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolver;
import org.elasticsearch.marvel.agent.resolver.ResolversRegistry;
import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy;
import org.elasticsearch.xpack.common.init.proxy.ClientProxy;
import java.util.Arrays;
import java.util.Collection;
@ -28,13 +28,13 @@ import java.util.Collection;
public class LocalBulk extends ExportBulk {
private final ESLogger logger;
private final MonitoringClientProxy client;
private final ClientProxy client;
private final ResolversRegistry resolvers;
private BulkRequestBuilder requestBuilder;
public LocalBulk(String name, ESLogger logger, MonitoringClientProxy client, ResolversRegistry resolvers) {
public LocalBulk(String name, ESLogger logger, ClientProxy client, ResolversRegistry resolvers) {
super(name);
this.logger = logger;
this.client = client;

View File

@ -30,7 +30,8 @@ import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolver;
import org.elasticsearch.marvel.agent.resolver.ResolversRegistry;
import org.elasticsearch.marvel.cleaner.CleanerService;
import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy;
import org.elasticsearch.xpack.common.init.proxy.ClientProxy;
import org.elasticsearch.xpack.security.InternalClient;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -50,14 +51,14 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
public static final String TYPE = "local";
private final MonitoringClientProxy client;
private final ClientProxy client;
private final ClusterService clusterService;
private final ResolversRegistry resolvers;
private final CleanerService cleanerService;
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
public LocalExporter(Exporter.Config config, MonitoringClientProxy client,
public LocalExporter(Exporter.Config config, ClientProxy client,
ClusterService clusterService, CleanerService cleanerService) {
super(TYPE, config);
this.client = client;
@ -303,14 +304,14 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
public static class Factory extends Exporter.Factory<LocalExporter> {
private final MonitoringClientProxy client;
private final ClientProxy client;
private final ClusterService clusterService;
private final CleanerService cleanerService;
@Inject
public Factory(MonitoringClientProxy client, ClusterService clusterService, CleanerService cleanerService) {
public Factory(InternalClient client, ClusterService clusterService, CleanerService cleanerService) {
super(TYPE, true);
this.client = client;
this.client = new ClientProxy(client);
this.clusterService = clusterService;
this.cleanerService = cleanerService;
}

View File

@ -1,22 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.support.init.proxy;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.common.init.proxy.ClientProxy;
public class MonitoringClientProxy extends ClientProxy {
/**
* Creates a proxy to the given internal client (can be used for testing)
*/
public static MonitoringClientProxy of(Client client) {
MonitoringClientProxy proxy = new MonitoringClientProxy();
proxy.client = client instanceof InternalClient ? (InternalClient) client : new InternalClient.Insecure(client);
return proxy;
}
}

View File

@ -19,8 +19,8 @@ import org.elasticsearch.marvel.MonitoringSettings;
import org.elasticsearch.marvel.MonitoredSystem;
import org.elasticsearch.marvel.agent.exporter.local.LocalExporter;
import org.elasticsearch.marvel.cleaner.CleanerService;
import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.common.init.proxy.ClientProxy;
import org.junit.Before;
import java.util.ArrayList;
@ -69,7 +69,7 @@ public class ExportersTests extends ESTestCase {
clusterService = mock(ClusterService.class);
// we always need to have the local exporter as it serves as the default one
factories.put(LocalExporter.TYPE, new LocalExporter.Factory(MonitoringClientProxy.of(client), clusterService,
factories.put(LocalExporter.TYPE, new LocalExporter.Factory(ClientProxy.fromClient(client), clusterService,
mock(CleanerService.class)));
clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(MonitoringSettings.COLLECTORS,
MonitoringSettings.INTERVAL, MonitoringSettings.EXPORTERS_SETTINGS)));

View File

@ -14,30 +14,29 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.xpack.security.authc.AuthenticationService;
import org.elasticsearch.xpack.security.user.XPackUser;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
/**
*
*/
public abstract class InternalClient extends FilterClient {
public interface InternalClient extends Client {
protected InternalClient(Client in) {
super(in);
}
/**
* An insecured internal client, baseically simply delegates to the normal ES client
* without doing anything extra.
*/
public static class Insecure extends InternalClient {
class Insecure extends FilterClient implements InternalClient {
@Inject
public Insecure(Client in) {
super(in);
public Insecure(Settings settings, ThreadPool threadPool, Client in) {
super(settings, threadPool, in);
}
}
@ -45,13 +44,13 @@ public abstract class InternalClient extends FilterClient {
* A secured internal client that binds the internal XPack user to the current
* execution context, before the action is executed.
*/
public static class Secure extends InternalClient {
class Secure extends FilterClient implements InternalClient {
private AuthenticationService authcService;
private final AuthenticationService authcService;
@Inject
public Secure(Client in, AuthenticationService authcService) {
super(in);
public Secure(Settings settings, ThreadPool threadPool, Client in, AuthenticationService authcService) {
super(settings, threadPool, in);
this.authcService = authcService;
}

View File

@ -10,6 +10,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.util.Providers;
@ -62,14 +64,18 @@ public class IndexAuditTrailMutedTests extends ESTestCase {
threadPool = new TestThreadPool("index audit trail tests");
transportClient = TransportClient.builder().settings(Settings.EMPTY).build();
clientCalled = new AtomicBoolean(false);
client = new InternalClient(transportClient) {
class IClient extends FilterClient implements InternalClient {
IClient(Client transportClient){
super(transportClient);
}
@Override
protected <Request extends ActionRequest<Request>, Response extends ActionResponse, RequestBuilder extends
ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
clientCalled.set(true);
}
};
}
client = new IClient(transportClient);
messageEnqueued = new AtomicBoolean(false);
}

View File

@ -16,7 +16,6 @@ import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.license.plugin.Licensing;
@ -25,7 +24,6 @@ import org.elasticsearch.marvel.MonitoringSettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.xpack.security.Security;
import org.elasticsearch.xpack.security.authc.AuthenticationModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
@ -35,8 +33,6 @@ import org.elasticsearch.xpack.action.XPackInfoAction;
import org.elasticsearch.xpack.action.XPackUsageAction;
import org.elasticsearch.xpack.common.ScriptServiceProxy;
import org.elasticsearch.xpack.common.http.HttpClientModule;
import org.elasticsearch.xpack.common.init.LazyInitializationModule;
import org.elasticsearch.xpack.common.init.LazyInitializationService;
import org.elasticsearch.xpack.common.secret.SecretModule;
import org.elasticsearch.xpack.extensions.XPackExtension;
import org.elasticsearch.xpack.extensions.XPackExtensionsService;
@ -135,7 +131,6 @@ public class XPackPlugin extends Plugin implements ScriptPlugin {
@Override
public Collection<Module> nodeModules() {
ArrayList<Module> modules = new ArrayList<>();
modules.add(new LazyInitializationModule());
modules.add(new ClockModule());
modules.addAll(notification.nodeModules());
modules.addAll(licensing.nodeModules());
@ -155,10 +150,6 @@ public class XPackPlugin extends Plugin implements ScriptPlugin {
@Override
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
ArrayList<Class<? extends LifecycleComponent>> services = new ArrayList<>();
// the initialization service must be first in the list
// as other services may depend on one of the initialized
// constructs
services.add(LazyInitializationService.class);
services.addAll(notification.nodeServices());
services.addAll(licensing.nodeServices());
services.addAll(security.nodeServices());
@ -252,11 +243,6 @@ public class XPackPlugin extends Plugin implements ScriptPlugin {
graph.onIndexModule(module);
}
public void onModule(LazyInitializationModule module) {
monitoring.onModule(module);
watcher.onModule(module);
}
public static void bindFeatureSet(Binder binder, Class<? extends XPackFeatureSet> featureSet) {
binder.bind(featureSet).asEagerSingleton();
Multibinder<XPackFeatureSet> featureSetBinder = Multibinder.newSetBinder(binder, XPackFeatureSet.class);

View File

@ -1,17 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.common.init;
import org.elasticsearch.common.inject.Injector;
public interface LazyInitializable {
/**
* This method is called once all objects have been constructed and
* the @{link LazyInitializationService} has been started.
*/
void init(Injector injector);
}

View File

@ -1,39 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.common.init;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import java.util.HashSet;
import java.util.Set;
/**
* A module to lazy initialize objects and avoid circular dependency injection issues.
*
* Objects that use the {@link org.elasticsearch.client.ElasticsearchClient} and that are also injected in transport actions provoke
* a circular dependency injection issues with Guice. Using proxies with lazy initialization is a way to solve this issue.
*
* The proxies are initialized by {@link LazyInitializationService}.
*/
public class LazyInitializationModule extends AbstractModule {
private final Set<Class<? extends LazyInitializable>> initializables = new HashSet<>();
@Override
protected void configure() {
Multibinder<LazyInitializable> mbinder = Multibinder.newSetBinder(binder(), LazyInitializable.class);
for (Class<? extends LazyInitializable> initializable : initializables) {
bind(initializable).asEagerSingleton();
mbinder.addBinding().to(initializable);
}
bind(LazyInitializationService.class).asEagerSingleton();
}
public void registerLazyInitializable(Class<? extends LazyInitializable> lazyTypeClass) {
initializables.add(lazyTypeClass);
}
}

View File

@ -1,46 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.common.init;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.settings.Settings;
import java.util.Set;
/**
* A service to lazy initialize {@link LazyInitializable} constructs.
*/
public class LazyInitializationService extends AbstractLifecycleComponent {
private final Injector injector;
private final Set<LazyInitializable> initializables;
@Inject
public LazyInitializationService(Settings settings, Injector injector, Set<LazyInitializable> initializables) {
super(settings);
this.injector = injector;
this.initializables = initializables;
}
@Override
protected void doStart() throws ElasticsearchException {
for (LazyInitializable initializable : initializables) {
logger.trace("lazy initialization of [{}]", initializable);
initializable.init(injector);
}
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
}
}

View File

@ -11,22 +11,21 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.transport.TransportMessage;
import org.elasticsearch.xpack.common.init.LazyInitializable;
/**
* A lazily initialized proxy to an elasticsearch {@link Client}. Inject this proxy whenever a client
* needs to injected to be avoid circular dependencies issues.
*/
public class ClientProxy implements LazyInitializable {
public class ClientProxy {
protected InternalClient client;
protected final InternalClient client;
@Override
public void init(Injector injector) {
this.client = injector.getInstance(InternalClient.class);
public ClientProxy(InternalClient client) {
this.client = client;
}
public AdminClient admin() {
@ -44,4 +43,9 @@ public class ClientProxy implements LazyInitializable {
protected <M extends TransportMessage> M preProcess(M message) {
return message;
}
public static InternalClient fromClient(Client client) {
return client instanceof InternalClient ? (InternalClient) client :
new InternalClient.Insecure(client.settings(), client.threadPool(), client);
}
}

View File

@ -20,11 +20,9 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.common.init.LazyInitializationModule;
import org.elasticsearch.xpack.watcher.actions.WatcherActionModule;
import org.elasticsearch.xpack.watcher.client.WatcherClientModule;
import org.elasticsearch.xpack.watcher.condition.ConditionModule;
@ -43,13 +41,10 @@ import org.elasticsearch.xpack.watcher.rest.action.RestHijackOperationAction;
import org.elasticsearch.xpack.watcher.rest.action.RestPutWatchAction;
import org.elasticsearch.xpack.watcher.rest.action.RestWatchServiceAction;
import org.elasticsearch.xpack.watcher.rest.action.RestWatcherStatsAction;
import org.elasticsearch.xpack.common.ScriptServiceProxy;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry.TemplateConfig;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.validation.WatcherSettingsValidation;
import org.elasticsearch.xpack.watcher.transform.TransformModule;
import org.elasticsearch.xpack.watcher.transform.chain.ChainTransformFactory;
import org.elasticsearch.xpack.watcher.transport.actions.ack.AckWatchAction;
import org.elasticsearch.xpack.watcher.transport.actions.ack.TransportAckWatchAction;
import org.elasticsearch.xpack.watcher.transport.actions.activate.ActivateWatchAction;
@ -207,14 +202,6 @@ public class Watcher {
}
}
public void onModule(LazyInitializationModule module) {
if (enabled) {
module.registerLazyInitializable(WatcherClientProxy.class);
module.registerLazyInitializable(ChainTransformFactory.class);
module.registerLazyInitializable(ChainInputFactory.class);
}
}
public static boolean enabled(Settings settings) {
return XPackPlugin.featureEnabled(settings, NAME, true);
}

View File

@ -10,8 +10,8 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.actions.ActionFactory;
import org.elasticsearch.xpack.watcher.actions.email.ExecutableEmailAction;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import java.io.IOException;
@ -25,8 +25,12 @@ public class IndexActionFactory extends ActionFactory<IndexAction, ExecutableInd
private final TimeValue defaultTimeout;
@Inject
public IndexActionFactory(Settings settings, WatcherClientProxy client) {
super(Loggers.getLogger(ExecutableEmailAction.class, settings));
public IndexActionFactory(Settings settings, InternalClient client) {
this(settings, new WatcherClientProxy(settings, client));
}
public IndexActionFactory(Settings settings, WatcherClientProxy client ) {
super(Loggers.getLogger(IndexActionFactory.class, settings));
this.client = client;
this.defaultTimeout = settings.getAsTime("xpack.watcher.actions.index.default_timeout", null);
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import java.io.IOException;
@ -57,6 +58,10 @@ public class TriggeredWatchStore extends AbstractComponent {
private final AtomicBoolean started = new AtomicBoolean(false);
@Inject
public TriggeredWatchStore(Settings settings, InternalClient client, TriggeredWatch.Parser triggeredWatchParser) {
this(settings, new WatcherClientProxy(settings, client), triggeredWatchParser);
}
public TriggeredWatchStore(Settings settings, WatcherClientProxy client, TriggeredWatch.Parser triggeredWatchParser) {
super(settings);
this.scrollSize = settings.getAsInt("xpack.watcher.execution.scroll.size", 100);

View File

@ -12,7 +12,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.watcher.WatcherModule;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
@ -45,6 +45,10 @@ public class HistoryStore extends AbstractComponent {
private final AtomicBoolean started = new AtomicBoolean(false);
@Inject
public HistoryStore(Settings settings, InternalClient client) {
this(settings, new WatcherClientProxy(settings, client));
}
public HistoryStore(Settings settings, WatcherClientProxy client) {
super(settings);
this.client = client;

View File

@ -48,9 +48,6 @@ public class InputModule extends AbstractModule {
bind(NoneInputFactory.class).asEagerSingleton();
parsersBinder.addBinding(NoneInput.TYPE).to(NoneInputFactory.class);
// no bind() needed, done using the LazyInitializationModule
parsersBinder.addBinding(ChainInput.TYPE).to(ChainInputFactory.class);
for (Map.Entry<String, Class<? extends InputFactory>> entry : parsers.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
parsersBinder.addBinding(entry.getKey()).to(entry.getValue());

View File

@ -7,9 +7,14 @@ package org.elasticsearch.xpack.watcher.input;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.watcher.input.chain.ChainInput;
import org.elasticsearch.xpack.watcher.input.chain.ChainInputFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class InputRegistry {
@ -17,8 +22,10 @@ public class InputRegistry {
private final Map<String, InputFactory> factories;
@Inject
public InputRegistry(Map<String, InputFactory> factories) {
this.factories = factories;
public InputRegistry(Settings settings, Map<String, InputFactory> factories) {
Map<String, InputFactory> map = new HashMap<>(factories);
map.put(ChainInput.TYPE, new ChainInputFactory(settings, this));
this.factories = Collections.unmodifiableMap(map);
}
/**

View File

@ -15,20 +15,19 @@ import org.elasticsearch.xpack.watcher.input.ExecutableInput;
import org.elasticsearch.xpack.watcher.input.Input;
import org.elasticsearch.xpack.watcher.input.InputFactory;
import org.elasticsearch.xpack.watcher.input.InputRegistry;
import org.elasticsearch.xpack.common.init.LazyInitializable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ChainInputFactory extends InputFactory<ChainInput, ChainInput.Result, ExecutableChainInput>
implements LazyInitializable {
public class ChainInputFactory extends InputFactory<ChainInput, ChainInput.Result, ExecutableChainInput> {
private InputRegistry inputRegistry;
private final InputRegistry inputRegistry;
@Inject
public ChainInputFactory(Settings settings) {
public ChainInputFactory(Settings settings, InputRegistry inputRegistry) {
super(Loggers.getLogger(ExecutableChainInput.class, settings));
this.inputRegistry = inputRegistry;
}
@Override
@ -51,13 +50,4 @@ public class ChainInputFactory extends InputFactory<ChainInput, ChainInput.Resul
return new ExecutableChainInput(input, executableInputs, inputLogger);
}
@Override
public void init(Injector injector) {
init(injector.getInstance(InputRegistry.class));
}
void init(InputRegistry inputRegistry) {
this.inputRegistry = inputRegistry;
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.suggest.Suggesters;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.input.InputFactory;
import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
@ -34,6 +35,11 @@ public class SearchInputFactory extends InputFactory<SearchInput, SearchInput.Re
private final ParseFieldMatcher parseFieldMatcher;
@Inject
public SearchInputFactory(Settings settings, InternalClient client, IndicesQueriesRegistry queryRegistry,
AggregatorParsers aggParsers, Suggesters suggesters) {
this(settings, new WatcherClientProxy(settings, client), queryRegistry, aggParsers, suggesters);
}
public SearchInputFactory(Settings settings, WatcherClientProxy client, IndicesQueriesRegistry queryRegistry,
AggregatorParsers aggParsers, Suggesters suggesters) {
super(Loggers.getLogger(ExecutableSimpleInput.class, settings));

View File

@ -20,6 +20,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.template.TemplateUtils;
@ -62,9 +63,9 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
@Inject
public WatcherIndexTemplateRegistry(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService,
ThreadPool threadPool, WatcherClientProxy client) {
ThreadPool threadPool, InternalClient client) {
super(settings);
this.client = client;
this.client = new WatcherClientProxy(settings, client);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indexTemplates = TEMPLATE_CONFIGS;

View File

@ -40,8 +40,8 @@ public class WatcherClientProxy extends ClientProxy {
private final TimeValue defaultIndexTimeout;
private final TimeValue defaultBulkTimeout;
@Inject
public WatcherClientProxy(Settings settings) {
public WatcherClientProxy(Settings settings, InternalClient client) {
super(client);
defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30));
defaultIndexTimeout = settings.getAsTime("xpack.watcher.internal.ops.index.default_timeout", TimeValue.timeValueSeconds(60));
defaultBulkTimeout = settings.getAsTime("xpack.watcher.internal.ops.bulk.default_timeout", TimeValue.timeValueSeconds(120));
@ -51,9 +51,8 @@ public class WatcherClientProxy extends ClientProxy {
* Creates a proxy to the given internal client (can be used for testing)
*/
public static WatcherClientProxy of(Client client) {
WatcherClientProxy proxy = new WatcherClientProxy(Settings.EMPTY);
proxy.client = client instanceof InternalClient ? (InternalClient) client : new InternalClient.Insecure(client);
return proxy;
return new WatcherClientProxy(Settings.EMPTY, client instanceof InternalClient ? (InternalClient) client :
new InternalClient.Insecure(client.settings(), client.threadPool(), client));
}
public IndexResponse index(IndexRequest request, TimeValue timeout) {

View File

@ -38,9 +38,6 @@ public class TransformModule extends AbstractModule {
bind(ScriptTransformFactory.class).asEagerSingleton();
mbinder.addBinding(ScriptTransform.TYPE).to(ScriptTransformFactory.class);
// no bind() needed, done using the LazyInitializationModule
mbinder.addBinding(ChainTransform.TYPE).to(ChainTransformFactory.class);
for (Map.Entry<String, Class<? extends TransformFactory>> entry : factories.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
mbinder.addBinding(entry.getKey()).to(entry.getValue());

View File

@ -7,9 +7,14 @@ package org.elasticsearch.xpack.watcher.transform;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.watcher.transform.chain.ChainTransform;
import org.elasticsearch.xpack.watcher.transform.chain.ChainTransformFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
@ -20,8 +25,10 @@ public class TransformRegistry {
private final Map<String, TransformFactory> factories;
@Inject
public TransformRegistry(Map<String, TransformFactory> factories) {
this.factories = factories;
public TransformRegistry(Settings settings, Map<String, TransformFactory> factories) {
Map<String, TransformFactory> map = new HashMap<>(factories);
map.put(ChainTransform.TYPE, new ChainTransformFactory(settings, this));
this.factories = Collections.unmodifiableMap(map);
}
public TransformFactory factory(String type) {

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.transform.chain;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
@ -13,41 +12,16 @@ import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
import org.elasticsearch.xpack.watcher.transform.Transform;
import org.elasticsearch.xpack.watcher.transform.TransformFactory;
import org.elasticsearch.xpack.watcher.transform.TransformRegistry;
import org.elasticsearch.xpack.common.init.LazyInitializable;
import java.io.IOException;
import java.util.ArrayList;
/**
*
*/
public class ChainTransformFactory extends TransformFactory<ChainTransform, ChainTransform.Result, ExecutableChainTransform> implements
LazyInitializable {
public final class ChainTransformFactory extends TransformFactory<ChainTransform, ChainTransform.Result, ExecutableChainTransform> {
private TransformRegistry registry;
private final TransformRegistry registry;
// used by guice
public ChainTransformFactory(Settings settings) {
public ChainTransformFactory(Settings settings, TransformRegistry registry) {
super(Loggers.getLogger(ExecutableChainTransform.class, settings));
}
// used for tests
public ChainTransformFactory(TransformRegistry registry) {
super(Loggers.getLogger(ExecutableChainTransform.class));
this.registry = registry;
}
// used for tests
public ChainTransformFactory() {
super(Loggers.getLogger(ExecutableChainTransform.class));
}
@Override
public void init(Injector injector) {
init(injector.getInstance(TransformRegistry.class));
}
public void init(TransformRegistry registry) {
this.registry = registry;
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.suggest.Suggesters;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.transform.TransformFactory;
@ -33,6 +34,10 @@ public class SearchTransformFactory extends TransformFactory<SearchTransform, Se
private final ParseFieldMatcher parseFieldMatcher;
@Inject
public SearchTransformFactory(Settings settings, InternalClient client, IndicesQueriesRegistry queryRegistry,
AggregatorParsers aggParsers, Suggesters suggesters) {
this(settings, new WatcherClientProxy(settings, client), queryRegistry, aggParsers, suggesters);
}
public SearchTransformFactory(Settings settings, WatcherClientProxy client, IndicesQueriesRegistry queryRegistry,
AggregatorParsers aggParsers, Suggesters suggesters) {
super(Loggers.getLogger(ExecutableSearchTransform.class, settings));

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import java.io.IOException;
@ -59,6 +60,10 @@ public class WatchStore extends AbstractComponent {
private final TimeValue scrollTimeout;
@Inject
public WatchStore(Settings settings, InternalClient client, Watch.Parser watchParser) {
this(settings, new WatcherClientProxy(settings, client), watchParser);
}
public WatchStore(Settings settings, WatcherClientProxy client, Watch.Parser watchParser) {
super(settings);
this.client = client;

View File

@ -16,6 +16,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.common.init.proxy.ClientProxy;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.Action.Result.Status;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
@ -199,7 +200,7 @@ public class IndexActionTests extends ESIntegTestCase {
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, WatcherClientProxy.of(client()));
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, ClientProxy.fromClient(client()));
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
@ -227,7 +228,7 @@ public class IndexActionTests extends ESIntegTestCase {
}
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, WatcherClientProxy.of(client()));
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY,ClientProxy.fromClient(client()));
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
try {

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.watcher.input;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
@ -20,7 +21,7 @@ import static org.hamcrest.Matchers.containsString;
public class InputRegistryTests extends ESTestCase {
public void testParseEmptyInput() throws Exception {
InputRegistry registry = new InputRegistry(emptyMap());
InputRegistry registry = new InputRegistry(Settings.EMPTY, emptyMap());
XContentParser parser = JsonXContent.jsonXContent.createParser(
jsonBuilder().startObject().endObject().bytes());
parser.nextToken();
@ -33,7 +34,7 @@ public class InputRegistryTests extends ESTestCase {
}
public void testParseArrayInput() throws Exception {
InputRegistry registry = new InputRegistry(emptyMap());
InputRegistry registry = new InputRegistry(Settings.EMPTY, emptyMap());
XContentParser parser = JsonXContent.jsonXContent.createParser(
jsonBuilder().startArray().endArray().bytes());
parser.nextToken();

View File

@ -70,9 +70,8 @@ public class ChainInputTests extends ESTestCase {
factories.put("simple", new SimpleInputFactory(Settings.EMPTY));
// hackedy hack...
InputRegistry inputRegistry = new InputRegistry(factories);
ChainInputFactory chainInputFactory = new ChainInputFactory(Settings.EMPTY);
chainInputFactory.init(inputRegistry);
InputRegistry inputRegistry = new InputRegistry(Settings.EMPTY, factories);
ChainInputFactory chainInputFactory = new ChainInputFactory(Settings.EMPTY, inputRegistry);
factories.put("chain", chainInputFactory);
XContentBuilder builder = jsonBuilder().startObject().startArray("inputs")
@ -123,9 +122,8 @@ public class ChainInputTests extends ESTestCase {
Map<String, InputFactory> factories = new HashMap<>();
factories.put("simple", new SimpleInputFactory(Settings.EMPTY));
InputRegistry inputRegistry = new InputRegistry(factories);
ChainInputFactory chainInputFactory = new ChainInputFactory(Settings.EMPTY);
chainInputFactory.init(inputRegistry);
InputRegistry inputRegistry = new InputRegistry(Settings.EMPTY, factories);
ChainInputFactory chainInputFactory = new ChainInputFactory(Settings.EMPTY, inputRegistry);
factories.put("chain", chainInputFactory);
XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.watcher.transform.chain;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -111,9 +112,9 @@ public class ChainTransformTests extends ESTestCase {
}
public void testParser() throws Exception {
TransformRegistry registry = new TransformRegistry(singletonMap("named", new NamedExecutableTransform.Factory(logger)));
TransformRegistry registry = new TransformRegistry(Settings.EMPTY, singletonMap("named", new NamedExecutableTransform.Factory(logger)));
ChainTransformFactory transformParser = new ChainTransformFactory(registry);
ChainTransformFactory transformParser = new ChainTransformFactory(Settings.EMPTY, registry);
XContentBuilder builder = jsonBuilder().startArray()
.startObject().startObject("named").field("name", "name1").endObject().endObject()

View File

@ -356,10 +356,10 @@ public class WatchTests extends ESTestCase {
QueryParser<MatchAllQueryBuilder> queryParser = MatchAllQueryBuilder::fromXContent;
queryRegistry.register(queryParser, MatchAllQueryBuilder.QUERY_NAME_FIELD);
parsers.put(SearchInput.TYPE, new SearchInputFactory(settings, client, queryRegistry, null, null));
return new InputRegistry(parsers);
return new InputRegistry(Settings.EMPTY, parsers);
default:
parsers.put(SimpleInput.TYPE, new SimpleInputFactory(settings));
return new InputRegistry(parsers);
return new InputRegistry(Settings.EMPTY, parsers);
}
}
@ -424,12 +424,9 @@ public class WatchTests extends ESTestCase {
QueryParser<MatchAllQueryBuilder> queryParser = MatchAllQueryBuilder::fromXContent;
queryRegistry.register(queryParser, MatchAllQueryBuilder.QUERY_NAME_FIELD);
Map<String, TransformFactory> factories = new HashMap<>();
ChainTransformFactory parser = new ChainTransformFactory();
factories.put(ChainTransform.TYPE, parser);
factories.put(ScriptTransform.TYPE, new ScriptTransformFactory(settings, scriptService));
factories.put(SearchTransform.TYPE, new SearchTransformFactory(settings, client, queryRegistry, null, null));
TransformRegistry registry = new TransformRegistry(unmodifiableMap(factories));
parser.init(registry);
TransformRegistry registry = new TransformRegistry(Settings.EMPTY, unmodifiableMap(factories));
return registry;
}