Add x-opaque-id to search slow logs (#31539)
Add x-opaque-id to search slow logs only. Indexing slow log and audit logs will be handled as separate PRs. Relates #31521
This commit is contained in:
parent
bb1d4aaf17
commit
237650e9c0
|
@ -44,6 +44,7 @@ import io.netty.handler.codec.http.HttpVersion;
|
|||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.net.SocketAddress;
|
||||
|
@ -74,7 +75,7 @@ class Netty4HttpClient implements Closeable {
|
|||
static Collection<String> returnOpaqueIds(Collection<FullHttpResponse> responses) {
|
||||
List<String> list = new ArrayList<>(responses.size());
|
||||
for (HttpResponse response : responses) {
|
||||
list.add(response.headers().get("X-Opaque-Id"));
|
||||
list.add(response.headers().get(Task.X_OPAQUE_ID));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ import io.netty.handler.codec.http.HttpVersion;
|
|||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.net.SocketAddress;
|
||||
|
@ -74,7 +75,7 @@ class Netty4HttpClient implements Closeable {
|
|||
static Collection<String> returnOpaqueIds(Collection<FullHttpResponse> responses) {
|
||||
List<String> list = new ArrayList<>(responses.size());
|
||||
for (HttpResponse response : responses) {
|
||||
list.add(response.headers().get("X-Opaque-Id"));
|
||||
list.add(response.headers().get(Task.X_OPAQUE_ID));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
@ -90,7 +91,7 @@ class Netty4HttpClient implements Closeable {
|
|||
for (int i = 0; i < uris.length; i++) {
|
||||
final HttpRequest httpRequest = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]);
|
||||
httpRequest.headers().add(HOST, "localhost");
|
||||
httpRequest.headers().add("X-Opaque-ID", String.valueOf(i));
|
||||
httpRequest.headers().add(Task.X_OPAQUE_ID, String.valueOf(i));
|
||||
requests.add(httpRequest);
|
||||
}
|
||||
return sendRequests(remoteAddress, requests);
|
||||
|
|
|
@ -316,6 +316,7 @@ import org.elasticsearch.rest.action.search.RestExplainAction;
|
|||
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
|
||||
import org.elasticsearch.rest.action.search.RestSearchAction;
|
||||
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.usage.UsageService;
|
||||
|
||||
|
@ -369,7 +370,7 @@ public class ActionModule extends AbstractModule {
|
|||
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
|
||||
Set<String> headers = Stream.concat(
|
||||
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
|
||||
Stream.of("X-Opaque-Id")
|
||||
Stream.of(Task.X_OPAQUE_ID)
|
||||
).collect(Collectors.toSet());
|
||||
UnaryOperator<RestHandler> restWrapper = null;
|
||||
for (ActionPlugin plugin : actionPlugins) {
|
||||
|
|
|
@ -38,6 +38,8 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.tasks.Task.X_OPAQUE_ID;
|
||||
|
||||
/**
|
||||
* The default rest channel for incoming requests. This class implements the basic logic for sending a rest
|
||||
* response. It will set necessary headers nad ensure that bytes are released after the response is sent.
|
||||
|
@ -50,7 +52,6 @@ public class DefaultRestChannel extends AbstractRestChannel implements RestChann
|
|||
static final String CONTENT_TYPE = "content-type";
|
||||
static final String CONTENT_LENGTH = "content-length";
|
||||
static final String SET_COOKIE = "set-cookie";
|
||||
static final String X_OPAQUE_ID = "X-Opaque-Id";
|
||||
|
||||
private final HttpRequest httpRequest;
|
||||
private final BigArrays bigArrays;
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.index.shard.SearchOperationListener;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -174,6 +175,11 @@ public final class SearchSlowLog implements SearchOperationListener {
|
|||
} else {
|
||||
sb.append("source[], ");
|
||||
}
|
||||
if (context.getTask().getHeader(Task.X_OPAQUE_ID) != null) {
|
||||
sb.append("id[").append(context.getTask().getHeader(Task.X_OPAQUE_ID)).append("], ");
|
||||
} else {
|
||||
sb.append("id[], ");
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -136,6 +136,7 @@ import org.elasticsearch.search.SearchService;
|
|||
import org.elasticsearch.search.fetch.FetchPhase;
|
||||
import org.elasticsearch.snapshots.SnapshotShardsService;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskResultsService;
|
||||
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -447,7 +448,7 @@ public class Node implements Closeable {
|
|||
final Transport transport = networkModule.getTransportSupplier().get();
|
||||
Set<String> taskHeaders = Stream.concat(
|
||||
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
|
||||
Stream.of("X-Opaque-Id")
|
||||
Stream.of(Task.X_OPAQUE_ID)
|
||||
).collect(Collectors.toSet());
|
||||
final TransportService transportService = newTransportService(settings, transport, threadPool,
|
||||
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
|
||||
|
|
|
@ -34,6 +34,11 @@ import java.util.Map;
|
|||
*/
|
||||
public class Task {
|
||||
|
||||
/**
|
||||
* The request header to mark tasks with specific ids
|
||||
*/
|
||||
public static final String X_OPAQUE_ID = "X-Opaque-Id";
|
||||
|
||||
private final long id;
|
||||
|
||||
private final String type;
|
||||
|
|
|
@ -357,7 +357,7 @@ public class TasksIT extends ESIntegTestCase {
|
|||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
|
||||
|
||||
Map<String, String> headers = new HashMap<>();
|
||||
headers.put("X-Opaque-Id", "my_id");
|
||||
headers.put(Task.X_OPAQUE_ID, "my_id");
|
||||
headers.put("Foo-Header", "bar");
|
||||
headers.put("Custom-Task-Header", "my_value");
|
||||
assertSearchResponse(
|
||||
|
@ -404,7 +404,7 @@ public class TasksIT extends ESIntegTestCase {
|
|||
int maxSize = Math.toIntExact(SETTING_HTTP_MAX_HEADER_SIZE.getDefault(Settings.EMPTY).getBytes() / 2 + 1);
|
||||
|
||||
Map<String, String> headers = new HashMap<>();
|
||||
headers.put("X-Opaque-Id", "my_id");
|
||||
headers.put(Task.X_OPAQUE_ID, "my_id");
|
||||
headers.put("Custom-Task-Header", randomAlphaOfLengthBetween(maxSize, maxSize + 100));
|
||||
IllegalArgumentException ex = expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
|
@ -415,7 +415,7 @@ public class TasksIT extends ESIntegTestCase {
|
|||
|
||||
private void assertTaskHeaders(TaskInfo taskInfo) {
|
||||
assertThat(taskInfo.getHeaders().keySet(), hasSize(2));
|
||||
assertEquals("my_id", taskInfo.getHeaders().get("X-Opaque-Id"));
|
||||
assertEquals("my_id", taskInfo.getHeaders().get(Task.X_OPAQUE_ID));
|
||||
assertEquals("my_value", taskInfo.getHeaders().get("Custom-Task-Header"));
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.elasticsearch.rest.RestChannel;
|
|||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.RestResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -180,7 +181,7 @@ public class DefaultRestChannelTests extends ESTestCase {
|
|||
public void testHeadersSet() {
|
||||
Settings settings = Settings.builder().build();
|
||||
final TestRequest httpRequest = new TestRequest(HttpRequest.HttpVersion.HTTP_1_1, RestRequest.Method.GET, "/");
|
||||
httpRequest.getHeaders().put(DefaultRestChannel.X_OPAQUE_ID, Collections.singletonList("abc"));
|
||||
httpRequest.getHeaders().put(Task.X_OPAQUE_ID, Collections.singletonList("abc"));
|
||||
final RestRequest request = RestRequest.request(xContentRegistry(), httpRequest, httpChannel);
|
||||
HttpHandlingSettings handlingSettings = HttpHandlingSettings.fromSettings(settings);
|
||||
|
||||
|
@ -200,7 +201,7 @@ public class DefaultRestChannelTests extends ESTestCase {
|
|||
Map<String, List<String>> headers = httpResponse.headers;
|
||||
assertNull(headers.get("non-existent-header"));
|
||||
assertEquals(customHeaderValue, headers.get(customHeader).get(0));
|
||||
assertEquals("abc", headers.get(DefaultRestChannel.X_OPAQUE_ID).get(0));
|
||||
assertEquals("abc", headers.get(Task.X_OPAQUE_ID).get(0));
|
||||
assertEquals(Integer.toString(resp.content().length()), headers.get(DefaultRestChannel.CONTENT_LENGTH).get(0));
|
||||
assertEquals(resp.contentType(), headers.get(DefaultRestChannel.CONTENT_TYPE).get(0));
|
||||
}
|
||||
|
@ -208,7 +209,7 @@ public class DefaultRestChannelTests extends ESTestCase {
|
|||
public void testCookiesSet() {
|
||||
Settings settings = Settings.builder().put(HttpTransportSettings.SETTING_HTTP_RESET_COOKIES.getKey(), true).build();
|
||||
final TestRequest httpRequest = new TestRequest(HttpRequest.HttpVersion.HTTP_1_1, RestRequest.Method.GET, "/");
|
||||
httpRequest.getHeaders().put(DefaultRestChannel.X_OPAQUE_ID, Collections.singletonList("abc"));
|
||||
httpRequest.getHeaders().put(Task.X_OPAQUE_ID, Collections.singletonList("abc"));
|
||||
final RestRequest request = RestRequest.request(xContentRegistry(), httpRequest, httpChannel);
|
||||
HttpHandlingSettings handlingSettings = HttpHandlingSettings.fromSettings(settings);
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.index;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.search.SearchTask;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -34,12 +35,15 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|||
import org.elasticsearch.search.internal.AliasFilter;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.search.internal.ShardSearchRequest;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.test.TestSearchContext;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.hamcrest.Matchers.endsWith;
|
||||
import static org.hamcrest.Matchers.hasToString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
@ -166,10 +170,12 @@ public class SearchSlowLogTests extends ESSingleNodeTestCase {
|
|||
SearchContext searchContext = createSearchContext(index);
|
||||
SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery());
|
||||
searchContext.request().source(source);
|
||||
searchContext.setTask(new SearchTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id")));
|
||||
SearchSlowLog.SlowLogSearchContextPrinter p = new SearchSlowLog.SlowLogSearchContextPrinter(searchContext, 10);
|
||||
assertThat(p.toString(), startsWith("[foo][0]"));
|
||||
// Makes sure that output doesn't contain any new lines
|
||||
assertThat(p.toString(), not(containsString("\n")));
|
||||
assertThat(p.toString(), endsWith("id[my_id], "));
|
||||
}
|
||||
|
||||
public void testLevelSetting() {
|
||||
|
|
Loading…
Reference in New Issue