diff --git a/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkAction.java b/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkAction.java index 5f7836eafe5..0f7ad755c58 100644 --- a/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkAction.java +++ b/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.monitoring.action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -70,11 +71,12 @@ public class TransportMonitoringBulkAction extends HandledTransportAction listener; private final Exporters exportService; @@ -82,8 +84,10 @@ public class TransportMonitoringBulkAction extends HandledTransportAction listener, Exporters exportService, + AsyncAction(ThreadPool threadPool, + MonitoringBulkRequest request, ActionListener listener, Exporters exportService, String defaultClusterUUID, long defaultTimestamp, MonitoringDoc.Node defaultNode) { + this.threadPool = threadPool; this.request = request; this.listener = listener; this.exportService = exportService; @@ -136,14 +140,24 @@ public class TransportMonitoringBulkAction extends HandledTransportAction docs, final long startTimeNanos, - final ActionListener listener) { - try { - exportService.export(docs, ActionListener.wrap( - r -> listener.onResponse(response(startTimeNanos)), - e -> listener.onResponse(response(startTimeNanos, e)))); - } catch (Exception e) { - listener.onResponse(response(startTimeNanos, e)); - } + final ActionListener delegate) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(new ActionRunnable(delegate) { + @Override + protected void doRun() { + exportService.export( + docs, + ActionListener.wrap( + r -> listener.onResponse(response(startTimeNanos)), + this::onFailure + ) + ); + } + + @Override + public void onFailure(Exception e) { + listener.onResponse(response(startTimeNanos, e)); + } + }); } } diff --git a/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java b/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java index 2bd25666ab8..f918c7aaf56 100644 --- a/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java +++ b/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.monitoring.action; +import java.util.concurrent.ExecutorService; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilters; @@ -24,7 +25,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskAwareRequest; import org.elasticsearch.tasks.TaskManager; @@ -82,13 +82,14 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { private TransportService transportService; private ActionFilters filters; private IndexNameExpressionResolver resolver; - private XPackLicenseState licenseState; - private TaskManager taskManager; private final MonitoringService monitoringService = mock(MonitoringService.class); @Before @SuppressWarnings("unchecked") public void setUpMocks() { + final ExecutorService executor = mock(ExecutorService.class); + final TaskManager taskManager = mock(TaskManager.class); + listener = mock(ActionListener.class); exporters = mock(Exporters.class); threadPool = mock(ThreadPool.class); @@ -96,12 +97,17 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { transportService = mock(TransportService.class); filters = mock(ActionFilters.class); resolver = mock(IndexNameExpressionResolver.class); - licenseState = mock(XPackLicenseState.class); - taskManager = mock(TaskManager.class); when(transportService.getTaskManager()).thenReturn(taskManager); when(taskManager.register(anyString(), eq(MonitoringBulkAction.NAME), any(TaskAwareRequest.class))).thenReturn(null); when(filters.filters()).thenReturn(new ActionFilter[0]); + when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executor); + + // execute in the same thread + doAnswer(invocation -> { + ((Runnable)invocation.getArguments()[0]).run(); + return null; + }).when(executor).execute(any(Runnable.class)); } public void testExecuteWithGlobalBlock() throws Exception { @@ -213,6 +219,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { monitoringService); action.execute(request).get(); + verify(threadPool).executor(ThreadPool.Names.GENERIC); verify(exporters).export(any(Collection.class), any(ActionListener.class)); verify(clusterService, times(2)).state(); verify(clusterService).localNode(); @@ -230,7 +237,8 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { } final Collection results = - new TransportMonitoringBulkAction.AsyncAction(null, null, null, null, 0L, null).createMonitoringDocs(bulkDocs); + new TransportMonitoringBulkAction.AsyncAction(threadPool, null, null, null, null, 0L, null) + .createMonitoringDocs(bulkDocs); assertThat(results, notNullValue()); assertThat(results.size(), equalTo(0)); @@ -262,7 +270,8 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { } final Collection exportedDocs = - new TransportMonitoringBulkAction.AsyncAction(null, null, null, "_cluster", 123L, node).createMonitoringDocs(docs); + new TransportMonitoringBulkAction.AsyncAction(threadPool, null, null, null, "_cluster", 123L, node) + .createMonitoringDocs(docs); assertThat(exportedDocs, notNullValue()); assertThat(exportedDocs.size(), equalTo(nbDocs)); @@ -280,9 +289,10 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { public void testAsyncActionCreateMonitoringDocWithNoTimestamp() { final MonitoringBulkDoc monitoringBulkDoc = new MonitoringBulkDoc(MonitoredSystem.LOGSTASH, "_type", "_id", 0L, 0L, BytesArray.EMPTY, XContentType.JSON); - final MonitoringDoc monitoringDoc = - new TransportMonitoringBulkAction.AsyncAction(null, null, null, "", 456L, null).createMonitoringDoc(monitoringBulkDoc); + new TransportMonitoringBulkAction.AsyncAction(threadPool, null, null, null, "", 456L, null) + .createMonitoringDoc(monitoringBulkDoc); + assertThat(monitoringDoc.getTimestamp(), equalTo(456L)); } @@ -301,7 +311,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { new MonitoringBulkDoc(MonitoredSystem.LOGSTASH, "_type", "_id", 1502107402133L, 15_000L, source, xContentType); final MonitoringDoc monitoringDoc = - new TransportMonitoringBulkAction.AsyncAction(null, null, null, "_cluster_uuid", 3L, node) + new TransportMonitoringBulkAction.AsyncAction(threadPool, null, null, null, "_cluster_uuid", 3L, node) .createMonitoringDoc(monitoringBulkDoc); final BytesReference xContent = XContentHelper.toXContent(monitoringDoc, XContentType.JSON, randomBoolean()); @@ -344,10 +354,11 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { }).when(exporters).export(any(Collection.class), any(ActionListener.class)); final TransportMonitoringBulkAction.AsyncAction asyncAction = - new TransportMonitoringBulkAction.AsyncAction(null, null, exporters, null, 0L, null); + new TransportMonitoringBulkAction.AsyncAction(threadPool, null, null, exporters, null, 0L, null); asyncAction.executeExport(docs, randomNonNegativeLong(), listener); + verify(threadPool).executor(ThreadPool.Names.GENERIC); verify(exporters).export(eq(docs), any(ActionListener.class)); } @@ -364,10 +375,12 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { .export(any(Collection.class), any(ActionListener.class)); final TransportMonitoringBulkAction.AsyncAction asyncAction = - new TransportMonitoringBulkAction.AsyncAction(null, null, exporters, null, 0L, null); + new TransportMonitoringBulkAction.AsyncAction(threadPool, null, null, exporters, null, 0L, null); asyncAction.executeExport(docs, randomNonNegativeLong(), listener); + verify(threadPool).executor(ThreadPool.Names.GENERIC); + final ArgumentCaptor> argDocs = ArgumentCaptor.forClass((Class) Collection.class); verify(exporters).export(argDocs.capture(), any(ActionListener.class)); assertThat(argDocs.getValue(), is(docs));