Set processors on audit remote client

When creating a transport client for a remote index audit trail, we are
implicitly allowing the construction of this transport client to
initialize the number of processors that Netty thinks are on the
system. Since we never pushed down the number of processors, this will
always default to the number of cores on the machine. If the user has
also set the processors setting, when the server bootstraps it will try
to push the number of processors down to Netty too. If this value does
not match the number of cores, we will fail in bootstrap because we
guard against initializing the number of processors that Netty sees to
different values. Instead, the transport client should inherit the
number of processors too and push this down when it pushes the number of
processors down to Netty. We have to worry about another possibility: an
explicit setting for the number of processors for the transport client
so we require this matches the inherited value.

Relates elastic/x-pack-elasticsearch#3469


Original commit: elastic/x-pack-elasticsearch@032810bb0b
This commit is contained in:
Jason Tedor 2018-01-04 00:06:16 -05:00 committed by GitHub
parent b05ee6fcc9
commit 3b06254573
2 changed files with 70 additions and 2 deletions

View File

@ -791,6 +791,19 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail {
"[" + REMOTE_CLIENT_SETTINGS.getKey() + ".hosts] for remote audit log indexing");
}
final int processors = EsExecutors.PROCESSORS_SETTING.get(settings);
if (EsExecutors.PROCESSORS_SETTING.exists(clientSettings)) {
final int clientProcessors = EsExecutors.PROCESSORS_SETTING.get(clientSettings);
if (clientProcessors != processors) {
final String message = String.format(
Locale.ROOT,
"explicit processor setting [%d] for audit trail remote client does not match inherited processor setting [%d]",
clientProcessors,
processors);
throw new IllegalStateException(message);
}
}
if (clientSettings.get("cluster.name", "").isEmpty()) {
throw new ElasticsearchException("missing required setting " +
"[" + REMOTE_CLIENT_SETTINGS.getKey() + ".cluster.name] for remote audit log indexing");
@ -810,7 +823,11 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail {
throw new ElasticsearchException("no valid host:port pairs specified for setting ["
+ REMOTE_CLIENT_SETTINGS.getKey() + ".hosts]");
}
final Settings theClientSetting = clientSettings.filter((s) -> s.startsWith("hosts") == false); // hosts is not a valid setting
final Settings theClientSetting =
Settings.builder()
.put(clientSettings.filter((s) -> s.startsWith("hosts") == false)) // hosts is not a valid setting
.put(EsExecutors.PROCESSORS_SETTING.getKey(), processors)
.build();
final TransportClient transportClient = new TransportClient(Settings.builder()
.put("node.name", DEFAULT_CLIENT_NAME + "-" + Node.NODE_NAME_SETTING.get(settings))
.put(theClientSetting).build(), Settings.EMPTY, remoteTransportClientPlugins(), null) {};

View File

@ -14,6 +14,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -26,6 +27,7 @@ import org.elasticsearch.common.settings.KeyStoreWrapper;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.search.SearchHit;
@ -73,7 +75,9 @@ import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rol
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.MONTHLY;
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.WEEKLY;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -113,6 +117,7 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase {
if (remoteCluster != null) {
remoteCluster.close();
remoteCluster = null;
}
remoteSettings = null;
}
@ -299,13 +304,17 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase {
}
private void initialize(String[] includes, String[] excludes) throws Exception {
initialize(includes, excludes, Settings.EMPTY);
}
private void initialize(final String[] includes, final String[] excludes, final Settings additionalSettings) throws Exception {
rollover = randomFrom(HOURLY, DAILY, WEEKLY, MONTHLY);
includeRequestBody = randomBoolean();
Settings.Builder builder = Settings.builder();
if (remoteIndexing) {
builder.put(remoteSettings);
}
builder.put(settings(rollover, includes, excludes));
builder.put(settings(rollover, includes, excludes)).put(additionalSettings).build();
// IndexAuditTrail should ignore secure settings
// they are merged on the master node creating the audit index
if (randomBoolean()) {
@ -345,6 +354,48 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase {
auditor.start();
}
public void testProcessorsSetting() {
final boolean explicitProcessors = randomBoolean();
final int processors;
if (explicitProcessors) {
processors = randomIntBetween(1, 16);
} else {
processors = EsExecutors.PROCESSORS_SETTING.get(Settings.EMPTY);
}
final boolean explicitClientProcessors = randomBoolean();
final int clientProcessors;
if (explicitClientProcessors) {
clientProcessors = randomIntBetween(1, 16);
} else {
clientProcessors = EsExecutors.PROCESSORS_SETTING.get(Settings.EMPTY);
}
final Settings.Builder additionalSettingsBuilder =
Settings.builder()
.put("xpack.security.audit.index.client.cluster.name", "remote")
.put("xpack.security.audit.index.client.hosts", "localhost:9300");
if (explicitProcessors) {
additionalSettingsBuilder.put(EsExecutors.PROCESSORS_SETTING.getKey(), processors);
}
if (explicitClientProcessors) {
additionalSettingsBuilder.put("xpack.security.audit.index.client.processors", clientProcessors);
}
final ThrowingRunnable runnable = () -> initialize(null, null, additionalSettingsBuilder.build());
if (processors == clientProcessors || explicitClientProcessors == false) {
// okay, the client initialized which is all we care about but no nodes are available because we never set up the remote cluster
expectThrows(NoNodeAvailableException.class, runnable);
} else {
final IllegalStateException e = expectThrows(IllegalStateException.class, runnable);
assertThat(
e,
hasToString(containsString(
"explicit processor setting [" + clientProcessors + "]" +
" for audit trail remote client does not match inherited processor setting [" + processors + "]")));
}
}
public void testAnonymousAccessDeniedTransport() throws Exception {
initialize();
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());