[Monitoring] Thread _xpack/monitoring/_bulk (elastic/x-pack-elasticsearch#4393)

Instead of allowing the `_xpack/monitoring/_bulk` to remain on the same
thread, it should execute on a separate thread to avoid blocking the
http worker thread whenever the exporters get stuck waiting on the
monitoring cluster.

Original commit: elastic/x-pack-elasticsearch@25ce9a4df0
This commit is contained in:
Chris Earle 2018-04-17 11:16:49 -04:00 committed by GitHub
parent f1902aba39
commit 51d87994ca
2 changed files with 49 additions and 22 deletions

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.monitoring.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -70,11 +71,12 @@ public class TransportMonitoringBulkAction extends HandledTransportAction<Monito
discoveryNode.getHostAddress(),
discoveryNode.getName(), timestamp);
new AsyncAction(request, listener, exportService, cluster, timestamp, node).start();
new AsyncAction(threadPool, request, listener, exportService, cluster, timestamp, node).start();
}
static class AsyncAction {
private final ThreadPool threadPool;
private final MonitoringBulkRequest request;
private final ActionListener<MonitoringBulkResponse> listener;
private final Exporters exportService;
@ -82,8 +84,10 @@ public class TransportMonitoringBulkAction extends HandledTransportAction<Monito
private final long defaultTimestamp;
private final MonitoringDoc.Node defaultNode;
AsyncAction(MonitoringBulkRequest request, ActionListener<MonitoringBulkResponse> listener, Exporters exportService,
AsyncAction(ThreadPool threadPool,
MonitoringBulkRequest request, ActionListener<MonitoringBulkResponse> listener, Exporters exportService,
String defaultClusterUUID, long defaultTimestamp, MonitoringDoc.Node defaultNode) {
this.threadPool = threadPool;
this.request = request;
this.listener = listener;
this.exportService = exportService;
@ -136,14 +140,24 @@ public class TransportMonitoringBulkAction extends HandledTransportAction<Monito
* Exports the documents
*/
void executeExport(final Collection<MonitoringDoc> docs, final long startTimeNanos,
final ActionListener<MonitoringBulkResponse> listener) {
try {
exportService.export(docs, ActionListener.wrap(
r -> listener.onResponse(response(startTimeNanos)),
e -> listener.onResponse(response(startTimeNanos, e))));
} catch (Exception e) {
listener.onResponse(response(startTimeNanos, e));
}
final ActionListener<MonitoringBulkResponse> delegate) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new ActionRunnable<MonitoringBulkResponse>(delegate) {
@Override
protected void doRun() {
exportService.export(
docs,
ActionListener.wrap(
r -> listener.onResponse(response(startTimeNanos)),
this::onFailure
)
);
}
@Override
public void onFailure(Exception e) {
listener.onResponse(response(startTimeNanos, e));
}
});
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.monitoring.action;
import java.util.concurrent.ExecutorService;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
@ -24,7 +25,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskManager;
@ -82,13 +82,14 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
private TransportService transportService;
private ActionFilters filters;
private IndexNameExpressionResolver resolver;
private XPackLicenseState licenseState;
private TaskManager taskManager;
private final MonitoringService monitoringService = mock(MonitoringService.class);
@Before
@SuppressWarnings("unchecked")
public void setUpMocks() {
final ExecutorService executor = mock(ExecutorService.class);
final TaskManager taskManager = mock(TaskManager.class);
listener = mock(ActionListener.class);
exporters = mock(Exporters.class);
threadPool = mock(ThreadPool.class);
@ -96,12 +97,17 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
transportService = mock(TransportService.class);
filters = mock(ActionFilters.class);
resolver = mock(IndexNameExpressionResolver.class);
licenseState = mock(XPackLicenseState.class);
taskManager = mock(TaskManager.class);
when(transportService.getTaskManager()).thenReturn(taskManager);
when(taskManager.register(anyString(), eq(MonitoringBulkAction.NAME), any(TaskAwareRequest.class))).thenReturn(null);
when(filters.filters()).thenReturn(new ActionFilter[0]);
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executor);
// execute in the same thread
doAnswer(invocation -> {
((Runnable)invocation.getArguments()[0]).run();
return null;
}).when(executor).execute(any(Runnable.class));
}
public void testExecuteWithGlobalBlock() throws Exception {
@ -213,6 +219,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
monitoringService);
action.execute(request).get();
verify(threadPool).executor(ThreadPool.Names.GENERIC);
verify(exporters).export(any(Collection.class), any(ActionListener.class));
verify(clusterService, times(2)).state();
verify(clusterService).localNode();
@ -230,7 +237,8 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
}
final Collection<MonitoringDoc> results =
new TransportMonitoringBulkAction.AsyncAction(null, null, null, null, 0L, null).createMonitoringDocs(bulkDocs);
new TransportMonitoringBulkAction.AsyncAction(threadPool, null, null, null, null, 0L, null)
.createMonitoringDocs(bulkDocs);
assertThat(results, notNullValue());
assertThat(results.size(), equalTo(0));
@ -262,7 +270,8 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
}
final Collection<MonitoringDoc> exportedDocs =
new TransportMonitoringBulkAction.AsyncAction(null, null, null, "_cluster", 123L, node).createMonitoringDocs(docs);
new TransportMonitoringBulkAction.AsyncAction(threadPool, null, null, null, "_cluster", 123L, node)
.createMonitoringDocs(docs);
assertThat(exportedDocs, notNullValue());
assertThat(exportedDocs.size(), equalTo(nbDocs));
@ -280,9 +289,10 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
public void testAsyncActionCreateMonitoringDocWithNoTimestamp() {
final MonitoringBulkDoc monitoringBulkDoc =
new MonitoringBulkDoc(MonitoredSystem.LOGSTASH, "_type", "_id", 0L, 0L, BytesArray.EMPTY, XContentType.JSON);
final MonitoringDoc monitoringDoc =
new TransportMonitoringBulkAction.AsyncAction(null, null, null, "", 456L, null).createMonitoringDoc(monitoringBulkDoc);
new TransportMonitoringBulkAction.AsyncAction(threadPool, null, null, null, "", 456L, null)
.createMonitoringDoc(monitoringBulkDoc);
assertThat(monitoringDoc.getTimestamp(), equalTo(456L));
}
@ -301,7 +311,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
new MonitoringBulkDoc(MonitoredSystem.LOGSTASH, "_type", "_id", 1502107402133L, 15_000L, source, xContentType);
final MonitoringDoc monitoringDoc =
new TransportMonitoringBulkAction.AsyncAction(null, null, null, "_cluster_uuid", 3L, node)
new TransportMonitoringBulkAction.AsyncAction(threadPool, null, null, null, "_cluster_uuid", 3L, node)
.createMonitoringDoc(monitoringBulkDoc);
final BytesReference xContent = XContentHelper.toXContent(monitoringDoc, XContentType.JSON, randomBoolean());
@ -344,10 +354,11 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
}).when(exporters).export(any(Collection.class), any(ActionListener.class));
final TransportMonitoringBulkAction.AsyncAction asyncAction =
new TransportMonitoringBulkAction.AsyncAction(null, null, exporters, null, 0L, null);
new TransportMonitoringBulkAction.AsyncAction(threadPool, null, null, exporters, null, 0L, null);
asyncAction.executeExport(docs, randomNonNegativeLong(), listener);
verify(threadPool).executor(ThreadPool.Names.GENERIC);
verify(exporters).export(eq(docs), any(ActionListener.class));
}
@ -364,10 +375,12 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
.export(any(Collection.class), any(ActionListener.class));
final TransportMonitoringBulkAction.AsyncAction asyncAction =
new TransportMonitoringBulkAction.AsyncAction(null, null, exporters, null, 0L, null);
new TransportMonitoringBulkAction.AsyncAction(threadPool, null, null, exporters, null, 0L, null);
asyncAction.executeExport(docs, randomNonNegativeLong(), listener);
verify(threadPool).executor(ThreadPool.Names.GENERIC);
final ArgumentCaptor<Collection<MonitoringDoc>> argDocs = ArgumentCaptor.forClass((Class) Collection.class);
verify(exporters).export(argDocs.capture(), any(ActionListener.class));
assertThat(argDocs.getValue(), is(docs));