Always attach system user to internal actions (#43902)
All valid licenses permit security, and the only license state where we don't support security is when there is a missing license. However, for safety we should attach the system (or xpack/security) user to internally originated actions even if the license is missing (or, more strictly, doesn't support security). This allows all nodes to communicate and send internal actions (shard state, handshake/pings, etc) even if a license is transitioning between a broken state and a valid state. Relates: #42215 Backport of: #43468
This commit is contained in:
parent
cd2972239c
commit
deacc2038e
|
@ -102,50 +102,55 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
|
|||
@Override
|
||||
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
|
||||
TransportRequestOptions options, TransportResponseHandler<T> handler) {
|
||||
// make a local copy of isStateNotRecovered as this is a volatile variable and it
|
||||
// is used multiple times in the method. The copy to a local variable allows us to
|
||||
// guarantee we use the same value wherever we would check the value for the state
|
||||
// being recovered
|
||||
final boolean stateNotRecovered = isStateNotRecovered;
|
||||
final boolean sendWithAuth = licenseState.isAuthAllowed() || stateNotRecovered;
|
||||
if (sendWithAuth) {
|
||||
// the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
|
||||
// ourselves otherwise we wind up using a version newer than what we can actually send
|
||||
final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);
|
||||
final boolean requireAuth = shouldRequireExistingAuthentication();
|
||||
// the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
|
||||
// ourselves otherwise we wind up using a version newer than what we can actually send
|
||||
final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);
|
||||
|
||||
// Sometimes a system action gets executed like a internal create index request or update mappings request
|
||||
// which means that the user is copied over to system actions so we need to change the user
|
||||
if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
|
||||
securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options,
|
||||
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
|
||||
, handler), sender, stateNotRecovered), minVersion);
|
||||
} else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {
|
||||
AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadPool.getThreadContext(), securityContext,
|
||||
(original) -> sendWithUser(connection, action, request, options,
|
||||
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
|
||||
, handler), sender, stateNotRecovered));
|
||||
} else if (securityContext.getAuthentication() != null &&
|
||||
securityContext.getAuthentication().getVersion().equals(minVersion) == false) {
|
||||
// re-write the authentication since we want the authentication version to match the version of the connection
|
||||
securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options,
|
||||
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender,
|
||||
stateNotRecovered), minVersion);
|
||||
} else {
|
||||
sendWithUser(connection, action, request, options, handler, sender, stateNotRecovered);
|
||||
}
|
||||
// Sometimes a system action gets executed like a internal create index request or update mappings request
|
||||
// which means that the user is copied over to system actions so we need to change the user
|
||||
if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
|
||||
securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options,
|
||||
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
|
||||
, handler), sender, requireAuth), minVersion);
|
||||
} else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {
|
||||
AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadPool.getThreadContext(), securityContext,
|
||||
(original) -> sendWithUser(connection, action, request, options,
|
||||
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
|
||||
, handler), sender, requireAuth));
|
||||
} else if (securityContext.getAuthentication() != null &&
|
||||
securityContext.getAuthentication().getVersion().equals(minVersion) == false) {
|
||||
// re-write the authentication since we want the authentication version to match the version of the connection
|
||||
securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options,
|
||||
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender,
|
||||
requireAuth), minVersion);
|
||||
} else {
|
||||
sender.sendRequest(connection, action, request, options, handler);
|
||||
sendWithUser(connection, action, request, options, handler, sender, requireAuth);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Based on the current cluster state & license, should we require that all outgoing actions have an authentication header
|
||||
* of some sort?
|
||||
*/
|
||||
private boolean shouldRequireExistingAuthentication() {
|
||||
// If the license state is MISSING, then auth is not allowed.
|
||||
// However this makes it difficult to installing a valid license, because that might implicitly turn on security.
|
||||
// When security is enabled on the master node it will then reject any actions that do not have authentication headers
|
||||
// but there may be in-flight internal actions (that will not have authentication headers) such as "cluster/shard/started"
|
||||
// which we don't want to reject.
|
||||
// So, we always send authentication headers for actions that have an implied user (system-user or explicit-origin)
|
||||
// and then for other (user originated) actions we enforce that there is an authentication header that we can send, iff the
|
||||
// current license allows authentication.
|
||||
return licenseState.isAuthAllowed() && isStateNotRecovered == false;
|
||||
}
|
||||
|
||||
private <T extends TransportResponse> void sendWithUser(Transport.Connection connection, String action, TransportRequest request,
|
||||
TransportRequestOptions options, TransportResponseHandler<T> handler,
|
||||
AsyncSender sender, final boolean stateNotRecovered) {
|
||||
// There cannot be a request outgoing from this node that is not associated with a user
|
||||
// unless we do not know the actual license of the cluster
|
||||
if (securityContext.getAuthentication() == null && stateNotRecovered == false) {
|
||||
AsyncSender sender, final boolean requireAuthentication) {
|
||||
if (securityContext.getAuthentication() == null && requireAuthentication) {
|
||||
// we use an assertion here to ensure we catch this in our testing infrastructure, but leave the ISE for cases we do not catch
|
||||
// in tests and may be hit by a user
|
||||
assertNoAuthentication(action);
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
package org.elasticsearch.xpack.security.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.main.MainAction;
|
||||
import org.elasticsearch.action.support.DestructiveOperations;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
|
@ -45,6 +46,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -52,7 +55,6 @@ import static org.mockito.Mockito.never;
|
|||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class SecurityServerTransportInterceptorTests extends ESTestCase {
|
||||
|
@ -82,7 +84,7 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
|
|||
terminate(threadPool);
|
||||
}
|
||||
|
||||
public void testSendAsyncUnlicensed() {
|
||||
public void testSendAsyncUserActionWhenUnlicensed() {
|
||||
SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool,
|
||||
mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class),
|
||||
securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
|
||||
|
@ -90,6 +92,7 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
|
|||
ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
|
||||
when(xPackLicenseState.isAuthAllowed()).thenReturn(false);
|
||||
AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
|
||||
AtomicReference<User> sendingUser = new AtomicReference<>();
|
||||
AsyncSender sender = interceptor.interceptSender(new AsyncSender() {
|
||||
@Override
|
||||
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
|
||||
|
@ -97,13 +100,45 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
|
|||
if (calledWrappedSender.compareAndSet(false, true) == false) {
|
||||
fail("sender called more than once!");
|
||||
}
|
||||
sendingUser.set(securityContext.getUser());
|
||||
}
|
||||
});
|
||||
sender.sendRequest(null, null, null, null, null);
|
||||
Connection connection = mock(Connection.class);
|
||||
when(connection.getVersion()).thenReturn(Version.CURRENT);
|
||||
sender.sendRequest(connection, MainAction.NAME, null, null, null);
|
||||
assertTrue(calledWrappedSender.get());
|
||||
assertThat(sendingUser.get(), nullValue());
|
||||
verify(xPackLicenseState).isAuthAllowed();
|
||||
verifyNoMoreInteractions(xPackLicenseState);
|
||||
verifyZeroInteractions(securityContext);
|
||||
}
|
||||
|
||||
public void testSendAsyncInternalActionWhenUnlicensed() {
|
||||
SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool,
|
||||
mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class),
|
||||
securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
|
||||
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))), clusterService);
|
||||
ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
|
||||
when(xPackLicenseState.isAuthAllowed()).thenReturn(false);
|
||||
AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
|
||||
AtomicReference<User> sendingUser = new AtomicReference<>();
|
||||
AsyncSender sender = interceptor.interceptSender(new AsyncSender() {
|
||||
@Override
|
||||
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
|
||||
TransportRequestOptions options, TransportResponseHandler<T> handler) {
|
||||
if (calledWrappedSender.compareAndSet(false, true) == false) {
|
||||
fail("sender called more than once!");
|
||||
}
|
||||
sendingUser.set(securityContext.getUser());
|
||||
}
|
||||
});
|
||||
Connection connection = mock(Connection.class);
|
||||
when(connection.getVersion()).thenReturn(Version.CURRENT);
|
||||
sender.sendRequest(connection, "internal:foo", null, null, null);
|
||||
assertTrue(calledWrappedSender.get());
|
||||
assertThat(sendingUser.get(), is(SystemUser.INSTANCE));
|
||||
verify(xPackLicenseState).isAuthAllowed();
|
||||
verify(securityContext).executeAsUser(any(User.class), any(Consumer.class), eq(Version.CURRENT));
|
||||
verifyNoMoreInteractions(xPackLicenseState);
|
||||
}
|
||||
|
||||
public void testSendAsyncWithStateNotRecovered() {
|
||||
|
|
Loading…
Reference in New Issue