Use consistent view of realms for authentication (#38815)
This change updates the authentication service to use a consistent view of the realms based on the license state at the start of authentication. Without this, the license can change during authentication of a request and it will result in a failure if the realm that extracted the token is no longer in the realm list. This manifests in some tests as an authentication failure that should never really happen; one example would be the test framework's transport client user should always have a succesful authentication but in the LicensingTests this can fail and will show up as a NoNodeAvailableException. Additionally, the licensing tests have been updated to ensure that there is consistency when changing the license. The license is changed by modifying the internal xpack license state on each node, which has no protection against be changed by some pending cluster action. The methods to disable and enable now ensure we have a green cluster and that the cluster is consistent before returning. Closes #30301
This commit is contained in:
parent
6243a9797f
commit
e59b7b696a
|
@ -152,7 +152,15 @@ public class SecurityActionFilter implements ActionFilter {
|
|||
*/
|
||||
final String securityAction = actionMapper.action(action, request);
|
||||
authcService.authenticate(securityAction, request, SystemUser.INSTANCE,
|
||||
ActionListener.wrap((authc) -> authorizeRequest(authc, securityAction, request, listener), listener::onFailure));
|
||||
ActionListener.wrap((authc) -> {
|
||||
if (authc != null) {
|
||||
authorizeRequest(authc, securityAction, request, listener);
|
||||
} else if (licenseState.isAuthAllowed() == false) {
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
listener.onFailure(new IllegalStateException("no authentication present but auth is allowed"));
|
||||
}
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
private <Request extends ActionRequest> void authorizeRequest(Authentication authentication, String securityAction, Request request,
|
||||
|
|
|
@ -59,6 +59,7 @@ public final class TransportSamlAuthenticateAction extends HandledTransportActio
|
|||
listener.onFailure(new IllegalStateException("Cannot find AuthenticationResult on thread context"));
|
||||
return;
|
||||
}
|
||||
assert authentication != null : "authentication should never be null at this point";
|
||||
final Map<String, Object> tokenMeta = (Map<String, Object>) result.getMetadata().get(SamlRealm.CONTEXT_TOKEN_DATA);
|
||||
tokenService.createUserToken(authentication, originatingAuthentication,
|
||||
ActionListener.wrap(tuple -> {
|
||||
|
|
|
@ -72,7 +72,11 @@ public final class TransportCreateTokenAction extends HandledTransportAction<Cre
|
|||
authenticationService.authenticate(CreateTokenAction.NAME, request, authToken,
|
||||
ActionListener.wrap(authentication -> {
|
||||
request.getPassword().close();
|
||||
if (authentication != null) {
|
||||
createToken(request, authentication, originatingAuthentication, true, listener);
|
||||
} else {
|
||||
listener.onFailure(new UnsupportedOperationException("cannot create token if authentication is not allowed"));
|
||||
}
|
||||
}, e -> {
|
||||
// clear the request password
|
||||
request.getPassword().close();
|
||||
|
|
|
@ -196,8 +196,9 @@ public class AuthenticationService {
|
|||
|
||||
private final AuditableRequest request;
|
||||
private final User fallbackUser;
|
||||
|
||||
private final List<Realm> defaultOrderedRealmList;
|
||||
private final ActionListener<Authentication> listener;
|
||||
|
||||
private RealmRef authenticatedBy = null;
|
||||
private RealmRef lookedupBy = null;
|
||||
private AuthenticationToken authenticationToken = null;
|
||||
|
@ -215,6 +216,7 @@ public class AuthenticationService {
|
|||
private Authenticator(AuditableRequest auditableRequest, User fallbackUser, ActionListener<Authentication> listener) {
|
||||
this.request = auditableRequest;
|
||||
this.fallbackUser = fallbackUser;
|
||||
this.defaultOrderedRealmList = realms.asList();
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
|
@ -233,6 +235,11 @@ public class AuthenticationService {
|
|||
* </ol>
|
||||
*/
|
||||
private void authenticateAsync() {
|
||||
if (defaultOrderedRealmList.isEmpty()) {
|
||||
// this happens when the license state changes between the call to authenticate and the actual invocation
|
||||
// to get the realm list
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
lookForExistingAuthentication((authentication) -> {
|
||||
if (authentication != null) {
|
||||
listener.onResponse(authentication);
|
||||
|
@ -255,6 +262,7 @@ public class AuthenticationService {
|
|||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void checkForApiKey() {
|
||||
apiKeyService.authenticateWithApiKeyIfPresent(threadContext, ActionListener.wrap(authResult -> {
|
||||
|
@ -320,7 +328,7 @@ public class AuthenticationService {
|
|||
if (authenticationToken != null) {
|
||||
action = () -> consumer.accept(authenticationToken);
|
||||
} else {
|
||||
for (Realm realm : realms) {
|
||||
for (Realm realm : defaultOrderedRealmList) {
|
||||
final AuthenticationToken token = realm.token(threadContext);
|
||||
if (token != null) {
|
||||
action = () -> consumer.accept(token);
|
||||
|
@ -388,6 +396,7 @@ public class AuthenticationService {
|
|||
userListener.onResponse(null);
|
||||
}
|
||||
};
|
||||
|
||||
final IteratingActionListener<User, Realm> authenticatingListener =
|
||||
new IteratingActionListener<>(ContextPreservingActionListener.wrapPreservingContext(ActionListener.wrap(
|
||||
(user) -> consumeUser(user, messages),
|
||||
|
@ -402,24 +411,24 @@ public class AuthenticationService {
|
|||
}
|
||||
|
||||
private List<Realm> getRealmList(String principal) {
|
||||
final List<Realm> defaultOrderedRealms = realms.asList();
|
||||
final List<Realm> orderedRealmList = this.defaultOrderedRealmList;
|
||||
if (lastSuccessfulAuthCache != null) {
|
||||
final Realm lastSuccess = lastSuccessfulAuthCache.get(principal);
|
||||
if (lastSuccess != null) {
|
||||
final int index = defaultOrderedRealms.indexOf(lastSuccess);
|
||||
final int index = orderedRealmList.indexOf(lastSuccess);
|
||||
if (index > 0) {
|
||||
final List<Realm> smartOrder = new ArrayList<>(defaultOrderedRealms.size());
|
||||
final List<Realm> smartOrder = new ArrayList<>(orderedRealmList.size());
|
||||
smartOrder.add(lastSuccess);
|
||||
for (int i = 1; i < defaultOrderedRealms.size(); i++) {
|
||||
for (int i = 1; i < orderedRealmList.size(); i++) {
|
||||
if (i != index) {
|
||||
smartOrder.add(defaultOrderedRealms.get(i));
|
||||
smartOrder.add(orderedRealmList.get(i));
|
||||
}
|
||||
}
|
||||
return Collections.unmodifiableList(smartOrder);
|
||||
}
|
||||
}
|
||||
}
|
||||
return defaultOrderedRealms;
|
||||
return orderedRealmList;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -188,12 +188,12 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
|
|||
case "client":
|
||||
profileFilters.put(entry.getKey(), new ServerTransportFilter.ClientProfile(authcService, authzService,
|
||||
threadPool.getThreadContext(), extractClientCert, destructiveOperations, reservedRealmEnabled,
|
||||
securityContext));
|
||||
securityContext, licenseState));
|
||||
break;
|
||||
case "node":
|
||||
profileFilters.put(entry.getKey(), new ServerTransportFilter.NodeProfile(authcService, authzService,
|
||||
threadPool.getThreadContext(), extractClientCert, destructiveOperations, reservedRealmEnabled,
|
||||
securityContext));
|
||||
securityContext, licenseState));
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("unknown profile type: " + type);
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
|
|||
import org.elasticsearch.action.admin.indices.open.OpenIndexAction;
|
||||
import org.elasticsearch.action.support.DestructiveOperations;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.transport.TaskTransportChannel;
|
||||
import org.elasticsearch.transport.TcpChannel;
|
||||
import org.elasticsearch.transport.TcpTransportChannel;
|
||||
|
@ -66,10 +67,11 @@ public interface ServerTransportFilter {
|
|||
private final DestructiveOperations destructiveOperations;
|
||||
private final boolean reservedRealmEnabled;
|
||||
private final SecurityContext securityContext;
|
||||
private final XPackLicenseState licenseState;
|
||||
|
||||
NodeProfile(AuthenticationService authcService, AuthorizationService authzService,
|
||||
ThreadContext threadContext, boolean extractClientCert, DestructiveOperations destructiveOperations,
|
||||
boolean reservedRealmEnabled, SecurityContext securityContext) {
|
||||
boolean reservedRealmEnabled, SecurityContext securityContext, XPackLicenseState licenseState) {
|
||||
this.authcService = authcService;
|
||||
this.authzService = authzService;
|
||||
this.threadContext = threadContext;
|
||||
|
@ -77,6 +79,7 @@ public interface ServerTransportFilter {
|
|||
this.destructiveOperations = destructiveOperations;
|
||||
this.reservedRealmEnabled = reservedRealmEnabled;
|
||||
this.securityContext = securityContext;
|
||||
this.licenseState = licenseState;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -116,6 +119,7 @@ public interface ServerTransportFilter {
|
|||
|
||||
final Version version = transportChannel.getVersion();
|
||||
authcService.authenticate(securityAction, request, (User)null, ActionListener.wrap((authentication) -> {
|
||||
if (authentication != null) {
|
||||
if (securityAction.equals(TransportService.HANDSHAKE_ACTION_NAME) &&
|
||||
SystemUser.is(authentication.getUser()) == false) {
|
||||
securityContext.executeAsUser(SystemUser.INSTANCE, (ctx) -> {
|
||||
|
@ -125,6 +129,11 @@ public interface ServerTransportFilter {
|
|||
} else {
|
||||
authzService.authorize(authentication, securityAction, request, listener);
|
||||
}
|
||||
} else if (licenseState.isAuthAllowed() == false) {
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
listener.onFailure(new IllegalStateException("no authentication present but auth is allowed"));
|
||||
}
|
||||
}, listener::onFailure));
|
||||
}
|
||||
}
|
||||
|
@ -139,9 +148,9 @@ public interface ServerTransportFilter {
|
|||
|
||||
ClientProfile(AuthenticationService authcService, AuthorizationService authzService,
|
||||
ThreadContext threadContext, boolean extractClientCert, DestructiveOperations destructiveOperations,
|
||||
boolean reservedRealmEnabled, SecurityContext securityContext) {
|
||||
boolean reservedRealmEnabled, SecurityContext securityContext, XPackLicenseState licenseState) {
|
||||
super(authcService, authzService, threadContext, extractClientCert, destructiveOperations, reservedRealmEnabled,
|
||||
securityContext);
|
||||
securityContext, licenseState);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -21,11 +21,11 @@ import org.elasticsearch.client.Response;
|
|||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.common.settings.SecureString;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.license.License.OperationMode;
|
||||
import org.elasticsearch.node.MockNode;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
@ -54,6 +54,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
|
@ -68,7 +69,7 @@ import static org.hamcrest.Matchers.notNullValue;
|
|||
@TestLogging("org.elasticsearch.cluster.service:TRACE,org.elasticsearch.discovery.zen:TRACE,org.elasticsearch.action.search:TRACE," +
|
||||
"org.elasticsearch.search:TRACE")
|
||||
public class LicensingTests extends SecurityIntegTestCase {
|
||||
public static final String ROLES =
|
||||
private static final String ROLES =
|
||||
SecuritySettingsSource.TEST_ROLE + ":\n" +
|
||||
" cluster: [ all ]\n" +
|
||||
" indices:\n" +
|
||||
|
@ -91,7 +92,7 @@ public class LicensingTests extends SecurityIntegTestCase {
|
|||
" - names: 'b'\n" +
|
||||
" privileges: [all]\n";
|
||||
|
||||
public static final String USERS_ROLES =
|
||||
private static final String USERS_ROLES =
|
||||
SecuritySettingsSource.CONFIG_STANDARD_USER_ROLES +
|
||||
"role_a:user_a,user_b\n" +
|
||||
"role_b:user_b\n";
|
||||
|
@ -131,8 +132,8 @@ public class LicensingTests extends SecurityIntegTestCase {
|
|||
}
|
||||
|
||||
@Before
|
||||
public void resetLicensing() {
|
||||
enableLicensing();
|
||||
public void resetLicensing() throws InterruptedException {
|
||||
enableLicensing(OperationMode.BASIC);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -155,11 +156,7 @@ public class LicensingTests extends SecurityIntegTestCase {
|
|||
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
|
||||
|
||||
refresh();
|
||||
// wait for all replicas to be started (to make sure that there are no more cluster state updates when we disable licensing)
|
||||
assertBusy(() -> assertTrue(client().admin().cluster().prepareState().get().getState().routingTable()
|
||||
.shardsWithState(ShardRoutingState.INITIALIZING).isEmpty()));
|
||||
|
||||
Client client = internalCluster().transportClient();
|
||||
final Client client = internalCluster().transportClient();
|
||||
|
||||
disableLicensing();
|
||||
|
||||
|
@ -273,7 +270,6 @@ public class LicensingTests extends SecurityIntegTestCase {
|
|||
public void testNodeJoinWithoutSecurityExplicitlyEnabled() throws Exception {
|
||||
License.OperationMode mode = randomFrom(License.OperationMode.GOLD, License.OperationMode.PLATINUM, License.OperationMode.STANDARD);
|
||||
enableLicensing(mode);
|
||||
ensureGreen();
|
||||
|
||||
final List<String> seedHosts = internalCluster().masterClient().admin().cluster().nodesInfo(new NodesInfoRequest()).get()
|
||||
.getNodes().stream().map(n -> n.getTransport().getAddress().publishAddress().toString()).distinct()
|
||||
|
@ -304,23 +300,64 @@ public class LicensingTests extends SecurityIntegTestCase {
|
|||
assertThat(ee.status(), is(RestStatus.FORBIDDEN));
|
||||
}
|
||||
|
||||
public static void disableLicensing() {
|
||||
disableLicensing(License.OperationMode.BASIC);
|
||||
}
|
||||
|
||||
public static void disableLicensing(License.OperationMode operationMode) {
|
||||
private void disableLicensing() throws InterruptedException {
|
||||
// This method first makes sure licensing is enabled everywhere so that we can execute
|
||||
// monitoring actions to ensure we have a stable cluster and only then do we disable.
|
||||
// This is done in an await busy since there is a chance that the enabling of the license
|
||||
// is overwritten by some other cluster activity and the node throws an exception while we
|
||||
// wait for things to stabilize!
|
||||
final boolean success = awaitBusy(() -> {
|
||||
try {
|
||||
for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) {
|
||||
licenseState.update(operationMode, false, null);
|
||||
if (licenseState.isAuthAllowed() == false) {
|
||||
enableLicensing(OperationMode.BASIC);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public static void enableLicensing() {
|
||||
enableLicensing(License.OperationMode.BASIC);
|
||||
ensureGreen();
|
||||
ensureClusterSizeConsistency();
|
||||
ensureClusterStateConsistency();
|
||||
|
||||
// apply the disabling of the license once the cluster is stable
|
||||
for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) {
|
||||
licenseState.update(OperationMode.BASIC, false, null);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Caught exception while disabling license", e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}, 30L, TimeUnit.SECONDS);
|
||||
assertTrue(success);
|
||||
}
|
||||
|
||||
public static void enableLicensing(License.OperationMode operationMode) {
|
||||
private void enableLicensing(License.OperationMode operationMode) throws InterruptedException {
|
||||
// do this in an await busy since there is a chance that the enabling of the license is
|
||||
// overwritten by some other cluster activity and the node throws an exception while we
|
||||
// wait for things to stabilize!
|
||||
final boolean success = awaitBusy(() -> {
|
||||
try {
|
||||
// first update the license so we can execute monitoring actions
|
||||
for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) {
|
||||
licenseState.update(operationMode, true, null);
|
||||
}
|
||||
|
||||
ensureGreen();
|
||||
ensureClusterSizeConsistency();
|
||||
ensureClusterStateConsistency();
|
||||
|
||||
// re-apply the update in case any node received an updated cluster state that triggered the license state
|
||||
// to change
|
||||
for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) {
|
||||
licenseState.update(operationMode, true, null);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Caught exception while enabling license", e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}, 30L, TimeUnit.SECONDS);
|
||||
assertTrue(success);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,6 +117,7 @@ import static org.mockito.Mockito.doThrow;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
|
@ -171,9 +172,9 @@ public class AuthenticationServiceTests extends ESTestCase {
|
|||
XPackLicenseState licenseState = mock(XPackLicenseState.class);
|
||||
when(licenseState.allowedRealmType()).thenReturn(XPackLicenseState.AllowedRealmType.ALL);
|
||||
when(licenseState.isAuthAllowed()).thenReturn(true);
|
||||
realms = new TestRealms(Settings.EMPTY, TestEnvironment.newEnvironment(settings), Collections.<String, Realm.Factory>emptyMap(),
|
||||
realms = spy(new TestRealms(Settings.EMPTY, TestEnvironment.newEnvironment(settings), Collections.<String, Realm.Factory>emptyMap(),
|
||||
licenseState, threadContext, mock(ReservedRealm.class), Arrays.asList(firstRealm, secondRealm),
|
||||
Collections.singletonList(firstRealm));
|
||||
Collections.singletonList(firstRealm)));
|
||||
|
||||
auditTrail = mock(AuditTrailService.class);
|
||||
client = mock(Client.class);
|
||||
|
@ -276,6 +277,8 @@ public class AuthenticationServiceTests extends ESTestCase {
|
|||
}, this::logAndFail));
|
||||
assertTrue(completed.get());
|
||||
verify(auditTrail).authenticationFailed(reqId, firstRealm.name(), token, "_action", message);
|
||||
verify(realms).asList();
|
||||
verifyNoMoreInteractions(realms);
|
||||
}
|
||||
|
||||
public void testAuthenticateSmartRealmOrdering() {
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
|
|||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
@ -207,13 +208,13 @@ public class ServerTransportFilterTests extends ESTestCase {
|
|||
Settings settings = Settings.builder().put("path.home", createTempDir()).build();
|
||||
ThreadContext threadContext = new ThreadContext(settings);
|
||||
return new ServerTransportFilter.ClientProfile(authcService, authzService, threadContext, false, destructiveOperations,
|
||||
reservedRealmEnabled, new SecurityContext(settings, threadContext));
|
||||
reservedRealmEnabled, new SecurityContext(settings, threadContext), new XPackLicenseState(settings));
|
||||
}
|
||||
|
||||
private ServerTransportFilter.NodeProfile getNodeFilter(boolean reservedRealmEnabled) throws IOException {
|
||||
Settings settings = Settings.builder().put("path.home", createTempDir()).build();
|
||||
ThreadContext threadContext = new ThreadContext(settings);
|
||||
return new ServerTransportFilter.NodeProfile(authcService, authzService, threadContext, false, destructiveOperations,
|
||||
reservedRealmEnabled, new SecurityContext(settings, threadContext));
|
||||
reservedRealmEnabled, new SecurityContext(settings, threadContext), new XPackLicenseState(settings));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue