Security: fix joining cluster with production license (#31341)

The changes made to disable security for trial licenses unless security
is explicitly enabled caused issues when a 6.3 node attempts to join a
cluster that already has a production license installed. The new node
starts off with a trial license and `xpack.security.enabled` is not
set for the node, which causes the security code to skip attaching the
user to the request. The existing cluster has security enabled and the
lack of a user attached to the requests causes the request to be
rejected.

This commit changes the security code to check if the state has been
recovered yet when making the decision on whether or not to attach a
user. If the state has not yet been recovered, the code will attach
the user to the request in case security is enabled on the cluster
being joined.

Closes #31332
This commit is contained in:
Jay Modi 2018-06-19 11:58:34 -06:00 committed by GitHub
parent 529e704b11
commit dc57eece75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 156 additions and 21 deletions

View File

@ -254,7 +254,11 @@ public class XPackLicenseState {
public XPackLicenseState(Settings settings) {
this.isSecurityEnabled = XPackSettings.SECURITY_ENABLED.get(settings);
this.isSecurityExplicitlyEnabled = settings.hasValue(XPackSettings.SECURITY_ENABLED.getKey()) && isSecurityEnabled;
// 6.0+ requires TLS for production licenses, so if TLS is enabled and security is enabled
// we can interpret this as an explicit enabling of security if the security enabled
// setting is not explicitly set
this.isSecurityExplicitlyEnabled = isSecurityEnabled &&
(settings.hasValue(XPackSettings.SECURITY_ENABLED.getKey()) || XPackSettings.TRANSPORT_SSL_ENABLED.get(settings));
}
/** Updates the current state of the license, which will change what features are available. */

View File

@ -79,6 +79,16 @@ public class XPackLicenseStateTests extends ESTestCase {
assertThat(licenseState.allowedRealmType(), is(XPackLicenseState.AllowedRealmType.ALL));
assertThat(licenseState.isCustomRoleProvidersAllowed(), is(true));
licenseState =
new XPackLicenseState(Settings.builder().put(XPackSettings.TRANSPORT_SSL_ENABLED.getKey(), true).build());
assertThat(licenseState.isAuthAllowed(), is(true));
assertThat(licenseState.isIpFilteringAllowed(), is(true));
assertThat(licenseState.isAuditingAllowed(), is(true));
assertThat(licenseState.isStatsAndHealthAllowed(), is(true));
assertThat(licenseState.isDocumentAndFieldLevelSecurityAllowed(), is(true));
assertThat(licenseState.allowedRealmType(), is(XPackLicenseState.AllowedRealmType.ALL));
assertThat(licenseState.isCustomRoleProvidersAllowed(), is(true));
licenseState = new XPackLicenseState(Settings.EMPTY);
assertThat(licenseState.isAuthAllowed(), is(true));
assertThat(licenseState.isIpFilteringAllowed(), is(true));

View File

@ -472,7 +472,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
components.add(ipFilter.get());
DestructiveOperations destructiveOperations = new DestructiveOperations(settings, clusterService.getClusterSettings());
securityInterceptor.set(new SecurityServerTransportInterceptor(settings, threadPool, authcService.get(),
authzService, getLicenseState(), getSslService(), securityContext.get(), destructiveOperations));
authzService, getLicenseState(), getSslService(), securityContext.get(), destructiveOperations, clusterService));
final Set<RequestInterceptor> requestInterceptors;
if (XPackSettings.DLS_FLS_ENABLED.get(settings)) {

View File

@ -9,12 +9,14 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -72,6 +74,8 @@ public class SecurityServerTransportInterceptor extends AbstractComponent implem
private final SecurityContext securityContext;
private final boolean reservedRealmEnabled;
private volatile boolean isStateNotRecovered = true;
public SecurityServerTransportInterceptor(Settings settings,
ThreadPool threadPool,
AuthenticationService authcService,
@ -79,7 +83,8 @@ public class SecurityServerTransportInterceptor extends AbstractComponent implem
XPackLicenseState licenseState,
SSLService sslService,
SecurityContext securityContext,
DestructiveOperations destructiveOperations) {
DestructiveOperations destructiveOperations,
ClusterService clusterService) {
super(settings);
this.settings = settings;
this.threadPool = threadPool;
@ -90,6 +95,7 @@ public class SecurityServerTransportInterceptor extends AbstractComponent implem
this.securityContext = securityContext;
this.profileFilters = initializeProfileFilters(destructiveOperations);
this.reservedRealmEnabled = XPackSettings.RESERVED_REALM_ENABLED_SETTING.get(settings);
clusterService.addListener(e -> isStateNotRecovered = e.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK));
}
@Override
@ -98,7 +104,13 @@ public class SecurityServerTransportInterceptor extends AbstractComponent implem
@Override
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler) {
if (licenseState.isSecurityEnabled() && licenseState.isAuthAllowed()) {
// make a local copy of isStateNotRecovered as this is a volatile variable and it
// is used multiple times in the method. The copy to a local variable allows us to
// guarantee we use the same value wherever we would check the value for the state
// being recovered
final boolean stateNotRecovered = isStateNotRecovered;
final boolean sendWithAuth = (licenseState.isSecurityEnabled() && licenseState.isAuthAllowed()) || stateNotRecovered;
if (sendWithAuth) {
// the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
// ourselves otherwise we wind up using a version newer than what we can actually send
final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);
@ -108,20 +120,20 @@ public class SecurityServerTransportInterceptor extends AbstractComponent implem
if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
, handler), sender), minVersion);
, handler), sender, stateNotRecovered), minVersion);
} else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {
AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadPool.getThreadContext(), securityContext,
(original) -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
, handler), sender));
, handler), sender, stateNotRecovered));
} else if (securityContext.getAuthentication() != null &&
securityContext.getAuthentication().getVersion().equals(minVersion) == false) {
// re-write the authentication since we want the authentication version to match the version of the connection
securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender),
minVersion);
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender,
stateNotRecovered), minVersion);
} else {
sendWithUser(connection, action, request, options, handler, sender);
sendWithUser(connection, action, request, options, handler, sender, stateNotRecovered);
}
} else {
sender.sendRequest(connection, action, request, options, handler);
@ -132,9 +144,10 @@ public class SecurityServerTransportInterceptor extends AbstractComponent implem
private <T extends TransportResponse> void sendWithUser(Transport.Connection connection, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler,
AsyncSender sender) {
// There cannot be a request outgoing from this node that is not associated with a user.
if (securityContext.getAuthentication() == null) {
AsyncSender sender, final boolean stateNotRecovered) {
// There cannot be a request outgoing from this node that is not associated with a user
// unless we do not know the actual license of the cluster
if (securityContext.getAuthentication() == null && stateNotRecovered == false) {
// we use an assertion here to ensure we catch this in our testing infrastructure, but leave the ISE for cases we do not catch
// in tests and may be hit by a user
assertNoAuthentication(action);

View File

@ -23,11 +23,16 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSource;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.transport.Transport;
@ -41,7 +46,10 @@ import org.elasticsearch.xpack.security.LocalStateSecurity;
import org.junit.After;
import org.junit.Before;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -115,6 +123,18 @@ public class LicensingTests extends SecurityIntegTestCase {
return plugins;
}
@Override
protected int maxNumberOfNodes() {
return super.maxNumberOfNodes() + 1;
}
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false)
.build();
}
@Before
public void resetLicensing() {
enableLicensing();
@ -250,6 +270,34 @@ public class LicensingTests extends SecurityIntegTestCase {
}
}
public void testNodeJoinWithoutSecurityExplicitlyEnabled() throws Exception {
License.OperationMode mode = randomFrom(License.OperationMode.GOLD, License.OperationMode.PLATINUM, License.OperationMode.STANDARD);
enableLicensing(mode);
ensureGreen();
Path home = createTempDir();
Path conf = home.resolve("config");
Files.createDirectories(conf);
Settings nodeSettings = Settings.builder()
.put(nodeSettings(maxNumberOfNodes() - 1).filter(s -> "xpack.security.enabled".equals(s) == false))
.put("node.name", "my-test-node")
.put("network.host", "localhost")
.put("cluster.name", internalCluster().getClusterName())
.put("discovery.zen.minimum_master_nodes",
internalCluster().getInstance(Settings.class).get("discovery.zen.minimum_master_nodes"))
.put("path.home", home)
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false)
.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen")
.put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "test-zen")
.build();
Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(LocalStateSecurity.class, TestZenDiscovery.TestPlugin.class,
MockHttpTransport.TestPlugin.class);
try (Node node = new MockNode(nodeSettings, mockPlugins)) {
node.start();
ensureStableCluster(cluster().size() + 1);
}
}
private static void assertElasticsearchSecurityException(ThrowingRunnable runnable) {
ElasticsearchSecurityException ee = expectThrows(ElasticsearchSecurityException.class, runnable);
assertThat(ee.getMetadata(LicenseUtils.EXPIRED_FEATURE_METADATA), hasItem(XPackField.SECURITY));

View File

@ -7,11 +7,17 @@ package org.elasticsearch.xpack.security.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.Transport.Connection;
@ -31,6 +37,7 @@ import org.elasticsearch.xpack.core.security.user.User;
import org.elasticsearch.xpack.core.ssl.SSLService;
import org.elasticsearch.xpack.security.authc.AuthenticationService;
import org.elasticsearch.xpack.security.authz.AuthorizationService;
import org.junit.After;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
@ -54,25 +61,33 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
private ThreadContext threadContext;
private XPackLicenseState xPackLicenseState;
private SecurityContext securityContext;
private ClusterService clusterService;
@Override
public void setUp() throws Exception {
super.setUp();
settings = Settings.builder().put("path.home", createTempDir()).build();
threadPool = mock(ThreadPool.class);
threadContext = new ThreadContext(settings);
when(threadPool.getThreadContext()).thenReturn(threadContext);
threadPool = new TestThreadPool(getTestName());
clusterService = ClusterServiceUtils.createClusterService(threadPool);
threadContext = threadPool.getThreadContext();
securityContext = spy(new SecurityContext(settings, threadPool.getThreadContext()));
xPackLicenseState = mock(XPackLicenseState.class);
when(xPackLicenseState.isAuthAllowed()).thenReturn(true);
when(xPackLicenseState.isSecurityEnabled()).thenReturn(true);
}
@After
public void stopThreadPool() throws Exception {
clusterService.close();
terminate(threadPool);
}
public void testSendAsyncUnlicensed() {
SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool,
mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class),
securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))));
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))), clusterService);
ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
when(xPackLicenseState.isAuthAllowed()).thenReturn(false);
AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
AsyncSender sender = interceptor.interceptSender(new AsyncSender() {
@ -92,6 +107,46 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
verifyZeroInteractions(securityContext);
}
public void testSendAsyncWithStateNotRecovered() {
SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool,
mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class),
securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))), clusterService);
final boolean securityEnabled = randomBoolean();
final boolean authAllowed = securityEnabled && randomBoolean();
when(xPackLicenseState.isAuthAllowed()).thenReturn(authAllowed);
when(xPackLicenseState.isSecurityEnabled()).thenReturn(securityEnabled);
ClusterState notRecovered = ClusterState.builder(clusterService.state())
.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK).build())
.build();
ClusterServiceUtils.setState(clusterService, notRecovered);
assertTrue(clusterService.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK));
AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
AtomicReference<User> sendingUser = new AtomicReference<>();
AsyncSender sender = interceptor.interceptSender(new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler) {
if (calledWrappedSender.compareAndSet(false, true) == false) {
fail("sender called more than once!");
}
sendingUser.set(securityContext.getUser());
}
});
Connection connection = mock(Connection.class);
when(connection.getVersion()).thenReturn(Version.CURRENT);
sender.sendRequest(connection, "internal:foo", null, null, null);
assertTrue(calledWrappedSender.get());
assertEquals(SystemUser.INSTANCE, sendingUser.get());
verify(xPackLicenseState).isSecurityEnabled();
if (securityEnabled) {
verify(xPackLicenseState).isAuthAllowed();
}
verify(securityContext).executeAsUser(any(User.class), any(Consumer.class), eq(Version.CURRENT));
verifyNoMoreInteractions(xPackLicenseState);
}
public void testSendAsync() throws Exception {
final User authUser = randomBoolean() ? new User("authenticator") : null;
final User user = new User("test", randomRoles(), authUser);
@ -100,7 +155,8 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool,
mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class),
securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))));
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))), clusterService);
ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
AtomicReference<User> sendingUser = new AtomicReference<>();
@ -136,7 +192,8 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool,
mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class),
securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))));
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))), clusterService);
ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
AtomicReference<User> sendingUser = new AtomicReference<>();
@ -167,11 +224,12 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool,
mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class),
securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)))) {
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))), clusterService) {
@Override
void assertNoAuthentication(String action) {
}
};
ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
assertNull(securityContext.getUser());
AsyncSender sender = interceptor.interceptSender(new AsyncSender() {
@ -203,7 +261,8 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool,
mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class),
securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))));
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))), clusterService);
ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
AtomicReference<User> sendingUser = new AtomicReference<>();
@ -243,7 +302,8 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool,
mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class),
securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))));
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))), clusterService);
ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
AtomicReference<User> sendingUser = new AtomicReference<>();