From 3b06254573fb56349c5f00c5853b0d92adb02ccb Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 4 Jan 2018 00:06:16 -0500 Subject: [PATCH] Set processors on audit remote client When creating a transport client for a remote index audit trail, we are implicitly allowing the construction of this transport client to initialize the number of processors that Netty thinks are on the system. Since we never pushed down the number of processors, this will always default to the number of cores on the machine. If the user has also set the processors setting, when the server bootstraps it will try to push the number of processors down to Netty too. If this value does not match the number of cores, we will fail in bootstrap because we guard against initializing the number of processors that Netty sees to different values. Instead, the transport client should inherit the number of processors too and push this down when it pushes the number of processors down to Netty. We have to worry about another possibility: an explicit setting for the number of processors for the transport client so we require this matches the inherited value. Relates elastic/x-pack-elasticsearch#3469 Original commit: elastic/x-pack-elasticsearch@032810bb0b9a195acf08bb4f9cf6d93a1400797d --- .../security/audit/index/IndexAuditTrail.java | 19 ++++++- .../audit/index/IndexAuditTrailTests.java | 53 ++++++++++++++++++- 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrail.java b/plugin/src/main/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrail.java index 152132a7ea5..adccc907b0b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrail.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrail.java @@ -791,6 +791,19 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail { "[" + REMOTE_CLIENT_SETTINGS.getKey() + ".hosts] for remote audit log indexing"); } + final int processors = EsExecutors.PROCESSORS_SETTING.get(settings); + if (EsExecutors.PROCESSORS_SETTING.exists(clientSettings)) { + final int clientProcessors = EsExecutors.PROCESSORS_SETTING.get(clientSettings); + if (clientProcessors != processors) { + final String message = String.format( + Locale.ROOT, + "explicit processor setting [%d] for audit trail remote client does not match inherited processor setting [%d]", + clientProcessors, + processors); + throw new IllegalStateException(message); + } + } + if (clientSettings.get("cluster.name", "").isEmpty()) { throw new ElasticsearchException("missing required setting " + "[" + REMOTE_CLIENT_SETTINGS.getKey() + ".cluster.name] for remote audit log indexing"); @@ -810,7 +823,11 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail { throw new ElasticsearchException("no valid host:port pairs specified for setting [" + REMOTE_CLIENT_SETTINGS.getKey() + ".hosts]"); } - final Settings theClientSetting = clientSettings.filter((s) -> s.startsWith("hosts") == false); // hosts is not a valid setting + final Settings theClientSetting = + Settings.builder() + .put(clientSettings.filter((s) -> s.startsWith("hosts") == false)) // hosts is not a valid setting + .put(EsExecutors.PROCESSORS_SETTING.getKey(), processors) + .build(); final TransportClient transportClient = new TransportClient(Settings.builder() .put("node.name", DEFAULT_CLIENT_NAME + "-" + Node.NODE_NAME_SETTING.get(settings)) .put(theClientSetting).build(), Settings.EMPTY, remoteTransportClientPlugins(), null) {}; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java index f84713063e4..b178d9944f6 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; +import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -26,6 +27,7 @@ import org.elasticsearch.common.settings.KeyStoreWrapper; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.search.SearchHit; @@ -73,7 +75,9 @@ import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rol import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.MONTHLY; import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.WEEKLY; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -113,6 +117,7 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase { if (remoteCluster != null) { remoteCluster.close(); remoteCluster = null; + } remoteSettings = null; } @@ -299,13 +304,17 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase { } private void initialize(String[] includes, String[] excludes) throws Exception { + initialize(includes, excludes, Settings.EMPTY); + } + + private void initialize(final String[] includes, final String[] excludes, final Settings additionalSettings) throws Exception { rollover = randomFrom(HOURLY, DAILY, WEEKLY, MONTHLY); includeRequestBody = randomBoolean(); Settings.Builder builder = Settings.builder(); if (remoteIndexing) { builder.put(remoteSettings); } - builder.put(settings(rollover, includes, excludes)); + builder.put(settings(rollover, includes, excludes)).put(additionalSettings).build(); // IndexAuditTrail should ignore secure settings // they are merged on the master node creating the audit index if (randomBoolean()) { @@ -345,6 +354,48 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase { auditor.start(); } + public void testProcessorsSetting() { + final boolean explicitProcessors = randomBoolean(); + final int processors; + if (explicitProcessors) { + processors = randomIntBetween(1, 16); + } else { + processors = EsExecutors.PROCESSORS_SETTING.get(Settings.EMPTY); + } + final boolean explicitClientProcessors = randomBoolean(); + final int clientProcessors; + if (explicitClientProcessors) { + clientProcessors = randomIntBetween(1, 16); + } else { + clientProcessors = EsExecutors.PROCESSORS_SETTING.get(Settings.EMPTY); + } + + final Settings.Builder additionalSettingsBuilder = + Settings.builder() + .put("xpack.security.audit.index.client.cluster.name", "remote") + .put("xpack.security.audit.index.client.hosts", "localhost:9300"); + + if (explicitProcessors) { + additionalSettingsBuilder.put(EsExecutors.PROCESSORS_SETTING.getKey(), processors); + } + if (explicitClientProcessors) { + additionalSettingsBuilder.put("xpack.security.audit.index.client.processors", clientProcessors); + } + + final ThrowingRunnable runnable = () -> initialize(null, null, additionalSettingsBuilder.build()); + if (processors == clientProcessors || explicitClientProcessors == false) { + // okay, the client initialized which is all we care about but no nodes are available because we never set up the remote cluster + expectThrows(NoNodeAvailableException.class, runnable); + } else { + final IllegalStateException e = expectThrows(IllegalStateException.class, runnable); + assertThat( + e, + hasToString(containsString( + "explicit processor setting [" + clientProcessors + "]" + + " for audit trail remote client does not match inherited processor setting [" + processors + "]"))); + } + } + public void testAnonymousAccessDeniedTransport() throws Exception { initialize(); TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());