From 667f842f92979d47246fbb4cd06cce4822ffda3b Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Thu, 18 May 2017 14:39:36 -0400 Subject: [PATCH] Fix authentication forward compatibility (elastic/x-pack-elasticsearch#1481) The authentication object was changed in 5.4.0 in that it was conditionally signed depending on the version and other factors. A bug was introduced however that causes the authentication to actually get written with the version of the node it is being sent to even if that version is greater than the version of the current node, which causes rolling upgrades to fail. Original commit: elastic/x-pack-elasticsearch@a718ff8a52b8727cddf5c535c30fc144c5d1518c --- .../xpack/security/authc/Authentication.java | 1 + .../security/authc/AuthenticationService.java | 1 - .../SecurityServerTransportInterceptor.java | 10 ++- .../transport/ServerTransportFilter.java | 3 +- ...curityServerTransportInterceptorTests.java | 78 +++++++++++++++++++ qa/rolling-upgrade/build.gradle | 12 +++ 6 files changed, 100 insertions(+), 5 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/Authentication.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/Authentication.java index cfec9e644bb..18ecfae7dc2 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/Authentication.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/Authentication.java @@ -119,6 +119,7 @@ public class Authentication { String encode() throws IOException { BytesStreamOutput output = new BytesStreamOutput(); + output.setVersion(version); Version.writeVersion(version, output); writeTo(output); return Base64.getEncoder().encodeToString(BytesReference.toBytes(output.bytes())); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/AuthenticationService.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/AuthenticationService.java index cb80396eef0..e6ebee98268 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/AuthenticationService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/AuthenticationService.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.SecureString; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java b/plugin/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java index dd208c70893..c722d53c1bd 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java @@ -88,12 +88,16 @@ public class SecurityServerTransportInterceptor extends AbstractComponent implem public void sendRequest(Transport.Connection connection, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler handler) { if (licenseState.isAuthAllowed()) { + // the transport in core normally does this check, BUT since we are serializing to a string header we need to do it + // ourselves otherwise we wind up using a version newer than what we can actually send + final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT); + // Sometimes a system action gets executed like a internal create index request or update mappings request // which means that the user is copied over to system actions so we need to change the user if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) { securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options, new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original) - , handler), sender), connection.getVersion()); + , handler), sender), minVersion); } else if (reservedRealmEnabled && connection.getVersion().before(Version.V_5_2_0) && KibanaUser.NAME.equals(securityContext.getUser().principal())) { final User kibanaUser = securityContext.getUser(); @@ -103,11 +107,11 @@ public class SecurityServerTransportInterceptor extends AbstractComponent implem new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender), connection.getVersion()); } else if (securityContext.getAuthentication() != null && - securityContext.getAuthentication().getVersion().equals(connection.getVersion()) == false) { + securityContext.getAuthentication().getVersion().equals(minVersion) == false) { // re-write the authentication since we want the authentication version to match the version of the connection securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options, new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender), - connection.getVersion()); + minVersion); } else { sendWithUser(connection, action, request, options, handler, sender); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java b/plugin/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java index dee50d22215..22840e640d0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java @@ -123,6 +123,7 @@ public interface ServerTransportFilter { } } + final Version version = transportChannel.getVersion().equals(Version.V_5_4_0) ? Version.CURRENT : transportChannel.getVersion(); authcService.authenticate(securityAction, request, null, ActionListener.wrap((authentication) -> { if (reservedRealmEnabled && authentication.getVersion().before(Version.V_5_2_0) && KibanaUser.NAME.equals(authentication.getUser().authenticatedUser().principal())) { @@ -137,7 +138,7 @@ public interface ServerTransportFilter { listener.onResponse(null); }); asyncAuthorizer.authorize(authzService); - }, transportChannel.getVersion()); + }, version); } else { final AuthorizationUtils.AsyncAuthorizer asyncAuthorizer = new AuthorizationUtils.AsyncAuthorizer(authentication, listener, (userRoles, runAsRoles) -> { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java index 0d386516f78..654635ab462 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java @@ -247,6 +247,84 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase { verifyNoMoreInteractions(xPackLicenseState); } + public void testSendToNewerVersionSetsCorrectVersion() throws Exception { + final User user = new User("joe", "role"); + final Authentication authentication = new Authentication(user, new RealmRef("file", "file", "node1"), null); + authentication.writeToContext(threadContext); + threadContext.putTransient(AuthorizationService.ORIGINATING_ACTION_KEY, "indices:foo"); + + SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool, + mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class), + securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, + Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)))); + + AtomicBoolean calledWrappedSender = new AtomicBoolean(false); + AtomicReference sendingUser = new AtomicReference<>(); + AtomicReference authRef = new AtomicReference<>(); + AsyncSender intercepted = new AsyncSender() { + @Override + public void sendRequest(Transport.Connection connection, String action, TransportRequest request, + TransportRequestOptions options, TransportResponseHandler handler) { + if (calledWrappedSender.compareAndSet(false, true) == false) { + fail("sender called more than once!"); + } + sendingUser.set(securityContext.getUser()); + authRef.set(securityContext.getAuthentication()); + } + }; + AsyncSender sender = interceptor.interceptSender(intercepted); + final Version connectionVersion = Version.fromId(Version.CURRENT.id + randomIntBetween(100, 100000)); + assertEquals(Version.CURRENT, Version.min(connectionVersion, Version.CURRENT)); + + Transport.Connection connection = mock(Transport.Connection.class); + when(connection.getVersion()).thenReturn(connectionVersion); + sender.sendRequest(connection, "indices:foo[s]", null, null, null); + assertTrue(calledWrappedSender.get()); + assertEquals(user, sendingUser.get()); + assertEquals(user, securityContext.getUser()); + assertEquals(Version.CURRENT, authRef.get().getVersion()); + assertEquals(Version.CURRENT, authentication.getVersion()); + } + + public void testSendToOlderVersionSetsCorrectVersion() throws Exception { + final User user = new User("joe", "role"); + final Authentication authentication = new Authentication(user, new RealmRef("file", "file", "node1"), null); + authentication.writeToContext(threadContext); + threadContext.putTransient(AuthorizationService.ORIGINATING_ACTION_KEY, "indices:foo"); + + SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool, + mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class), + securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, + Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)))); + + AtomicBoolean calledWrappedSender = new AtomicBoolean(false); + AtomicReference sendingUser = new AtomicReference<>(); + AtomicReference authRef = new AtomicReference<>(); + AsyncSender intercepted = new AsyncSender() { + @Override + public void sendRequest(Transport.Connection connection, String action, TransportRequest request, + TransportRequestOptions options, TransportResponseHandler handler) { + if (calledWrappedSender.compareAndSet(false, true) == false) { + fail("sender called more than once!"); + } + sendingUser.set(securityContext.getUser()); + authRef.set(securityContext.getAuthentication()); + } + }; + AsyncSender sender = interceptor.interceptSender(intercepted); + final Version connectionVersion = Version.fromId(Version.CURRENT.id - randomIntBetween(100, 100000)); + assertEquals(connectionVersion, Version.min(connectionVersion, Version.CURRENT)); + + Transport.Connection connection = mock(Transport.Connection.class); + when(connection.getVersion()).thenReturn(connectionVersion); + sender.sendRequest(connection, "indices:foo[s]", null, null, null); + assertTrue(calledWrappedSender.get()); + assertEquals(user, sendingUser.get()); + assertEquals(user, securityContext.getUser()); + assertEquals(connectionVersion, authRef.get().getVersion()); + assertEquals(Version.CURRENT, authentication.getVersion()); + } + public void testContextRestoreResponseHandler() throws Exception { ThreadContext threadContext = new ThreadContext(Settings.EMPTY); diff --git a/qa/rolling-upgrade/build.gradle b/qa/rolling-upgrade/build.gradle index e9c96e317f6..f6e900d76f5 100644 --- a/qa/rolling-upgrade/build.gradle +++ b/qa/rolling-upgrade/build.gradle @@ -124,6 +124,18 @@ for (Version version : wireCompatVersions) { Task upgradedClusterTestRunner = tasks.getByName("${baseName}#upgradedClusterTestRunner") upgradedClusterTestRunner.configure { systemProperty 'tests.rest.suite', 'upgraded_cluster' + + // migration tests should only run when the original/old cluster nodes where versions < 5.2.0. + // this stinks but we do the check here since our rest tests do not support conditionals + // otherwise we could check the index created version + String versionStr = project.extensions.findByName("${baseName}#oldClusterTestCluster").properties.get('bwcVersion') + String[] versionParts = versionStr.split('\\.') + if (versionParts[0].equals("5")) { + Integer minor = Integer.parseInt(versionParts[1]) + if (minor >= 2) { + systemProperty 'tests.rest.blacklist', '/20_security/Verify default password migration results in upgraded cluster' + } + } // only need to kill the mixed cluster tests node here because we explicitly told it to not stop nodes upon completion finalizedBy "${baseName}#mixedClusterTestCluster#stop" }