diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java index d05e1572a17..7f91374d668 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java @@ -102,50 +102,55 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor @Override public void sendRequest(Transport.Connection connection, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler handler) { - // make a local copy of isStateNotRecovered as this is a volatile variable and it - // is used multiple times in the method. The copy to a local variable allows us to - // guarantee we use the same value wherever we would check the value for the state - // being recovered - final boolean stateNotRecovered = isStateNotRecovered; - final boolean sendWithAuth = licenseState.isAuthAllowed() || stateNotRecovered; - if (sendWithAuth) { - // the transport in core normally does this check, BUT since we are serializing to a string header we need to do it - // ourselves otherwise we wind up using a version newer than what we can actually send - final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT); + final boolean requireAuth = shouldRequireExistingAuthentication(); + // the transport in core normally does this check, BUT since we are serializing to a string header we need to do it + // ourselves otherwise we wind up using a version newer than what we can actually send + final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT); - // Sometimes a system action gets executed like a internal create index request or update mappings request - // which means that the user is copied over to system actions so we need to change the user - if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) { - securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options, - new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original) - , handler), sender, stateNotRecovered), minVersion); - } else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) { - AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadPool.getThreadContext(), securityContext, - (original) -> sendWithUser(connection, action, request, options, - new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original) - , handler), sender, stateNotRecovered)); - } else if (securityContext.getAuthentication() != null && - securityContext.getAuthentication().getVersion().equals(minVersion) == false) { - // re-write the authentication since we want the authentication version to match the version of the connection - securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options, - new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender, - stateNotRecovered), minVersion); - } else { - sendWithUser(connection, action, request, options, handler, sender, stateNotRecovered); - } + // Sometimes a system action gets executed like a internal create index request or update mappings request + // which means that the user is copied over to system actions so we need to change the user + if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) { + securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options, + new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original) + , handler), sender, requireAuth), minVersion); + } else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) { + AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadPool.getThreadContext(), securityContext, + (original) -> sendWithUser(connection, action, request, options, + new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original) + , handler), sender, requireAuth)); + } else if (securityContext.getAuthentication() != null && + securityContext.getAuthentication().getVersion().equals(minVersion) == false) { + // re-write the authentication since we want the authentication version to match the version of the connection + securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options, + new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender, + requireAuth), minVersion); } else { - sender.sendRequest(connection, action, request, options, handler); + sendWithUser(connection, action, request, options, handler, sender, requireAuth); } } }; } + /** + * Based on the current cluster state & license, should we require that all outgoing actions have an authentication header + * of some sort? + */ + private boolean shouldRequireExistingAuthentication() { + // If the license state is MISSING, then auth is not allowed. + // However this makes it difficult to installing a valid license, because that might implicitly turn on security. + // When security is enabled on the master node it will then reject any actions that do not have authentication headers + // but there may be in-flight internal actions (that will not have authentication headers) such as "cluster/shard/started" + // which we don't want to reject. + // So, we always send authentication headers for actions that have an implied user (system-user or explicit-origin) + // and then for other (user originated) actions we enforce that there is an authentication header that we can send, iff the + // current license allows authentication. + return licenseState.isAuthAllowed() && isStateNotRecovered == false; + } + private void sendWithUser(Transport.Connection connection, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler handler, - AsyncSender sender, final boolean stateNotRecovered) { - // There cannot be a request outgoing from this node that is not associated with a user - // unless we do not know the actual license of the cluster - if (securityContext.getAuthentication() == null && stateNotRecovered == false) { + AsyncSender sender, final boolean requireAuthentication) { + if (securityContext.getAuthentication() == null && requireAuthentication) { // we use an assertion here to ensure we catch this in our testing infrastructure, but leave the ISE for cases we do not catch // in tests and may be hit by a user assertNoAuthentication(action); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java index 1b85049da23..89f33f809be 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.security.transport; import org.elasticsearch.Version; +import org.elasticsearch.action.main.MainAction; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -45,6 +46,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -52,7 +55,6 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; public class SecurityServerTransportInterceptorTests extends ESTestCase { @@ -82,7 +84,7 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase { terminate(threadPool); } - public void testSendAsyncUnlicensed() { + public void testSendAsyncUserActionWhenUnlicensed() { SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool, mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class), securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, @@ -90,6 +92,7 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase { ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener when(xPackLicenseState.isAuthAllowed()).thenReturn(false); AtomicBoolean calledWrappedSender = new AtomicBoolean(false); + AtomicReference sendingUser = new AtomicReference<>(); AsyncSender sender = interceptor.interceptSender(new AsyncSender() { @Override public void sendRequest(Transport.Connection connection, String action, TransportRequest request, @@ -97,13 +100,45 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase { if (calledWrappedSender.compareAndSet(false, true) == false) { fail("sender called more than once!"); } + sendingUser.set(securityContext.getUser()); } }); - sender.sendRequest(null, null, null, null, null); + Connection connection = mock(Connection.class); + when(connection.getVersion()).thenReturn(Version.CURRENT); + sender.sendRequest(connection, MainAction.NAME, null, null, null); assertTrue(calledWrappedSender.get()); + assertThat(sendingUser.get(), nullValue()); verify(xPackLicenseState).isAuthAllowed(); verifyNoMoreInteractions(xPackLicenseState); - verifyZeroInteractions(securityContext); + } + + public void testSendAsyncInternalActionWhenUnlicensed() { + SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool, + mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class), + securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, + Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))), clusterService); + ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener + when(xPackLicenseState.isAuthAllowed()).thenReturn(false); + AtomicBoolean calledWrappedSender = new AtomicBoolean(false); + AtomicReference sendingUser = new AtomicReference<>(); + AsyncSender sender = interceptor.interceptSender(new AsyncSender() { + @Override + public void sendRequest(Transport.Connection connection, String action, TransportRequest request, + TransportRequestOptions options, TransportResponseHandler handler) { + if (calledWrappedSender.compareAndSet(false, true) == false) { + fail("sender called more than once!"); + } + sendingUser.set(securityContext.getUser()); + } + }); + Connection connection = mock(Connection.class); + when(connection.getVersion()).thenReturn(Version.CURRENT); + sender.sendRequest(connection, "internal:foo", null, null, null); + assertTrue(calledWrappedSender.get()); + assertThat(sendingUser.get(), is(SystemUser.INSTANCE)); + verify(xPackLicenseState).isAuthAllowed(); + verify(securityContext).executeAsUser(any(User.class), any(Consumer.class), eq(Version.CURRENT)); + verifyNoMoreInteractions(xPackLicenseState); } public void testSendAsyncWithStateNotRecovered() {