Fix authentication forward compatibility (elastic/x-pack-elasticsearch#1481)
The authentication object was changed in 5.4.0 in that it was conditionally signed depending on the version and other factors. A bug was introduced however that causes the authentication to actually get written with the version of the node it is being sent to even if that version is greater than the version of the current node, which causes rolling upgrades to fail. Original commit: elastic/x-pack-elasticsearch@a718ff8a52
This commit is contained in:
parent
5353c35420
commit
667f842f92
|
@ -119,6 +119,7 @@ public class Authentication {
|
|||
|
||||
String encode() throws IOException {
|
||||
BytesStreamOutput output = new BytesStreamOutput();
|
||||
output.setVersion(version);
|
||||
Version.writeVersion(version, output);
|
||||
writeTo(output);
|
||||
return Base64.getEncoder().encodeToString(BytesReference.toBytes(output.bytes()));
|
||||
|
|
|
@ -10,7 +10,6 @@ import org.apache.logging.log4j.util.Supplier;
|
|||
import org.elasticsearch.ElasticsearchSecurityException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.SecureString;
|
||||
|
|
|
@ -88,12 +88,16 @@ public class SecurityServerTransportInterceptor extends AbstractComponent implem
|
|||
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
|
||||
TransportRequestOptions options, TransportResponseHandler<T> handler) {
|
||||
if (licenseState.isAuthAllowed()) {
|
||||
// the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
|
||||
// ourselves otherwise we wind up using a version newer than what we can actually send
|
||||
final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);
|
||||
|
||||
// Sometimes a system action gets executed like a internal create index request or update mappings request
|
||||
// which means that the user is copied over to system actions so we need to change the user
|
||||
if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
|
||||
securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options,
|
||||
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
|
||||
, handler), sender), connection.getVersion());
|
||||
, handler), sender), minVersion);
|
||||
} else if (reservedRealmEnabled && connection.getVersion().before(Version.V_5_2_0) &&
|
||||
KibanaUser.NAME.equals(securityContext.getUser().principal())) {
|
||||
final User kibanaUser = securityContext.getUser();
|
||||
|
@ -103,11 +107,11 @@ public class SecurityServerTransportInterceptor extends AbstractComponent implem
|
|||
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original),
|
||||
handler), sender), connection.getVersion());
|
||||
} else if (securityContext.getAuthentication() != null &&
|
||||
securityContext.getAuthentication().getVersion().equals(connection.getVersion()) == false) {
|
||||
securityContext.getAuthentication().getVersion().equals(minVersion) == false) {
|
||||
// re-write the authentication since we want the authentication version to match the version of the connection
|
||||
securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options,
|
||||
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender),
|
||||
connection.getVersion());
|
||||
minVersion);
|
||||
} else {
|
||||
sendWithUser(connection, action, request, options, handler, sender);
|
||||
}
|
||||
|
|
|
@ -123,6 +123,7 @@ public interface ServerTransportFilter {
|
|||
}
|
||||
}
|
||||
|
||||
final Version version = transportChannel.getVersion().equals(Version.V_5_4_0) ? Version.CURRENT : transportChannel.getVersion();
|
||||
authcService.authenticate(securityAction, request, null, ActionListener.wrap((authentication) -> {
|
||||
if (reservedRealmEnabled && authentication.getVersion().before(Version.V_5_2_0) &&
|
||||
KibanaUser.NAME.equals(authentication.getUser().authenticatedUser().principal())) {
|
||||
|
@ -137,7 +138,7 @@ public interface ServerTransportFilter {
|
|||
listener.onResponse(null);
|
||||
});
|
||||
asyncAuthorizer.authorize(authzService);
|
||||
}, transportChannel.getVersion());
|
||||
}, version);
|
||||
} else {
|
||||
final AuthorizationUtils.AsyncAuthorizer asyncAuthorizer =
|
||||
new AuthorizationUtils.AsyncAuthorizer(authentication, listener, (userRoles, runAsRoles) -> {
|
||||
|
|
|
@ -247,6 +247,84 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
|
|||
verifyNoMoreInteractions(xPackLicenseState);
|
||||
}
|
||||
|
||||
public void testSendToNewerVersionSetsCorrectVersion() throws Exception {
|
||||
final User user = new User("joe", "role");
|
||||
final Authentication authentication = new Authentication(user, new RealmRef("file", "file", "node1"), null);
|
||||
authentication.writeToContext(threadContext);
|
||||
threadContext.putTransient(AuthorizationService.ORIGINATING_ACTION_KEY, "indices:foo");
|
||||
|
||||
SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool,
|
||||
mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class),
|
||||
securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
|
||||
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))));
|
||||
|
||||
AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
|
||||
AtomicReference<User> sendingUser = new AtomicReference<>();
|
||||
AtomicReference<Authentication> authRef = new AtomicReference<>();
|
||||
AsyncSender intercepted = new AsyncSender() {
|
||||
@Override
|
||||
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
|
||||
TransportRequestOptions options, TransportResponseHandler<T> handler) {
|
||||
if (calledWrappedSender.compareAndSet(false, true) == false) {
|
||||
fail("sender called more than once!");
|
||||
}
|
||||
sendingUser.set(securityContext.getUser());
|
||||
authRef.set(securityContext.getAuthentication());
|
||||
}
|
||||
};
|
||||
AsyncSender sender = interceptor.interceptSender(intercepted);
|
||||
final Version connectionVersion = Version.fromId(Version.CURRENT.id + randomIntBetween(100, 100000));
|
||||
assertEquals(Version.CURRENT, Version.min(connectionVersion, Version.CURRENT));
|
||||
|
||||
Transport.Connection connection = mock(Transport.Connection.class);
|
||||
when(connection.getVersion()).thenReturn(connectionVersion);
|
||||
sender.sendRequest(connection, "indices:foo[s]", null, null, null);
|
||||
assertTrue(calledWrappedSender.get());
|
||||
assertEquals(user, sendingUser.get());
|
||||
assertEquals(user, securityContext.getUser());
|
||||
assertEquals(Version.CURRENT, authRef.get().getVersion());
|
||||
assertEquals(Version.CURRENT, authentication.getVersion());
|
||||
}
|
||||
|
||||
public void testSendToOlderVersionSetsCorrectVersion() throws Exception {
|
||||
final User user = new User("joe", "role");
|
||||
final Authentication authentication = new Authentication(user, new RealmRef("file", "file", "node1"), null);
|
||||
authentication.writeToContext(threadContext);
|
||||
threadContext.putTransient(AuthorizationService.ORIGINATING_ACTION_KEY, "indices:foo");
|
||||
|
||||
SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(settings, threadPool,
|
||||
mock(AuthenticationService.class), mock(AuthorizationService.class), xPackLicenseState, mock(SSLService.class),
|
||||
securityContext, new DestructiveOperations(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
|
||||
Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))));
|
||||
|
||||
AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
|
||||
AtomicReference<User> sendingUser = new AtomicReference<>();
|
||||
AtomicReference<Authentication> authRef = new AtomicReference<>();
|
||||
AsyncSender intercepted = new AsyncSender() {
|
||||
@Override
|
||||
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
|
||||
TransportRequestOptions options, TransportResponseHandler<T> handler) {
|
||||
if (calledWrappedSender.compareAndSet(false, true) == false) {
|
||||
fail("sender called more than once!");
|
||||
}
|
||||
sendingUser.set(securityContext.getUser());
|
||||
authRef.set(securityContext.getAuthentication());
|
||||
}
|
||||
};
|
||||
AsyncSender sender = interceptor.interceptSender(intercepted);
|
||||
final Version connectionVersion = Version.fromId(Version.CURRENT.id - randomIntBetween(100, 100000));
|
||||
assertEquals(connectionVersion, Version.min(connectionVersion, Version.CURRENT));
|
||||
|
||||
Transport.Connection connection = mock(Transport.Connection.class);
|
||||
when(connection.getVersion()).thenReturn(connectionVersion);
|
||||
sender.sendRequest(connection, "indices:foo[s]", null, null, null);
|
||||
assertTrue(calledWrappedSender.get());
|
||||
assertEquals(user, sendingUser.get());
|
||||
assertEquals(user, securityContext.getUser());
|
||||
assertEquals(connectionVersion, authRef.get().getVersion());
|
||||
assertEquals(Version.CURRENT, authentication.getVersion());
|
||||
}
|
||||
|
||||
public void testContextRestoreResponseHandler() throws Exception {
|
||||
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
|
||||
|
|
|
@ -124,6 +124,18 @@ for (Version version : wireCompatVersions) {
|
|||
Task upgradedClusterTestRunner = tasks.getByName("${baseName}#upgradedClusterTestRunner")
|
||||
upgradedClusterTestRunner.configure {
|
||||
systemProperty 'tests.rest.suite', 'upgraded_cluster'
|
||||
|
||||
// migration tests should only run when the original/old cluster nodes where versions < 5.2.0.
|
||||
// this stinks but we do the check here since our rest tests do not support conditionals
|
||||
// otherwise we could check the index created version
|
||||
String versionStr = project.extensions.findByName("${baseName}#oldClusterTestCluster").properties.get('bwcVersion')
|
||||
String[] versionParts = versionStr.split('\\.')
|
||||
if (versionParts[0].equals("5")) {
|
||||
Integer minor = Integer.parseInt(versionParts[1])
|
||||
if (minor >= 2) {
|
||||
systemProperty 'tests.rest.blacklist', '/20_security/Verify default password migration results in upgraded cluster'
|
||||
}
|
||||
}
|
||||
// only need to kill the mixed cluster tests node here because we explicitly told it to not stop nodes upon completion
|
||||
finalizedBy "${baseName}#mixedClusterTestCluster#stop"
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue