Follow up for elastic/elasticsearchelastic/elasticsearch#22677 (elastic/elasticsearch#4670)

This commit picks up some improvments from elastic/elasticsearch#22677 that streamlines correct restore of stored contexts.

Original commit: elastic/x-pack-elasticsearch@0259de50c8
This commit is contained in:
Simon Willnauer 2017-01-18 16:18:10 +01:00 committed by GitHub
parent 533b525b13
commit 5b5c77f573
8 changed files with 27 additions and 79 deletions

View File

@ -8,6 +8,8 @@ package org.elasticsearch.xpack.common;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import java.util.function.Supplier;
/**
* Restores the given {@link org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext}
* once the listener is invoked
@ -15,27 +17,23 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
public final class ContextPreservingActionListener<R> implements ActionListener<R> {
private final ActionListener<R> delegate;
private final ThreadContext.StoredContext context;
private final ThreadContext threadContext;
private final Supplier<ThreadContext.StoredContext> context;
public ContextPreservingActionListener(ThreadContext threadContext, ThreadContext.StoredContext context, ActionListener<R> delegate) {
public ContextPreservingActionListener(Supplier<ThreadContext.StoredContext> contextSupplier, ActionListener<R> delegate) {
this.delegate = delegate;
this.context = context;
this.threadContext = threadContext;
this.context = contextSupplier;
}
@Override
public void onResponse(R r) {
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
context.restore();
try (ThreadContext.StoredContext ignore = context.get()) {
delegate.onResponse(r);
}
}
@Override
public void onFailure(Exception e) {
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
context.restore();
try (ThreadContext.StoredContext ignore = context.get()) {
delegate.onFailure(e);
}
}

View File

@ -12,6 +12,7 @@ import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.Action;
@ -72,12 +73,12 @@ public class InternalClient extends FilterClient {
}
final ThreadContext threadContext = threadPool().getThreadContext();
final ThreadContext.StoredContext storedContext = threadContext.newStoredContext();
final Supplier<ThreadContext.StoredContext> storedContext = threadContext.newRestorableContext(true);
// we need to preserve the context here otherwise we execute the response with the XPack user which we can cause problems
// since we expect the callback to run with the authenticated user calling the doExecute method
try (ThreadContext.StoredContext ctx = threadContext.stashContext()) {
processContext(threadContext);
super.doExecute(action, request, new ContextPreservingActionListener<>(threadContext, storedContext, listener));
super.doExecute(action, request, new ContextPreservingActionListener<>(storedContext, listener));
}
}

View File

@ -90,7 +90,7 @@ public class SecurityContext {
* returns, the original context is restored.
*/
public void executeAsUser(User user, Consumer<StoredContext> consumer) {
final StoredContext original = threadContext.newStoredContext();
final StoredContext original = threadContext.newStoredContext(true);
try (ThreadContext.StoredContext ctx = threadContext.stashContext()) {
setUser(user);
consumer.accept(original);

View File

@ -49,6 +49,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.elasticsearch.xpack.security.support.Exceptions.authorizationError;
@ -101,8 +102,8 @@ public class SecurityActionFilter extends AbstractComponent implements ActionFil
if (licenseState.isAuthAllowed()) {
final boolean useSystemUser = AuthorizationUtils.shouldReplaceUserWithSystem(threadContext, action);
final ThreadContext.StoredContext toRestore = threadContext.newStoredContext();
final ActionListener<ActionResponse> signingListener = new ContextPreservingActionListener<>(threadContext, toRestore,
final Supplier<ThreadContext.StoredContext> toRestore = threadContext.newRestorableContext(true);
final ActionListener<ActionResponse> signingListener = new ContextPreservingActionListener<>(toRestore,
ActionListener.wrap(r -> {
try {
listener.onResponse(sign(r));
@ -122,7 +123,7 @@ public class SecurityActionFilter extends AbstractComponent implements ActionFil
}
});
} else {
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext(true)) {
applyInternal(action, request, authenticatedListener);
}
}

View File

@ -92,14 +92,16 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
// which means that the user is copied over to system actions so we need to change the user
if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext(), original, handler), sender));
new TransportService.ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
, handler), sender));
} else if (reservedRealmEnabled && connection.getVersion().before(Version.V_5_2_0_UNRELEASED) &&
KibanaUser.NAME.equals(securityContext.getUser().principal())) {
final User kibanaUser = securityContext.getUser();
final User bwcKibanaUser = new User(kibanaUser.principal(), new String[] { "kibana" }, kibanaUser.fullName(),
kibanaUser.email(), kibanaUser.metadata(), kibanaUser.enabled());
securityContext.executeAsUser(bwcKibanaUser, (original) -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext(), original, handler), sender));
new TransportService.ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original),
handler), sender));
} else {
sendWithUser(connection, action, request, options, handler, sender);
}
@ -212,7 +214,7 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
RequestContext.removeCurrent();
}
};
try (ThreadContext.StoredContext ctx = threadContext.newStoredContext()) {
try (ThreadContext.StoredContext ctx = threadContext.newStoredContext(true)) {
if (licenseState.isAuthAllowed()) {
String profile = channel.getProfileName();
ServerTransportFilter filter = profileFilters.get(profile);
@ -265,56 +267,4 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
throw new UnsupportedOperationException("task parameter is required for this operation");
}
}
/**
* This handler wrapper ensures that the response thread executes with the correct thread context. Before any of the handle methods
* are invoked we restore the context.
*/
static final class ContextRestoreResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
private final TransportResponseHandler<T> delegate;
private final ThreadContext.StoredContext context;
private final ThreadContext threadContext;
// pkg private for testing
ContextRestoreResponseHandler(ThreadContext threadContext, ThreadContext.StoredContext context,
TransportResponseHandler<T> delegate) {
this.delegate = delegate;
this.context = context;
this.threadContext = threadContext;
}
@Override
public T newInstance() {
return delegate.newInstance();
}
@Override
public void handleResponse(T response) {
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
context.restore();
delegate.handleResponse(response);
}
}
@Override
public void handleException(TransportException exp) {
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
context.restore();
delegate.handleException(exp);
}
}
@Override
public String executor() {
return delegate.executor();
}
@Override
public String toString() {
return getClass().getName() + "/" + delegate.toString();
}
}
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.xpack.security.authc.AuthenticationService;
import org.elasticsearch.xpack.security.authc.pki.PkiRealm;
import org.elasticsearch.xpack.security.authz.AuthorizationService;
import org.elasticsearch.xpack.security.authz.AuthorizationUtils;
import org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor.ContextRestoreResponseHandler;
import org.elasticsearch.xpack.security.user.KibanaUser;
import org.elasticsearch.xpack.security.user.User;

