Move ContextPreservingActionListener to core (elastic/elasticsearch#4692)

Original commit: elastic/x-pack-elasticsearch@c3e5762ffc
This commit is contained in:
Boaz Leskes 2017-01-20 10:14:34 +01:00 committed by GitHub
parent 6ed83cc8ea
commit 630b5fd836
5 changed files with 12 additions and 180 deletions

View File

@ -1,40 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.common;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import java.util.function.Supplier;
/**
* Restores the given {@link org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext}
* once the listener is invoked
*/
public final class ContextPreservingActionListener<R> implements ActionListener<R> {
private final ActionListener<R> delegate;
private final Supplier<ThreadContext.StoredContext> context;
public ContextPreservingActionListener(Supplier<ThreadContext.StoredContext> contextSupplier, ActionListener<R> delegate) {
this.delegate = delegate;
this.context = contextSupplier;
}
@Override
public void onResponse(R r) {
try (ThreadContext.StoredContext ignore = context.get()) {
delegate.onResponse(r);
}
}
@Override
public void onFailure(Exception e) {
try (ThreadContext.StoredContext ignore = context.get()) {
delegate.onFailure(e);
}
}
}

View File

@ -5,15 +5,6 @@
*/
package org.elasticsearch.xpack.security;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
@ -24,6 +15,7 @@ import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.common.settings.Settings;
@ -33,12 +25,20 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.node.Node;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.common.ContextPreservingActionListener;
import org.elasticsearch.xpack.security.authc.Authentication;
import org.elasticsearch.xpack.security.authc.AuthenticationService;
import org.elasticsearch.xpack.security.crypto.CryptoService;
import org.elasticsearch.xpack.security.user.XPackUser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* A special filter client for internal node communication which adds the internal xpack user to the headers.
* An optionally secured client for internal node communication.

View File

@ -17,6 +17,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -27,7 +28,6 @@ import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.common.ContextPreservingActionListener;
import org.elasticsearch.xpack.security.SecurityContext;
import org.elasticsearch.xpack.security.action.SecurityActionMapper;
import org.elasticsearch.xpack.security.action.interceptor.RequestInterceptor;

View File

@ -1,128 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.common;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
public class ContextPreservingActionListenerTests extends ESTestCase {
public void testOriginalContextIsPreservedAfterOnResponse() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
final boolean nonEmptyContext = randomBoolean();
if (nonEmptyContext) {
threadContext.putHeader("not empty", "value");
}
ContextPreservingActionListener<Void> actionListener;
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader("foo", "bar");
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true),
new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
assertEquals("bar", threadContext.getHeader("foo"));
assertNull(threadContext.getHeader("not empty"));
}
@Override
public void onFailure(Exception e) {
throw new RuntimeException("onFailure shouldn't be called", e);
}
});
}
assertNull(threadContext.getHeader("foo"));
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
actionListener.onResponse(null);
assertNull(threadContext.getHeader("foo"));
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
}
}
public void testOriginalContextIsPreservedAfterOnFailure() throws Exception {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
final boolean nonEmptyContext = randomBoolean();
if (nonEmptyContext) {
threadContext.putHeader("not empty", "value");
}
ContextPreservingActionListener<Void> actionListener;
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader("foo", "bar");
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true),
new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
throw new RuntimeException("onResponse shouldn't be called");
}
@Override
public void onFailure(Exception e) {
assertEquals("bar", threadContext.getHeader("foo"));
assertNull(threadContext.getHeader("not empty"));
}
});
}
assertNull(threadContext.getHeader("foo"));
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
actionListener.onFailure(null);
assertNull(threadContext.getHeader("foo"));
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
}
}
public void testOriginalContextIsWhenListenerThrows() throws Exception {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
final boolean nonEmptyContext = randomBoolean();
if (nonEmptyContext) {
threadContext.putHeader("not empty", "value");
}
ContextPreservingActionListener<Void> actionListener;
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader("foo", "bar");
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true),
new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
assertEquals("bar", threadContext.getHeader("foo"));
assertNull(threadContext.getHeader("not empty"));
throw new RuntimeException("onResponse called");
}
@Override
public void onFailure(Exception e) {
assertEquals("bar", threadContext.getHeader("foo"));
assertNull(threadContext.getHeader("not empty"));
throw new RuntimeException("onFailure called");
}
});
}
assertNull(threadContext.getHeader("foo"));
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
RuntimeException e = expectThrows(RuntimeException.class, () -> actionListener.onResponse(null));
assertEquals("onResponse called", e.getMessage());
assertNull(threadContext.getHeader("foo"));
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
e = expectThrows(RuntimeException.class, () -> actionListener.onFailure(null));
assertEquals("onFailure called", e.getMessage());
assertNull(threadContext.getHeader("foo"));
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
}
}
}

View File

@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexAction;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.settings.ClusterSettings;
@ -24,7 +25,6 @@ import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.common.ContextPreservingActionListener;
import org.elasticsearch.xpack.security.SecurityContext;
import org.elasticsearch.xpack.security.audit.AuditTrailService;
import org.elasticsearch.xpack.security.authc.Authentication;