From 852aac0b9c1edf83725ed9243a59fa242d1c35be Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Tue, 5 Jan 2016 10:07:54 -0500 Subject: [PATCH] Add support for the task management framework These changes are required to support elastic/elasticsearchelastic/elasticsearch#15347 Original commit: elastic/x-pack-elasticsearch@37adf4fc83daa080b527b8f743f1ba31ef62f2d0 --- .../shield/action/ShieldActionFilter.java | 7 +++--- .../transport/ServerTransportFilter.java | 10 +++++++-- .../ShieldServerTransportService.java | 10 +++++++-- .../action/ShieldActionFilterTests.java | 22 ++++++++++++------- .../transport/TransportFilterTests.java | 7 +++--- .../ShieldServerTransportServiceTests.java | 5 +++-- .../org/elasticsearch/transport/actions | 1 + .../org/elasticsearch/transport/handlers | 1 + .../actions/WatcherTransportAction.java | 5 +++-- 9 files changed, 45 insertions(+), 23 deletions(-) diff --git a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/action/ShieldActionFilter.java b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/action/ShieldActionFilter.java index c5fbdbcca2f..4e221a98cf3 100644 --- a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/action/ShieldActionFilter.java +++ b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/action/ShieldActionFilter.java @@ -26,6 +26,7 @@ import org.elasticsearch.shield.authz.AuthorizationService; import org.elasticsearch.shield.authz.Privilege; import org.elasticsearch.shield.crypto.CryptoService; import org.elasticsearch.shield.license.ShieldLicenseState; +import org.elasticsearch.tasks.Task; import java.io.IOException; import java.util.ArrayList; @@ -64,7 +65,7 @@ public class ShieldActionFilter extends AbstractComponent implements ActionFilte } @Override - public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) { + public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) { /** A functional requirement - when the license of shield is disabled (invalid/expires), shield will continue @@ -100,9 +101,9 @@ public class ShieldActionFilter extends AbstractComponent implements ActionFilte interceptor.intercept(request, user); } } - chain.proceed(action, request, new SigningListener(this, listener)); + chain.proceed(task, action, request, new SigningListener(this, listener)); } else { - chain.proceed(action, request, listener); + chain.proceed(task, action, request, listener); } } catch (Throwable t) { listener.onFailure(t); diff --git a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/transport/ServerTransportFilter.java b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/transport/ServerTransportFilter.java index b349dca1fc4..24e79cbde0a 100644 --- a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/transport/ServerTransportFilter.java +++ b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/transport/ServerTransportFilter.java @@ -12,6 +12,7 @@ import org.elasticsearch.shield.action.ShieldActionMapper; import org.elasticsearch.shield.authc.AuthenticationService; import org.elasticsearch.shield.authc.pki.PkiRealm; import org.elasticsearch.shield.authz.AuthorizationService; +import org.elasticsearch.transport.DelegatingTransportChannel; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.netty.NettyTransportChannel; @@ -71,8 +72,13 @@ public interface ServerTransportFilter { */ String shieldAction = actionMapper.action(action, request); - if (extractClientCert && (transportChannel instanceof NettyTransportChannel)) { - Channel channel = ((NettyTransportChannel)transportChannel).getChannel(); + TransportChannel unwrappedChannel = transportChannel; + while (unwrappedChannel instanceof DelegatingTransportChannel) { + unwrappedChannel = ((DelegatingTransportChannel) unwrappedChannel).getChannel(); + } + + if (extractClientCert && (unwrappedChannel instanceof NettyTransportChannel)) { + Channel channel = ((NettyTransportChannel)unwrappedChannel).getChannel(); SslHandler sslHandler = channel.getPipeline().get(SslHandler.class); assert sslHandler != null; diff --git a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/transport/ShieldServerTransportService.java b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/transport/ShieldServerTransportService.java index 6101353e65e..8dc3ef6089e 100644 --- a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/transport/ShieldServerTransportService.java +++ b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/transport/ShieldServerTransportService.java @@ -14,6 +14,7 @@ import org.elasticsearch.shield.authz.AuthorizationService; import org.elasticsearch.shield.authz.accesscontrol.RequestContext; import org.elasticsearch.shield.license.ShieldLicenseState; import org.elasticsearch.shield.transport.netty.ShieldNettyTransport; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; @@ -144,7 +145,7 @@ public class ShieldServerTransportService extends TransportService { @Override @SuppressWarnings("unchecked") - public void messageReceived(T request, TransportChannel channel) throws Exception { + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { try { if (licenseState.securityEnabled()) { String profile = channel.getProfileName(); @@ -163,13 +164,18 @@ public class ShieldServerTransportService extends TransportService { } RequestContext context = new RequestContext(request); RequestContext.setCurrent(context); - handler.messageReceived(request, channel); + handler.messageReceived(request, channel, task); } catch (Throwable t) { channel.sendResponse(t); } finally { RequestContext.removeCurrent(); } } + + @Override + public void messageReceived(T request, TransportChannel channel) throws Exception { + throw new UnsupportedOperationException("task parameter is required for this operation"); + } } } diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/action/ShieldActionFilterTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/action/ShieldActionFilterTests.java index 8dc60204161..6d62cfd501f 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/action/ShieldActionFilterTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/action/ShieldActionFilterTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.shield.authc.AuthenticationService; import org.elasticsearch.shield.authz.AuthorizationService; import org.elasticsearch.shield.crypto.CryptoService; import org.elasticsearch.shield.license.ShieldLicenseState; +import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -62,12 +63,13 @@ public class ShieldActionFilterTests extends ESTestCase { ActionRequest request = mock(ActionRequest.class); ActionListener listener = mock(ActionListener.class); ActionFilterChain chain = mock(ActionFilterChain.class); + Task task = mock(Task.class); User user = new User.Simple("username", new String[] { "r1", "r2" }); when(authcService.authenticate("_action", request, User.SYSTEM)).thenReturn(user); doReturn(request).when(spy(filter)).unsign(user, "_action", request); - filter.apply("_action", request, listener, chain); + filter.apply(task, "_action", request, listener, chain); verify(authzService).authorize(user, "_action", request); - verify(chain).proceed(eq("_action"), eq(request), isA(ShieldActionFilter.SigningListener.class)); + verify(chain).proceed(eq(task), eq("_action"), eq(request), isA(ShieldActionFilter.SigningListener.class)); } public void testActionProcessException() throws Exception { @@ -75,10 +77,11 @@ public class ShieldActionFilterTests extends ESTestCase { ActionListener listener = mock(ActionListener.class); ActionFilterChain chain = mock(ActionFilterChain.class); RuntimeException exception = new RuntimeException("process-error"); + Task task = mock(Task.class); User user = new User.Simple("username", new String[] { "r1", "r2" }); when(authcService.authenticate("_action", request, User.SYSTEM)).thenReturn(user); doThrow(exception).when(authzService).authorize(user, "_action", request); - filter.apply("_action", request, listener, chain); + filter.apply(task, "_action", request, listener, chain); verify(listener).onFailure(exception); verifyNoMoreInteractions(chain); } @@ -88,13 +91,14 @@ public class ShieldActionFilterTests extends ESTestCase { ActionListener listener = mock(ActionListener.class); ActionFilterChain chain = mock(ActionFilterChain.class); User user = mock(User.class); + Task task = mock(Task.class); when(authcService.authenticate("_action", request, User.SYSTEM)).thenReturn(user); when(cryptoService.signed("signed_scroll_id")).thenReturn(true); when(cryptoService.unsignAndVerify("signed_scroll_id")).thenReturn("scroll_id"); - filter.apply("_action", request, listener, chain); + filter.apply(task, "_action", request, listener, chain); assertThat(request.scrollId(), equalTo("scroll_id")); verify(authzService).authorize(user, "_action", request); - verify(chain).proceed(eq("_action"), eq(request), isA(ShieldActionFilter.SigningListener.class)); + verify(chain).proceed(eq(task), eq("_action"), eq(request), isA(ShieldActionFilter.SigningListener.class)); } public void testActionSignatureError() throws Exception { @@ -103,10 +107,11 @@ public class ShieldActionFilterTests extends ESTestCase { ActionFilterChain chain = mock(ActionFilterChain.class); IllegalArgumentException sigException = new IllegalArgumentException("bad bad boy"); User user = mock(User.class); + Task task = mock(Task.class); when(authcService.authenticate("_action", request, User.SYSTEM)).thenReturn(user); when(cryptoService.signed("scroll_id")).thenReturn(true); doThrow(sigException).when(cryptoService).unsignAndVerify("scroll_id"); - filter.apply("_action", request, listener, chain); + filter.apply(task, "_action", request, listener, chain); verify(listener).onFailure(isA(ElasticsearchSecurityException.class)); verify(auditTrail).tamperedRequest(user, "_action", request); verifyNoMoreInteractions(chain); @@ -116,11 +121,12 @@ public class ShieldActionFilterTests extends ESTestCase { ActionRequest request = mock(ActionRequest.class); ActionListener listener = mock(ActionListener.class); ActionFilterChain chain = mock(ActionFilterChain.class); + Task task = mock(Task.class); when(shieldLicenseState.securityEnabled()).thenReturn(false); - filter.apply("_action", request, listener, chain); + filter.apply(task, "_action", request, listener, chain); verifyZeroInteractions(authcService); verifyZeroInteractions(authzService); - verify(chain).proceed(eq("_action"), eq(request), eq(listener)); + verify(chain).proceed(eq(task), eq("_action"), eq(request), eq(listener)); } } diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/transport/TransportFilterTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/transport/TransportFilterTests.java index 734503e168e..1e24fd9bcba 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/transport/TransportFilterTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/transport/TransportFilterTests.java @@ -31,7 +31,6 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.netty.NettyTransport; -import org.elasticsearch.transport.netty.NettyTransportChannel; import org.mockito.InOrder; import java.io.IOException; @@ -102,11 +101,11 @@ public class TransportFilterTests extends ESIntegTestCase { ClientTransportFilter sourceClientFilter = internalCluster().getInstance(ClientTransportFilter.class, source); ClientTransportFilter targetClientFilter = internalCluster().getInstance(ClientTransportFilter.class, target); - InOrder inOrder = inOrder(sourceServerFilter, sourceClientFilter, targetServerFilter, targetClientFilter); + InOrder inOrder = inOrder(sourceClientFilter, targetServerFilter, targetClientFilter, sourceServerFilter); inOrder.verify(sourceClientFilter).outbound("_action", new Request("src_to_trgt")); - inOrder.verify(targetServerFilter).inbound(eq("_action"), eq(new Request("src_to_trgt")), isA(NettyTransportChannel.class)); + inOrder.verify(targetServerFilter).inbound(eq("_action"), eq(new Request("src_to_trgt")), isA(TransportChannel.class)); inOrder.verify(targetClientFilter).outbound("_action", new Request("trgt_to_src")); - inOrder.verify(sourceServerFilter).inbound(eq("_action"), eq(new Request("trgt_to_src")), isA(NettyTransportChannel.class)); + inOrder.verify(sourceServerFilter).inbound(eq("_action"), eq(new Request("trgt_to_src")), isA(TransportChannel.class)); } public static class InternalPlugin extends Plugin { diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/transport/ShieldServerTransportServiceTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/transport/ShieldServerTransportServiceTests.java index df404496bb4..527f21186fa 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/transport/ShieldServerTransportServiceTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/transport/ShieldServerTransportServiceTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.test.ShieldIntegTestCase; import java.util.Map; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; // this class sits in org.elasticsearch.transport so that TransportService.requestHandlers is visible public class ShieldServerTransportServiceTests extends ShieldIntegTestCase { @@ -30,8 +31,8 @@ public class ShieldServerTransportServiceTests extends ShieldIntegTestCase { for (Map.Entry entry : transportService.requestHandlers.entrySet()) { assertThat( "handler not wrapped by " + ShieldServerTransportService.ProfileSecuredRequestHandler.class + "; do all the handler registration methods have overrides?", - entry.getValue().getHandler(), - instanceOf(ShieldServerTransportService.ProfileSecuredRequestHandler.class) + entry.getValue().toString(), + startsWith(ShieldServerTransportService.ProfileSecuredRequestHandler.class.getName() + "@") ); } } diff --git a/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/actions b/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/actions index 6c214da44cc..bbfb993fcf0 100644 --- a/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/actions +++ b/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/actions @@ -17,6 +17,7 @@ cluster:monitor/nodes/stats cluster:monitor/state cluster:monitor/stats cluster:monitor/task +cluster:monitor/tasks/lists indices:admin/aliases indices:admin/aliases/exists indices:admin/aliases/get diff --git a/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/handlers b/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/handlers index bf8a2982b59..95c82666c58 100644 --- a/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/handlers +++ b/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/handlers @@ -6,6 +6,7 @@ cluster:monitor/nodes/info[n] cluster:monitor/nodes/liveness cluster:monitor/nodes/stats[n] cluster:monitor/stats[n] +cluster:monitor/tasks/lists[n] cluster:admin/shield/realm/cache/clear cluster:admin/shield/realm/cache/clear[n] indices:admin/analyze[s] diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/WatcherTransportAction.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/WatcherTransportAction.java index f974bb75cd2..a50ea22fa50 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/WatcherTransportAction.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/watcher/transport/actions/WatcherTransportAction.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.plugin.core.LicenseUtils; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.watcher.license.WatcherLicensee; @@ -35,9 +36,9 @@ public abstract class WatcherTransportAction listener) { + protected void doExecute(Task task, Request request, ActionListener listener) { if (watcherLicensee.isWatcherTransportActionAllowed()) { - super.doExecute(request, listener); + super.doExecute(task, request, listener); } else { listener.onFailure(LicenseUtils.newComplianceException(WatcherLicensee.ID)); }