View File

@ -23,7 +23,7 @@ public class ContextPreservingActionListenerTests extends ESTestCase {
ContextPreservingActionListener<Void> actionListener;
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader("foo", "bar");
actionListener = new ContextPreservingActionListener<>(threadContext, threadContext.newStoredContext(),
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true),
new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
@ -57,7 +57,7 @@ public class ContextPreservingActionListenerTests extends ESTestCase {
ContextPreservingActionListener<Void> actionListener;
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader("foo", "bar");
actionListener = new ContextPreservingActionListener<>(threadContext, threadContext.newStoredContext(),
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true),
new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
@ -91,7 +91,7 @@ public class ContextPreservingActionListenerTests extends ESTestCase {
ContextPreservingActionListener<Void> actionListener;
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader("foo", "bar");
actionListener = new ContextPreservingActionListener<>(threadContext, threadContext.newStoredContext(),
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true),
new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {

View File

@ -22,6 +22,7 @@ import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.security.SecurityContext;
import org.elasticsearch.xpack.security.authc.Authentication;
@ -29,7 +30,6 @@ import org.elasticsearch.xpack.security.authc.Authentication.RealmRef;
import org.elasticsearch.xpack.security.authc.AuthenticationService;
import org.elasticsearch.xpack.security.authz.AuthorizationService;
import org.elasticsearch.xpack.security.crypto.CryptoService;
import org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor.ContextRestoreResponseHandler;
import org.elasticsearch.xpack.security.user.KibanaUser;
import org.elasticsearch.xpack.security.user.SystemUser;
import org.elasticsearch.xpack.security.user.User;
@ -254,8 +254,8 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
try (ThreadContext.StoredContext storedContext = threadContext.stashContext()) {
threadContext.putTransient("foo", "different_bar");
threadContext.putHeader("key", "value2");
TransportResponseHandler<Empty> handler = new ContextRestoreResponseHandler<>(threadContext, storedContext,
new TransportResponseHandler<Empty>() {
TransportResponseHandler<Empty> handler = new TransportService.ContextRestoreResponseHandler<>(
threadContext.wrapRestorable(storedContext), new TransportResponseHandler<Empty>() {
@Override
public Empty newInstance() {
@ -293,8 +293,7 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putTransient("foo", "different_bar");
threadContext.putHeader("key", "value2");
handler = new ContextRestoreResponseHandler<>(threadContext,
threadContext.newStoredContext(),
handler = new TransportService.ContextRestoreResponseHandler<>(threadContext.newRestorableContext(true),
new TransportResponseHandler<Empty>() {
@Override