Preserve `forceExecution` flag when forking off handler threads after authentication (elastic/elasticsearch#4706)
Today we might get a rejection on a critical operation where `forceExecution=true` but due to the fact that the forceExecution flag is not passed to the transport interceptor interface we were not able to preserve this flag when forking off the request after authentication. This causes serious issues if for instance a replication handler is rejected. Closes elastic/elasticsearch#4704 Original commit: elastic/x-pack-elasticsearch@f0aad7dede
This commit is contained in:
parent
9005e9fdb9
commit
b6703c1515
|
@ -704,9 +704,10 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin {
|
|||
return Collections.singletonList(new TransportInterceptor() {
|
||||
@Override
|
||||
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
|
||||
boolean forceExecution,
|
||||
TransportRequestHandler<T> actualHandler) {
|
||||
assert securityInterceptor.get() != null;
|
||||
return securityInterceptor.get().interceptHandler(action, executor, actualHandler);
|
||||
return securityInterceptor.get().interceptHandler(action, executor, forceExecution, actualHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.support.DestructiveOperations;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
@ -43,7 +44,6 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.xpack.XPackSettings.TRANSPORT_SSL_ENABLED;
|
||||
import static org.elasticsearch.xpack.security.Security.setting;
|
||||
|
@ -129,8 +129,9 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
|
|||
|
||||
@Override
|
||||
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
|
||||
boolean forceExecution,
|
||||
TransportRequestHandler<T> actualHandler) {
|
||||
return new ProfileSecuredRequestHandler<>(action, executor, actualHandler, profileFilters,
|
||||
return new ProfileSecuredRequestHandler<>(action, forceExecution, executor, actualHandler, profileFilters,
|
||||
licenseState, threadPool);
|
||||
}
|
||||
|
||||
|
@ -179,10 +180,11 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
|
|||
private final ThreadContext threadContext;
|
||||
private final String executorName;
|
||||
private final ThreadPool threadPool;
|
||||
private final boolean forceExecution;
|
||||
|
||||
private ProfileSecuredRequestHandler(String action, String executorName, TransportRequestHandler<T> handler,
|
||||
Map<String,ServerTransportFilter> profileFilters, XPackLicenseState licenseState,
|
||||
ThreadPool threadPool) {
|
||||
ProfileSecuredRequestHandler(String action, boolean forceExecution, String executorName, TransportRequestHandler<T> handler,
|
||||
Map<String, ServerTransportFilter> profileFilters, XPackLicenseState licenseState,
|
||||
ThreadPool threadPool) {
|
||||
this.action = action;
|
||||
this.executorName = executorName;
|
||||
this.handler = handler;
|
||||
|
@ -190,30 +192,52 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
|
|||
this.licenseState = licenseState;
|
||||
this.threadContext = threadPool.getThreadContext();
|
||||
this.threadPool = threadPool;
|
||||
this.forceExecution = forceExecution;
|
||||
}
|
||||
|
||||
AbstractRunnable getReceiveRunnable(T request, TransportChannel channel, Task task) {
|
||||
return new AbstractRunnable() {
|
||||
@Override
|
||||
public boolean isForceExecution() {
|
||||
return forceExecution;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
throw new UncheckedIOException(e1);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
// FIXME we should remove the RequestContext completely since we have ThreadContext but cannot yet due to
|
||||
// the query cache
|
||||
RequestContext context = new RequestContext(request, threadContext);
|
||||
RequestContext.setCurrent(context);
|
||||
try {
|
||||
handler.messageReceived(request, channel, task);
|
||||
} finally {
|
||||
RequestContext.removeCurrent();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ProfileSecuredRequestHandler{" +
|
||||
"action='" + action + '\'' +
|
||||
", executorName='" + executorName + '\'' +
|
||||
", forceExecution=" + forceExecution +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(T request, TransportChannel channel, Task task) throws Exception {
|
||||
final Consumer<Exception> onFailure = (e) -> {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
throw new UncheckedIOException(e1);
|
||||
}
|
||||
};
|
||||
final Runnable receiveMessage = () -> {
|
||||
// FIXME we should remove the RequestContext completely since we have ThreadContext but cannot yet due to
|
||||
// the query cache
|
||||
RequestContext context = new RequestContext(request, threadContext);
|
||||
RequestContext.setCurrent(context);
|
||||
try {
|
||||
handler.messageReceived(request, channel, task);
|
||||
} catch (Exception e) {
|
||||
onFailure.accept(e);
|
||||
} finally {
|
||||
RequestContext.removeCurrent();
|
||||
}
|
||||
};
|
||||
final AbstractRunnable receiveMessage = getReceiveRunnable(request, channel, task);
|
||||
try (ThreadContext.StoredContext ctx = threadContext.newStoredContext(true)) {
|
||||
if (licenseState.isAuthAllowed()) {
|
||||
String profile = channel.getProfileName();
|
||||
|
@ -248,17 +272,15 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
|
|||
try {
|
||||
executor.execute(receiveMessage);
|
||||
} catch (Exception e) {
|
||||
onFailure.accept(e);
|
||||
receiveMessage.onFailure(e);
|
||||
}
|
||||
|
||||
};
|
||||
ActionListener<Void> filterListener = ActionListener.wrap(consumer, onFailure);
|
||||
ActionListener<Void> filterListener = ActionListener.wrap(consumer, receiveMessage::onFailure);
|
||||
filter.inbound(action, request, channel, filterListener);
|
||||
} else {
|
||||
receiveMessage.run();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
channel.sendResponse(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -38,12 +38,13 @@ public class SecurityServerTransportServiceTests extends SecurityIntegTestCase {
|
|||
public void testSecurityServerTransportServiceWrapsAllHandlers() {
|
||||
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
|
||||
for (Map.Entry<String, RequestHandlerRegistry> entry : transportService.requestHandlers.entrySet()) {
|
||||
assertThat(
|
||||
RequestHandlerRegistry handler = entry.getValue();
|
||||
assertEquals(
|
||||
"handler not wrapped by " + SecurityServerTransportInterceptor.ProfileSecuredRequestHandler.class +
|
||||
"; do all the handler registration methods have overrides?",
|
||||
entry.getValue().toString(),
|
||||
startsWith(SecurityServerTransportInterceptor.ProfileSecuredRequestHandler.class.getName() + "@")
|
||||
);
|
||||
handler.toString(),
|
||||
"ProfileSecuredRequestHandler{action='" + handler.getAction() + "', executorName='" + handler.getExecutor()
|
||||
+ "', forceExecution=" + handler.isForceExecution() + "}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue