Merge pull request elastic/elasticsearch#2824 from rjernst/deguice5

Internal: Remove guice cyclic dependency with InternalClient

Original commit: elastic/x-pack-elasticsearch@9c24b1152d
This commit is contained in:
Ryan Ernst 2016-07-14 13:56:37 -07:00 committed by GitHub
commit b1c892b77d
15 changed files with 135 additions and 168 deletions

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.security;
import java.io.IOException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
@ -13,60 +15,57 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.xpack.security.authc.AuthenticationService;
import org.elasticsearch.xpack.security.user.XPackUser;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import org.elasticsearch.xpack.security.authc.Authentication;
import org.elasticsearch.xpack.security.authc.InternalAuthenticationService;
import org.elasticsearch.xpack.security.crypto.CryptoService;
import org.elasticsearch.xpack.security.user.XPackUser;
/**
* A special filter client for internal node communication which adds the internal xpack user to the headers.
* An optionally secured client for internal node communication.
*
* When secured, the XPack user is added to the execution context before each action is executed.
*/
public interface InternalClient extends Client {
public class InternalClient extends FilterClient {
private final CryptoService cryptoService;
private final boolean signUserHeader;
private final String nodeName;
/**
* An insecured internal client, baseically simply delegates to the normal ES client
* without doing anything extra.
* Constructs an InternalClient.
* If {@code cryptoService} is non-null, the client is secure. Otherwise this client is a passthrough.
*/
class Insecure extends FilterClient implements InternalClient {
@Inject
public Insecure(Settings settings, ThreadPool threadPool, Client in) {
super(settings, threadPool, in);
}
public InternalClient(Settings settings, ThreadPool threadPool, Client in, CryptoService cryptoService) {
super(settings, threadPool, in);
this.cryptoService = cryptoService;
this.signUserHeader = InternalAuthenticationService.SIGN_USER_HEADER.get(settings);
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
}
/**
* A secured internal client that binds the internal XPack user to the current
* execution context, before the action is executed.
*/
class Secure extends FilterClient implements InternalClient {
@Override
protected <Request extends ActionRequest<Request>, Response extends ActionResponse, RequestBuilder extends
ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
private final AuthenticationService authcService;
@Inject
public Secure(Settings settings, ThreadPool threadPool, Client in, AuthenticationService authcService) {
super(settings, threadPool, in);
this.authcService = authcService;
if (cryptoService == null) {
super.doExecute(action, request, listener);
return;
}
@Override
protected <Request extends ActionRequest<Request>, Response extends ActionResponse, RequestBuilder extends
ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
try (ThreadContext.StoredContext ctx = threadPool().getThreadContext().stashContext()) {
try {
authcService.attachUserIfMissing(XPackUser.INSTANCE);
} catch (IOException ioe) {
throw new ElasticsearchException("failed to attach internal user to request", ioe);
}
super.doExecute(action, request, listener);
try (ThreadContext.StoredContext ctx = threadPool().getThreadContext().stashContext()) {
try {
Authentication authentication = new Authentication(XPackUser.INSTANCE,
new Authentication.RealmRef("__attach", "__attach", nodeName), null);
authentication.writeToContextIfMissing(threadPool().getThreadContext(), cryptoService, signUserHeader);
} catch (IOException ioe) {
throw new ElasticsearchException("failed to attach internal user to request", ioe);
}
super.doExecute(action, request, listener);
}
}
}

View File

@ -18,7 +18,6 @@ import java.util.function.Function;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.LifecycleComponent;
@ -63,7 +62,6 @@ import org.elasticsearch.xpack.security.action.user.TransportPutUserAction;
import org.elasticsearch.xpack.security.audit.AuditTrailModule;
import org.elasticsearch.xpack.security.audit.index.IndexAuditTrail;
import org.elasticsearch.xpack.security.audit.index.IndexNameResolver;
import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail;
import org.elasticsearch.xpack.security.authc.AuthenticationModule;
import org.elasticsearch.xpack.security.authc.InternalAuthenticationService;
import org.elasticsearch.xpack.security.authc.Realms;
@ -138,6 +136,10 @@ public class Security implements ActionPlugin {
return cryptoService;
}
public boolean isEnabled() {
return enabled;
}
public Collection<Module> nodeModules() {
List<Module> modules = new ArrayList<>();

View File

@ -42,7 +42,7 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
@Inject
public SecurityLifecycleService(Settings settings, ClusterService clusterService, ThreadPool threadPool,
IndexAuditTrail indexAuditTrail, NativeUsersStore nativeUserStore,
NativeRolesStore nativeRolesStore, Provider<InternalClient> clientProvider) {
NativeRolesStore nativeRolesStore, InternalClient client) {
super(settings);
this.settings = settings;
this.threadPool = threadPool;
@ -54,7 +54,7 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
clusterService.add(this);
clusterService.add(nativeUserStore);
clusterService.add(nativeRolesStore);
clusterService.add(new SecurityTemplateService(settings, clusterService, clientProvider, threadPool));
clusterService.add(new SecurityTemplateService(settings, clusterService, client, threadPool));
clusterService.addLifecycleListener(new LifecycleListener() {
@Override

View File

@ -40,12 +40,8 @@ public class SecurityModule extends AbstractSecurityModule {
bind(SecurityContext.Secure.class).asEagerSingleton();
bind(SecurityContext.class).to(SecurityContext.Secure.class);
bind(SecurityLifecycleService.class).asEagerSingleton();
bind(InternalClient.Secure.class).asEagerSingleton();
bind(InternalClient.class).to(InternalClient.Secure.class);
} else {
bind(SecurityContext.class).toInstance(SecurityContext.Insecure.INSTANCE);
bind(InternalClient.Insecure.class).asEagerSingleton();
bind(InternalClient.class).to(InternalClient.Insecure.class);
}
}

View File

@ -37,19 +37,18 @@ public class SecurityTemplateService extends AbstractComponent implements Cluste
public static final String SECURITY_TEMPLATE_NAME = "security-index-template";
private final ThreadPool threadPool;
private final Provider<InternalClient> clientProvider;
private final InternalClient client;
private final AtomicBoolean templateCreationPending = new AtomicBoolean(false);
public SecurityTemplateService(Settings settings, ClusterService clusterService,
Provider<InternalClient> clientProvider, ThreadPool threadPool) {
InternalClient client, ThreadPool threadPool) {
super(settings);
this.threadPool = threadPool;
this.clientProvider = clientProvider;
this.client = client;
clusterService.add(this);
}
private void createSecurityTemplate() {
final Client client = clientProvider.get();
try (InputStream is = getClass().getResourceAsStream("/" + SECURITY_TEMPLATE_NAME + ".json")) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@ -151,7 +152,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
private final String nodeName;
private final Provider<InternalClient> clientProvider;
private final Client client;
private final BlockingQueue<Message> eventQueue;
private final QueueConsumer queueConsumer;
private final ThreadPool threadPool;
@ -160,7 +161,6 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
private final boolean indexToRemoteCluster;
private BulkProcessor bulkProcessor;
private Client client;
private IndexNameResolver.Rollover rollover;
private String nodeHostName;
private String nodeHostAddress;
@ -172,10 +172,9 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
}
@Inject
public IndexAuditTrail(Settings settings, Provider<InternalClient> clientProvider, ThreadPool threadPool,
public IndexAuditTrail(Settings settings, InternalClient client, ThreadPool threadPool,
ClusterService clusterService) {
super(settings);
this.clientProvider = clientProvider;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.nodeName = settings.get("name");
@ -198,6 +197,13 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
}
this.indexToRemoteCluster = REMOTE_CLIENT_SETTINGS.get(settings).names().size() > 0;
if (indexToRemoteCluster == false) {
// in the absence of client settings for remote indexing, fall back to the client that was passed in.
this.client = client;
} else {
this.client = initializeRemoteClient(settings, logger);
}
}
public State state() {
@ -222,16 +228,6 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
*/
public synchronized boolean canStart(ClusterChangedEvent event, boolean master) {
if (indexToRemoteCluster) {
try {
if (client == null) {
initializeClient();
}
} catch (Exception e) {
logger.error("failed to initialize client for remote indexing. index based output is disabled", e);
state.set(State.FAILED);
return false;
}
ClusterStateResponse response = client.admin().cluster().prepareState().execute().actionGet();
return canStart(response.getState(), master);
}
@ -279,10 +275,6 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
this.nodeHostName = clusterService.localNode().getHostName();
this.nodeHostAddress = clusterService.localNode().getHostAddress();
if (client == null) {
initializeClient();
}
if (master) {
putTemplate(customAuditIndexSettings(settings));
}
@ -717,56 +709,51 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
return eventQueue.peek();
}
private void initializeClient() {
if (indexToRemoteCluster == false) {
// in the absence of client settings for remote indexing, fall back to the client that was passed in.
this.client = clientProvider.get();
} else {
Settings clientSettings = REMOTE_CLIENT_SETTINGS.get(settings);
String[] hosts = clientSettings.getAsArray("hosts");
if (hosts.length == 0) {
throw new ElasticsearchException("missing required setting " +
"[" + REMOTE_CLIENT_SETTINGS.getKey() + ".hosts] for remote audit log indexing");
}
if (clientSettings.get("cluster.name", "").isEmpty()) {
throw new ElasticsearchException("missing required setting " +
"[" + REMOTE_CLIENT_SETTINGS.getKey() + ".cluster.name] for remote audit log indexing");
}
List<Tuple<String, Integer>> hostPortPairs = new ArrayList<>();
for (String host : hosts) {
List<String> hostPort = Arrays.asList(host.trim().split(":"));
if (hostPort.size() != 1 && hostPort.size() != 2) {
logger.warn("invalid host:port specified: [{}] for setting [{}.hosts]", REMOTE_CLIENT_SETTINGS.getKey(), host);
}
hostPortPairs.add(new Tuple<>(hostPort.get(0), hostPort.size() == 2 ? Integer.valueOf(hostPort.get(1)) : 9300));
}
if (hostPortPairs.size() == 0) {
throw new ElasticsearchException("no valid host:port pairs specified for setting ["
+ REMOTE_CLIENT_SETTINGS.getKey() + ".hosts]");
}
final Settings theClientSetting = clientSettings.filter((s) -> s.startsWith("hosts") == false); // hosts is not a valid setting
final TransportClient transportClient = TransportClient.builder()
.settings(Settings.builder()
.put("node.name", DEFAULT_CLIENT_NAME + "-" + Node.NODE_NAME_SETTING.get(settings))
.put(theClientSetting))
.addPlugin(XPackPlugin.class)
.build();
for (Tuple<String, Integer> pair : hostPortPairs) {
try {
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(pair.v1()), pair.v2()));
} catch (UnknownHostException e) {
throw new ElasticsearchException("could not find host {}", e, pair.v1());
}
}
this.client = transportClient;
logger.info("forwarding audit events to remote cluster [{}] using hosts [{}]",
clientSettings.get("cluster.name", ""), hostPortPairs.toString());
private static Client initializeRemoteClient(Settings settings, ESLogger logger) {
Settings clientSettings = REMOTE_CLIENT_SETTINGS.get(settings);
String[] hosts = clientSettings.getAsArray("hosts");
if (hosts.length == 0) {
throw new ElasticsearchException("missing required setting " +
"[" + REMOTE_CLIENT_SETTINGS.getKey() + ".hosts] for remote audit log indexing");
}
if (clientSettings.get("cluster.name", "").isEmpty()) {
throw new ElasticsearchException("missing required setting " +
"[" + REMOTE_CLIENT_SETTINGS.getKey() + ".cluster.name] for remote audit log indexing");
}
List<Tuple<String, Integer>> hostPortPairs = new ArrayList<>();
for (String host : hosts) {
List<String> hostPort = Arrays.asList(host.trim().split(":"));
if (hostPort.size() != 1 && hostPort.size() != 2) {
logger.warn("invalid host:port specified: [{}] for setting [{}.hosts]", REMOTE_CLIENT_SETTINGS.getKey(), host);
}
hostPortPairs.add(new Tuple<>(hostPort.get(0), hostPort.size() == 2 ? Integer.valueOf(hostPort.get(1)) : 9300));
}
if (hostPortPairs.size() == 0) {
throw new ElasticsearchException("no valid host:port pairs specified for setting ["
+ REMOTE_CLIENT_SETTINGS.getKey() + ".hosts]");
}
final Settings theClientSetting = clientSettings.filter((s) -> s.startsWith("hosts") == false); // hosts is not a valid setting
final TransportClient transportClient = TransportClient.builder()
.settings(Settings.builder()
.put("node.name", DEFAULT_CLIENT_NAME + "-" + Node.NODE_NAME_SETTING.get(settings))
.put(theClientSetting))
.addPlugin(XPackPlugin.class)
.build();
for (Tuple<String, Integer> pair : hostPortPairs) {
try {
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(pair.v1()), pair.v2()));
} catch (UnknownHostException e) {
throw new ElasticsearchException("could not find host {}", e, pair.v1());
}
}
logger.info("forwarding audit events to remote cluster [{}] using hosts [{}]",
clientSettings.get("cluster.name", ""), hostPortPairs.toString());
return transportClient;
}
Settings customAuditIndexSettings(Settings nodeSettings) {

View File

@ -93,7 +93,7 @@ public class Authentication {
return authentication;
}
void writeToContextIfMissing(ThreadContext context, CryptoService cryptoService, boolean sign)
public void writeToContextIfMissing(ThreadContext context, CryptoService cryptoService, boolean sign)
throws IOException, IllegalArgumentException {
if (context.getTransient(AUTHENTICATION_KEY) != null) {
if (context.getHeader(AUTHENTICATION_KEY) == null) {

View File

@ -114,20 +114,19 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
private final Hasher hasher = Hasher.BCRYPT;
private final List<ChangeListener> listeners = new CopyOnWriteArrayList<>();
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
private final Provider<InternalClient> clientProvider;
private final InternalClient client;
private final ThreadPool threadPool;
private SelfReschedulingRunnable userPoller;
private Client client;
private int scrollSize;
private TimeValue scrollKeepAlive;
private volatile boolean securityIndexExists = false;
@Inject
public NativeUsersStore(Settings settings, Provider<InternalClient> clientProvider, ThreadPool threadPool) {
public NativeUsersStore(Settings settings, InternalClient client, ThreadPool threadPool) {
super(settings);
this.clientProvider = clientProvider;
this.client = client;
this.threadPool = threadPool;
}
@ -526,7 +525,6 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
public void start() {
try {
if (state.compareAndSet(State.INITIALIZED, State.STARTING)) {
this.client = clientProvider.get();
this.scrollSize = SCROLL_SIZE_SETTING.get(settings);
this.scrollKeepAlive = SCROLL_KEEP_ALIVE_SETTING.get(settings);
@ -703,7 +701,6 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
throw new IllegalStateException("can only reset if stopped!!!");
}
this.listeners.clear();
this.client = null;
this.securityIndexExists = false;
this.state.set(State.INITIALIZED);
}

View File

@ -105,12 +105,11 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
public static final String ROLE_DOC_TYPE = "role";
private final Provider<InternalClient> clientProvider;
private final InternalClient client;
private final ThreadPool threadPool;
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
private final ConcurrentHashMap<String, RoleAndVersion> roleCache = new ConcurrentHashMap<>();
private Client client;
private SecurityClient securityClient;
private int scrollSize;
private TimeValue scrollKeepAlive;
@ -119,9 +118,9 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
private volatile boolean securityIndexExists = false;
@Inject
public NativeRolesStore(Settings settings, Provider<InternalClient> clientProvider, ThreadPool threadPool) {
public NativeRolesStore(Settings settings, InternalClient client, ThreadPool threadPool) {
super(settings);
this.clientProvider = clientProvider;
this.client = client;
this.threadPool = threadPool;
}
@ -150,7 +149,6 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
public void start() {
try {
if (state.compareAndSet(State.INITIALIZED, State.STARTING)) {
this.client = clientProvider.get();
this.securityClient = new SecurityClient(client);
this.scrollSize = SCROLL_SIZE_SETTING.get(settings);
this.scrollKeepAlive = SCROLL_KEEP_ALIVE_SETTING.get(settings);
@ -501,7 +499,6 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
throw new IllegalStateException("can only reset if stopped!!!");
}
this.roleCache.clear();
this.client = null;
this.securityIndexExists = false;
this.state.set(State.INITIALIZED);
}

View File

@ -5,6 +5,11 @@
*/
package org.elasticsearch.xpack.security.audit.index;
import java.net.InetAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
@ -15,16 +20,12 @@ import org.elasticsearch.client.FilterClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportMessage;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.security.audit.index.IndexAuditTrail.State;
@ -35,11 +36,6 @@ import org.elasticsearch.xpack.security.user.User;
import org.junit.After;
import org.junit.Before;
import java.net.InetAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
@ -66,9 +62,9 @@ public class IndexAuditTrailMutedTests extends ESTestCase {
threadPool = new TestThreadPool("index audit trail tests");
transportClient = TransportClient.builder().settings(Settings.builder().put("transport.type", "local")).build();
clientCalled = new AtomicBoolean(false);
class IClient extends FilterClient implements InternalClient {
class IClient extends InternalClient {
IClient(Client transportClient){
super(transportClient);
super(Settings.EMPTY, null, transportClient, null);
}
@Override
protected <Request extends ActionRequest<Request>, Response extends ActionResponse, RequestBuilder extends
@ -259,7 +255,7 @@ public class IndexAuditTrailMutedTests extends ESTestCase {
IndexAuditTrail createAuditTrail(String[] excludes) {
Settings settings = IndexAuditTrailTests.levelSettings(null, excludes);
auditTrail = new IndexAuditTrail(settings, Providers.of(client), threadPool, clusterService) {
auditTrail = new IndexAuditTrail(settings, client, threadPool, clusterService) {
@Override
void putTemplate(Settings settings) {
// make this a no-op so we don't have to stub out unnecessary client activities

View File

@ -5,6 +5,17 @@
*/
package org.elasticsearch.xpack.security.audit.index;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@ -18,14 +29,11 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.search.SearchHit;
@ -35,9 +43,7 @@ import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSource;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInfo;
import org.elasticsearch.transport.TransportMessage;
import org.elasticsearch.transport.TransportRequest;
@ -58,18 +64,6 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
import static org.elasticsearch.test.InternalTestCluster.clusterName;
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.DAILY;
@ -276,7 +270,7 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase {
when(clusterService.localNode()).thenReturn(localNode);
threadPool = new TestThreadPool("index audit trail tests");
enqueuedMessage = new SetOnce<>();
auditor = new IndexAuditTrail(settings, Providers.of(internalClient()), threadPool, clusterService) {
auditor = new IndexAuditTrail(settings, internalClient(), threadPool, clusterService) {
@Override
void enqueue(Message message, String type) {
enqueuedMessage.set(message);

View File

@ -5,24 +5,20 @@
*/
package org.elasticsearch.xpack.security.audit.index;
import java.util.Locale;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.junit.After;
import org.junit.Before;
import java.util.Locale;
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.DAILY;
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.HOURLY;
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.MONTHLY;
@ -53,7 +49,7 @@ public class IndexAuditTrailUpdateMappingTests extends SecurityIntegTestCase {
when(localNode.getHostAddress()).thenReturn(LocalTransportAddress.buildUnique().toString());
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.localNode()).thenReturn(localNode);
auditor = new IndexAuditTrail(settings, Providers.of(internalClient()), threadPool, clusterService);
auditor = new IndexAuditTrail(settings, internalClient(), threadPool, clusterService);
// before starting we add an event
auditor.authenticationFailed(new FakeRestRequest());

View File

@ -63,6 +63,7 @@ import org.elasticsearch.xpack.notification.email.Account;
import org.elasticsearch.xpack.notification.email.support.BodyPartSource;
import org.elasticsearch.xpack.rest.action.RestXPackInfoAction;
import org.elasticsearch.xpack.rest.action.RestXPackUsageAction;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.security.Security;
import org.elasticsearch.xpack.security.authc.AuthenticationModule;
import org.elasticsearch.xpack.support.clock.Clock;
@ -184,6 +185,9 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin {
ResourceWatcherService resourceWatcherService) {
List<Object> components = new ArrayList<>();
if (transportClientMode == false) {
final InternalClient internalClient = new InternalClient(settings, threadPool, client, security.getCryptoService());
components.add(internalClient);
// watcher http stuff
Map<String, HttpAuthFactory> httpAuthFactories = new HashMap<>();
httpAuthFactories.put(BasicAuth.TYPE, new BasicAuthFactory(security.getCryptoService()));

View File

@ -46,6 +46,6 @@ public class ClientProxy {
public static InternalClient fromClient(Client client) {
return client instanceof InternalClient ? (InternalClient) client :
new InternalClient.Insecure(client.settings(), client.threadPool(), client);
new InternalClient(client.settings(), client.threadPool(), client, null);
}
}

View File

@ -52,7 +52,7 @@ public class WatcherClientProxy extends ClientProxy {
*/
public static WatcherClientProxy of(Client client) {
return new WatcherClientProxy(Settings.EMPTY, client instanceof InternalClient ? (InternalClient) client :
new InternalClient.Insecure(client.settings(), client.threadPool(), client));
new InternalClient(client.settings(), client.threadPool(), client, null));
}
public IndexResponse index(IndexRequest request, TimeValue timeout) {