From ff96939c5feb2d9617cd440dc2491a952bf91430 Mon Sep 17 00:00:00 2001 From: jaymode Date: Thu, 16 Feb 2017 07:22:02 -0500 Subject: [PATCH] Authentication should set the proper version on the stream This commit ensures that the authentication sets the correct version on the stream when it is serialized over the wire so that there is not a version mismatch between the authentication and the connection it came from. Original commit: elastic/x-pack-elasticsearch@267d7068f43a4ddb1ed60a4730028cbc32f4770a --- .../xpack/security/SecurityContext.java | 9 +++++---- .../action/filter/SecurityActionFilter.java | 3 ++- .../xpack/security/authc/Authentication.java | 8 ++++++-- .../SecurityServerTransportInterceptor.java | 12 ++++++++++-- .../transport/ServerTransportFilter.java | 4 ++-- .../xpack/security/SecurityContextTests.java | 7 ++++--- ...SecurityServerTransportInterceptorTests.java | 17 +++++++++++------ .../transport/ServerTransportFilterTests.java | 5 +++-- 8 files changed, 43 insertions(+), 22 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityContext.java b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityContext.java index f3fdcba0edc..677ac78bbb8 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityContext.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityContext.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.security; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -60,7 +61,7 @@ public class SecurityContext { * Sets the user forcefully to the provided user. There must not be an existing user in the ThreadContext otherwise an exception * will be thrown. This method is package private for testing. */ - void setUser(User user) { + void setUser(User user, Version version) { Objects.requireNonNull(user); final Authentication.RealmRef lookedUpBy; if (user.runAs() == null) { @@ -71,7 +72,7 @@ public class SecurityContext { try { Authentication authentication = - new Authentication(user, new Authentication.RealmRef("__attach", "__attach", nodeName), lookedUpBy); + new Authentication(user, new Authentication.RealmRef("__attach", "__attach", nodeName), lookedUpBy, version); authentication.writeToContext(threadContext); } catch (IOException e) { throw new AssertionError("how can we have a IOException with a user we set", e); @@ -82,10 +83,10 @@ public class SecurityContext { * Runs the consumer in a new context as the provided user. The original constext is provided to the consumer. When this method * returns, the original context is restored. */ - public void executeAsUser(User user, Consumer consumer) { + public void executeAsUser(User user, Consumer consumer, Version version) { final StoredContext original = threadContext.newStoredContext(true); try (ThreadContext.StoredContext ctx = threadContext.stashContext()) { - setUser(user); + setUser(user, version); consumer.accept(original); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java b/plugin/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java index 40bdad715bf..436d978199d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/action/filter/SecurityActionFilter.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.security.action.filter; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -121,7 +122,7 @@ public class SecurityActionFilter extends AbstractComponent implements ActionFil } catch (IOException e) { listener.onFailure(e); } - }); + }, Version.CURRENT); } else { try (ThreadContext.StoredContext ignore = threadContext.newStoredContext(true)) { applyInternal(action, request, authenticatedListener); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/Authentication.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/Authentication.java index e35d34c865a..318fa8804d7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/Authentication.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/Authentication.java @@ -27,10 +27,14 @@ public class Authentication { private final Version version; public Authentication(User user, RealmRef authenticatedBy, RealmRef lookedUpBy) { + this(user, authenticatedBy, lookedUpBy, Version.CURRENT); + } + + public Authentication(User user, RealmRef authenticatedBy, RealmRef lookedUpBy, Version version) { this.user = Objects.requireNonNull(user); this.authenticatedBy = Objects.requireNonNull(authenticatedBy); this.lookedUpBy = lookedUpBy; - this.version = Version.CURRENT; + this.version = version; } public Authentication(StreamInput in) throws IOException { @@ -147,7 +151,7 @@ public class Authentication { String encode() throws IOException { BytesStreamOutput output = new BytesStreamOutput(); - Version.writeVersion(Version.CURRENT, output); + Version.writeVersion(version, output); writeTo(output); return Base64.getEncoder().encodeToString(BytesReference.toBytes(output.bytes())); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java b/plugin/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java index 0417a70de1b..8c6ee31c6fc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java @@ -92,7 +92,7 @@ public class SecurityServerTransportInterceptor extends AbstractComponent implem if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) { securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options, new TransportService.ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original) - , handler), sender)); + , handler), sender), connection.getVersion()); } else if (reservedRealmEnabled && connection.getVersion().before(Version.V_5_2_0_UNRELEASED) && KibanaUser.NAME.equals(securityContext.getUser().principal())) { final User kibanaUser = securityContext.getUser(); @@ -100,7 +100,15 @@ public class SecurityServerTransportInterceptor extends AbstractComponent implem kibanaUser.email(), kibanaUser.metadata(), kibanaUser.enabled()); securityContext.executeAsUser(bwcKibanaUser, (original) -> sendWithUser(connection, action, request, options, new TransportService.ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), - handler), sender)); + handler), sender), connection.getVersion()); + } else if (securityContext.getAuthentication() != null && + securityContext.getAuthentication().getVersion().equals(connection.getVersion()) == false) { + // re-write the authentication since we want the authentication version to match the version of the connection + securityContext.executeAsUser(securityContext.getUser(), + (original) -> sendWithUser(connection, action, request, options, + new TransportService.ContextRestoreResponseHandler<>( + threadPool.getThreadContext().wrapRestorable(original), handler), sender), + connection.getVersion()); } else { sendWithUser(connection, action, request, options, handler, sender); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java b/plugin/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java index 1f121559dd9..7b402c1e65b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java @@ -137,7 +137,7 @@ public interface ServerTransportFilter { listener.onResponse(null); }); asyncAuthorizer.authorize(authzService); - }); + }, transportChannel.getVersion()); } else { throw new IllegalStateException("a disabled user should never be sent. " + kibanaUser); } @@ -151,7 +151,7 @@ public interface ServerTransportFilter { listener.onResponse(null); }); asyncAuthorizer.authorize(authzService); - }); + }, transportChannel.getVersion()); } else { final AuthorizationUtils.AsyncAuthorizer asyncAuthorizer = new AuthorizationUtils.AsyncAuthorizer(authentication, listener, (userRoles, runAsRoles) -> { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/SecurityContextTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/SecurityContextTests.java index 3aff0bbcdd7..44db03fea66 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/SecurityContextTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/SecurityContextTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.security; +import org.elasticsearch.Version; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; @@ -51,11 +52,11 @@ public class SecurityContextTests extends ESTestCase { final User user = new User("test"); assertNull(securityContext.getAuthentication()); assertNull(securityContext.getUser()); - securityContext.setUser(user); + securityContext.setUser(user, Version.CURRENT); assertEquals(user, securityContext.getUser()); IllegalStateException e = expectThrows(IllegalStateException.class, - () -> securityContext.setUser(randomFrom(user, SystemUser.INSTANCE))); + () -> securityContext.setUser(randomFrom(user, SystemUser.INSTANCE), Version.CURRENT)); assertEquals("authentication is already present in the context", e.getMessage()); } @@ -74,7 +75,7 @@ public class SecurityContextTests extends ESTestCase { securityContext.executeAsUser(executionUser, (originalCtx) -> { assertEquals(executionUser, securityContext.getUser()); contextAtomicReference.set(originalCtx); - }); + }, Version.CURRENT); final User userAfterExecution = securityContext.getUser(); assertEquals(original, userAfterExecution); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java index e071b2e8ce6..6993d550b6c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.Transport.Connection; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportInterceptor.AsyncSender; import org.elasticsearch.transport.TransportRequest; @@ -40,6 +41,7 @@ import java.util.function.Consumer; import static org.hamcrest.Matchers.arrayContaining; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -120,7 +122,7 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase { assertEquals(user, sendingUser.get()); assertEquals(user, securityContext.getUser()); verify(xPackLicenseState).isAuthAllowed(); - verify(securityContext, never()).executeAsUser(any(User.class), any(Consumer.class)); + verify(securityContext, never()).executeAsUser(any(User.class), any(Consumer.class), any(Version.class)); verifyNoMoreInteractions(xPackLicenseState); } @@ -147,13 +149,15 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase { sendingUser.set(securityContext.getUser()); } }); - sender.sendRequest(null, "internal:foo", null, null, null); + Connection connection = mock(Connection.class); + when(connection.getVersion()).thenReturn(Version.CURRENT); + sender.sendRequest(connection, "internal:foo", null, null, null); assertTrue(calledWrappedSender.get()); assertNotEquals(user, sendingUser.get()); assertEquals(SystemUser.INSTANCE, sendingUser.get()); assertEquals(user, securityContext.getUser()); verify(xPackLicenseState).isAuthAllowed(); - verify(securityContext).executeAsUser(any(User.class), any(Consumer.class)); + verify(securityContext).executeAsUser(any(User.class), any(Consumer.class), eq(Version.CURRENT)); verifyNoMoreInteractions(xPackLicenseState); } @@ -178,7 +182,7 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase { assertEquals("there should always be a user when sending a message", e.getMessage()); assertNull(securityContext.getUser()); verify(xPackLicenseState).isAuthAllowed(); - verify(securityContext, never()).executeAsUser(any(User.class), any(Consumer.class)); + verify(securityContext, never()).executeAsUser(any(User.class), any(Consumer.class), any(Version.class)); verifyNoMoreInteractions(xPackLicenseState); } @@ -207,7 +211,8 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase { }; AsyncSender sender = interceptor.interceptSender(intercepted); Transport.Connection connection = mock(Transport.Connection.class); - when(connection.getVersion()).thenReturn(Version.fromId(randomIntBetween(Version.V_5_0_0_ID, Version.V_5_2_0_ID_UNRELEASED - 100))); + final Version version = Version.fromId(randomIntBetween(Version.V_5_0_0_ID, Version.V_5_2_0_ID_UNRELEASED - 100)); + when(connection.getVersion()).thenReturn(version); sender.sendRequest(connection, "indices:foo[s]", null, null, null); assertTrue(calledWrappedSender.get()); assertNotEquals(user, sendingUser.get()); @@ -238,7 +243,7 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase { assertEquals(user, sendingUser.get()); verify(xPackLicenseState, times(3)).isAuthAllowed(); - verify(securityContext, times(1)).executeAsUser(any(User.class), any(Consumer.class)); + verify(securityContext, times(1)).executeAsUser(any(User.class), any(Consumer.class), eq(version)); verifyNoMoreInteractions(xPackLicenseState); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java index 9e315698b20..feed6db5a83 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterTests.java @@ -225,8 +225,8 @@ public class ServerTransportFilterTests extends ESTestCase { TransportRequest request = mock(TransportRequest.class); User user = new User("kibana", "kibana"); Authentication authentication = mock(Authentication.class); - when(authentication.getVersion()) - .thenReturn(Version.fromId(randomIntBetween(Version.V_5_0_0_ID, Version.V_5_2_0_ID_UNRELEASED - 100))); + final Version version = Version.fromId(randomIntBetween(Version.V_5_0_0_ID, Version.V_5_2_0_ID_UNRELEASED - 100)); + when(authentication.getVersion()).thenReturn(version); when(authentication.getUser()).thenReturn(user); when(authentication.getRunAsUser()).thenReturn(user); doAnswer((i) -> { @@ -246,6 +246,7 @@ public class ServerTransportFilterTests extends ESTestCase { }).when(authzService).roles(any(User.class), any(ActionListener.class)); ServerTransportFilter filter = getClientOrNodeFilter(); PlainActionFuture future = new PlainActionFuture<>(); + when(channel.getVersion()).thenReturn(version); filter.inbound("_action", request, channel, future); assertNotNull(rolesRef.get()); assertThat(rolesRef.get(), arrayContaining("kibana_system"));