From 237650e9c054149fd08213b38a81a3666c1868e5 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Mon, 25 Jun 2018 12:20:27 -0700 Subject: [PATCH] Add x-opaque-id to search slow logs (#31539) Add x-opaque-id to search slow logs only. Indexing slow log and audit logs will be handled as separate PRs. Relates #31521 --- .../org/elasticsearch/http/netty4/Netty4HttpClient.java | 3 ++- .../java/org/elasticsearch/http/nio/Netty4HttpClient.java | 5 +++-- .../main/java/org/elasticsearch/action/ActionModule.java | 3 ++- .../java/org/elasticsearch/http/DefaultRestChannel.java | 3 ++- .../main/java/org/elasticsearch/index/SearchSlowLog.java | 6 ++++++ server/src/main/java/org/elasticsearch/node/Node.java | 3 ++- server/src/main/java/org/elasticsearch/tasks/Task.java | 5 +++++ .../action/admin/cluster/node/tasks/TasksIT.java | 6 +++--- .../org/elasticsearch/http/DefaultRestChannelTests.java | 7 ++++--- .../java/org/elasticsearch/index/SearchSlowLogTests.java | 6 ++++++ 10 files changed, 35 insertions(+), 12 deletions(-) diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java index 9719d15778b..0fa331ba138 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java @@ -44,6 +44,7 @@ import io.netty.handler.codec.http.HttpVersion; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.tasks.Task; import java.io.Closeable; import java.net.SocketAddress; @@ -74,7 +75,7 @@ class Netty4HttpClient implements Closeable { static Collection returnOpaqueIds(Collection responses) { List list = new ArrayList<>(responses.size()); for (HttpResponse response : responses) { - list.add(response.headers().get("X-Opaque-Id")); + list.add(response.headers().get(Task.X_OPAQUE_ID)); } return list; } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/Netty4HttpClient.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/Netty4HttpClient.java index 32f294f47ce..becebade373 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/Netty4HttpClient.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/Netty4HttpClient.java @@ -44,6 +44,7 @@ import io.netty.handler.codec.http.HttpVersion; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.tasks.Task; import java.io.Closeable; import java.net.SocketAddress; @@ -74,7 +75,7 @@ class Netty4HttpClient implements Closeable { static Collection returnOpaqueIds(Collection responses) { List list = new ArrayList<>(responses.size()); for (HttpResponse response : responses) { - list.add(response.headers().get("X-Opaque-Id")); + list.add(response.headers().get(Task.X_OPAQUE_ID)); } return list; } @@ -90,7 +91,7 @@ class Netty4HttpClient implements Closeable { for (int i = 0; i < uris.length; i++) { final HttpRequest httpRequest = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]); httpRequest.headers().add(HOST, "localhost"); - httpRequest.headers().add("X-Opaque-ID", String.valueOf(i)); + httpRequest.headers().add(Task.X_OPAQUE_ID, String.valueOf(i)); requests.add(httpRequest); } return sendRequests(remoteAddress, requests); diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 324e75d64d8..48e1cef08d0 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -316,6 +316,7 @@ import org.elasticsearch.rest.action.search.RestExplainAction; import org.elasticsearch.rest.action.search.RestMultiSearchAction; import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.rest.action.search.RestSearchScrollAction; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.usage.UsageService; @@ -369,7 +370,7 @@ public class ActionModule extends AbstractModule { destructiveOperations = new DestructiveOperations(settings, clusterSettings); Set headers = Stream.concat( actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()), - Stream.of("X-Opaque-Id") + Stream.of(Task.X_OPAQUE_ID) ).collect(Collectors.toSet()); UnaryOperator restWrapper = null; for (ActionPlugin plugin : actionPlugins) { diff --git a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java index 38bf1e751ef..9d21896182c 100644 --- a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java +++ b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java @@ -38,6 +38,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import static org.elasticsearch.tasks.Task.X_OPAQUE_ID; + /** * The default rest channel for incoming requests. This class implements the basic logic for sending a rest * response. It will set necessary headers nad ensure that bytes are released after the response is sent. @@ -50,7 +52,6 @@ public class DefaultRestChannel extends AbstractRestChannel implements RestChann static final String CONTENT_TYPE = "content-type"; static final String CONTENT_LENGTH = "content-length"; static final String SET_COOKIE = "set-cookie"; - static final String X_OPAQUE_ID = "X-Opaque-Id"; private final HttpRequest httpRequest; private final BigArrays bigArrays; diff --git a/server/src/main/java/org/elasticsearch/index/SearchSlowLog.java b/server/src/main/java/org/elasticsearch/index/SearchSlowLog.java index f3c5d07f1f2..10b4c4318a3 100644 --- a/server/src/main/java/org/elasticsearch/index/SearchSlowLog.java +++ b/server/src/main/java/org/elasticsearch/index/SearchSlowLog.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.tasks.Task; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -174,6 +175,11 @@ public final class SearchSlowLog implements SearchOperationListener { } else { sb.append("source[], "); } + if (context.getTask().getHeader(Task.X_OPAQUE_ID) != null) { + sb.append("id[").append(context.getTask().getHeader(Task.X_OPAQUE_ID)).append("], "); + } else { + sb.append("id[], "); + } return sb.toString(); } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 4440153dd36..64bc55edb71 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -136,6 +136,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.snapshots.SnapshotShardsService; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskResultsService; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; @@ -447,7 +448,7 @@ public class Node implements Closeable { final Transport transport = networkModule.getTransportSupplier().get(); Set taskHeaders = Stream.concat( pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()), - Stream.of("X-Opaque-Id") + Stream.of(Task.X_OPAQUE_ID) ).collect(Collectors.toSet()); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders); diff --git a/server/src/main/java/org/elasticsearch/tasks/Task.java b/server/src/main/java/org/elasticsearch/tasks/Task.java index 9fd9019cd21..f639846b418 100644 --- a/server/src/main/java/org/elasticsearch/tasks/Task.java +++ b/server/src/main/java/org/elasticsearch/tasks/Task.java @@ -34,6 +34,11 @@ import java.util.Map; */ public class Task { + /** + * The request header to mark tasks with specific ids + */ + public static final String X_OPAQUE_ID = "X-Opaque-Id"; + private final long id; private final String type; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 09a64a016ab..d33fff45308 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -357,7 +357,7 @@ public class TasksIT extends ESIntegTestCase { .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); Map headers = new HashMap<>(); - headers.put("X-Opaque-Id", "my_id"); + headers.put(Task.X_OPAQUE_ID, "my_id"); headers.put("Foo-Header", "bar"); headers.put("Custom-Task-Header", "my_value"); assertSearchResponse( @@ -404,7 +404,7 @@ public class TasksIT extends ESIntegTestCase { int maxSize = Math.toIntExact(SETTING_HTTP_MAX_HEADER_SIZE.getDefault(Settings.EMPTY).getBytes() / 2 + 1); Map headers = new HashMap<>(); - headers.put("X-Opaque-Id", "my_id"); + headers.put(Task.X_OPAQUE_ID, "my_id"); headers.put("Custom-Task-Header", randomAlphaOfLengthBetween(maxSize, maxSize + 100)); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, @@ -415,7 +415,7 @@ public class TasksIT extends ESIntegTestCase { private void assertTaskHeaders(TaskInfo taskInfo) { assertThat(taskInfo.getHeaders().keySet(), hasSize(2)); - assertEquals("my_id", taskInfo.getHeaders().get("X-Opaque-Id")); + assertEquals("my_id", taskInfo.getHeaders().get(Task.X_OPAQUE_ID)); assertEquals("my_value", taskInfo.getHeaders().get("Custom-Task-Header")); } diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index bc499ed8a42..fd683761098 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -180,7 +181,7 @@ public class DefaultRestChannelTests extends ESTestCase { public void testHeadersSet() { Settings settings = Settings.builder().build(); final TestRequest httpRequest = new TestRequest(HttpRequest.HttpVersion.HTTP_1_1, RestRequest.Method.GET, "/"); - httpRequest.getHeaders().put(DefaultRestChannel.X_OPAQUE_ID, Collections.singletonList("abc")); + httpRequest.getHeaders().put(Task.X_OPAQUE_ID, Collections.singletonList("abc")); final RestRequest request = RestRequest.request(xContentRegistry(), httpRequest, httpChannel); HttpHandlingSettings handlingSettings = HttpHandlingSettings.fromSettings(settings); @@ -200,7 +201,7 @@ public class DefaultRestChannelTests extends ESTestCase { Map> headers = httpResponse.headers; assertNull(headers.get("non-existent-header")); assertEquals(customHeaderValue, headers.get(customHeader).get(0)); - assertEquals("abc", headers.get(DefaultRestChannel.X_OPAQUE_ID).get(0)); + assertEquals("abc", headers.get(Task.X_OPAQUE_ID).get(0)); assertEquals(Integer.toString(resp.content().length()), headers.get(DefaultRestChannel.CONTENT_LENGTH).get(0)); assertEquals(resp.contentType(), headers.get(DefaultRestChannel.CONTENT_TYPE).get(0)); } @@ -208,7 +209,7 @@ public class DefaultRestChannelTests extends ESTestCase { public void testCookiesSet() { Settings settings = Settings.builder().put(HttpTransportSettings.SETTING_HTTP_RESET_COOKIES.getKey(), true).build(); final TestRequest httpRequest = new TestRequest(HttpRequest.HttpVersion.HTTP_1_1, RestRequest.Method.GET, "/"); - httpRequest.getHeaders().put(DefaultRestChannel.X_OPAQUE_ID, Collections.singletonList("abc")); + httpRequest.getHeaders().put(Task.X_OPAQUE_ID, Collections.singletonList("abc")); final RestRequest request = RestRequest.request(xContentRegistry(), httpRequest, httpChannel); HttpHandlingSettings handlingSettings = HttpHandlingSettings.fromSettings(settings); diff --git a/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java b/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java index 23d2f7bcafa..adb7a087367 100644 --- a/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java +++ b/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.index; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; @@ -34,12 +35,15 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.TestSearchContext; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Collections; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; @@ -166,10 +170,12 @@ public class SearchSlowLogTests extends ESSingleNodeTestCase { SearchContext searchContext = createSearchContext(index); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); searchContext.request().source(source); + searchContext.setTask(new SearchTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"))); SearchSlowLog.SlowLogSearchContextPrinter p = new SearchSlowLog.SlowLogSearchContextPrinter(searchContext, 10); assertThat(p.toString(), startsWith("[foo][0]")); // Makes sure that output doesn't contain any new lines assertThat(p.toString(), not(containsString("\n"))); + assertThat(p.toString(), endsWith("id[my_id], ")); } public void testLevelSetting() {