From c610e0893db3e713bb9eb7d5d1335b9053681638 Mon Sep 17 00:00:00 2001
From: Jay Modi
Date: Tue, 3 Mar 2020 14:11:37 -0700
Subject: [PATCH] Introduce system index APIs for Kibana (#53035)
This commit introduces a module for Kibana that exposes REST APIs that
will be used by Kibana for access to its system indices. These APIs are wrapped
versions of the existing REST endpoints. A new setting is also introduced since
the Kibana system indices' names are allowed to be changed by a user in case
multiple instances of Kibana use the same instance of Elasticsearch.
Additionally, the ThreadContext has been extended to indicate that the use of
system indices may be allowed in a request. This will be built upon in the future
for the protection of system indices.
Backport of #52385
---
modules/kibana/build.gradle | 31 +++
.../elasticsearch/kibana/KibanaPlugin.java | 148 +++++++++++
.../kibana/KibanaPluginTests.java | 47 ++++
.../kibana/KibanaSystemIndexIT.java | 249 ++++++++++++++++++
.../tasksplugin/TasksPlugin.java | 3 +-
.../tasksplugin/TasksPluginTests.java | 3 +-
.../elasticsearch/action/ActionModule.java | 12 +-
.../client/transport/TransportClient.java | 2 +-
.../PublicationTransportHandler.java | 6 +-
.../common/compress/DeflateCompressor.java | 9 +-
.../org/elasticsearch/common/io/Streams.java | 11 +
.../common/io/stream/StreamInput.java | 17 ++
.../common/io/stream/StreamOutput.java | 16 ++
.../common/settings/ClusterSettings.java | 22 +-
.../common/util/concurrent/ThreadContext.java | 88 ++++++-
.../java/org/elasticsearch/node/Node.java | 4 +-
.../plugins/SystemIndexPlugin.java | 4 +-
.../elasticsearch/rest/BaseRestHandler.java | 53 ++++
.../elasticsearch/rest/RestController.java | 17 +-
.../action/RestCancellableNodeClient.java | 50 ++--
.../rest/action/document/RestIndexAction.java | 11 +-
.../CompressibleBytesOutputStream.java | 11 +
.../transport/InboundMessage.java | 36 +--
.../transport/OutboundMessage.java | 4 +-
.../transport/TransportLogger.java | 16 +-
.../action/ActionModuleTests.java | 6 +-
.../common/compress/DeflateCompressTests.java | 7 +
.../util/concurrent/ThreadContextTests.java | 133 +++++++++-
.../rest/RestControllerTests.java | 33 ++-
.../rest/RestHttpResponseHeadersTests.java | 2 +-
.../indices/RestValidateQueryActionTests.java | 2 +-
.../action/cat/RestIndicesActionTests.java | 3 +-
.../action/cat/RestRecoveryActionTests.java | 2 +-
.../action/document/RestIndexActionTests.java | 7 +-
.../CompressibleBytesOutputStreamTests.java | 13 +
.../test/rest/RestActionTestCase.java | 2 +-
.../xpack/enrich/EnrichPlugin.java | 2 +-
.../xpack/logstash/Logstash.java | 2 +-
.../xpack/ml/MachineLearning.java | 2 +-
.../xpack/security/Security.java | 2 +-
.../authc/AuthenticationServiceTests.java | 2 +-
.../xpack/transform/Transform.java | 2 +-
.../elasticsearch/xpack/watcher/Watcher.java | 2 +-
43 files changed, 981 insertions(+), 113 deletions(-)
create mode 100644 modules/kibana/build.gradle
create mode 100644 modules/kibana/src/main/java/org/elasticsearch/kibana/KibanaPlugin.java
create mode 100644 modules/kibana/src/test/java/org/elasticsearch/kibana/KibanaPluginTests.java
create mode 100644 modules/kibana/src/test/java/org/elasticsearch/kibana/KibanaSystemIndexIT.java
diff --git a/modules/kibana/build.gradle b/modules/kibana/build.gradle
new file mode 100644
index 00000000000..f9d11e5a6c5
--- /dev/null
+++ b/modules/kibana/build.gradle
@@ -0,0 +1,31 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+esplugin {
+ description 'Plugin exposing APIs for Kibana system indices'
+ classname 'org.elasticsearch.kibana.KibanaPlugin'
+}
+
+dependencies {
+ compile project(path: ':modules:reindex', configuration: 'runtime')
+}
+
+testClusters.integTest {
+ module file(project(':modules:reindex').tasks.bundlePlugin.archiveFile)
+}
diff --git a/modules/kibana/src/main/java/org/elasticsearch/kibana/KibanaPlugin.java b/modules/kibana/src/main/java/org/elasticsearch/kibana/KibanaPlugin.java
new file mode 100644
index 00000000000..c8760f095fc
--- /dev/null
+++ b/modules/kibana/src/main/java/org/elasticsearch/kibana/KibanaPlugin.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.kibana;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.IndexScopedSettings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Setting.Property;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.settings.SettingsFilter;
+import org.elasticsearch.index.reindex.RestDeleteByQueryAction;
+import org.elasticsearch.indices.SystemIndexDescriptor;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.SystemIndexPlugin;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.rest.RestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.admin.indices.RestCreateIndexAction;
+import org.elasticsearch.rest.action.admin.indices.RestGetAliasesAction;
+import org.elasticsearch.rest.action.admin.indices.RestGetIndicesAction;
+import org.elasticsearch.rest.action.admin.indices.RestIndexPutAliasAction;
+import org.elasticsearch.rest.action.admin.indices.RestRefreshAction;
+import org.elasticsearch.rest.action.admin.indices.RestUpdateSettingsAction;
+import org.elasticsearch.rest.action.document.RestBulkAction;
+import org.elasticsearch.rest.action.document.RestDeleteAction;
+import org.elasticsearch.rest.action.document.RestGetAction;
+import org.elasticsearch.rest.action.document.RestIndexAction;
+import org.elasticsearch.rest.action.document.RestIndexAction.AutoIdHandler;
+import org.elasticsearch.rest.action.document.RestIndexAction.CreateHandler;
+import org.elasticsearch.rest.action.document.RestMultiGetAction;
+import org.elasticsearch.rest.action.document.RestUpdateAction;
+import org.elasticsearch.rest.action.search.RestClearScrollAction;
+import org.elasticsearch.rest.action.search.RestSearchAction;
+import org.elasticsearch.rest.action.search.RestSearchScrollAction;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class KibanaPlugin extends Plugin implements SystemIndexPlugin {
+
+ public static final Setting> KIBANA_INDEX_NAMES_SETTING = Setting.listSetting("kibana.system_indices",
+ Collections.unmodifiableList(Arrays.asList(".kibana*", ".reporting")), Function.identity(), Property.NodeScope);
+
+ @Override
+ public Collection getSystemIndexDescriptors(Settings settings) {
+ return Collections.unmodifiableList(KIBANA_INDEX_NAMES_SETTING.get(settings).stream()
+ .map(pattern -> new SystemIndexDescriptor(pattern, "System index used by kibana"))
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
+ IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
+ IndexNameExpressionResolver indexNameExpressionResolver,
+ Supplier nodesInCluster) {
+ // TODO need to figure out what subset of system indices Kibana should have access to via these APIs
+ final List allowedIndexPatterns = Collections.emptyList();
+ return Collections.unmodifiableList(Arrays.asList(
+ // Based on https://github.com/elastic/kibana/issues/49764
+ // apis needed to perform migrations... ideally these will go away
+ new KibanaWrappedRestHandler(new RestCreateIndexAction(), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new RestGetAliasesAction(), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new RestIndexPutAliasAction(), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new RestRefreshAction(), allowedIndexPatterns),
+
+ // apis needed to access saved objects
+ new KibanaWrappedRestHandler(new RestGetAction(), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new RestMultiGetAction(settings), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new RestSearchAction(), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new RestBulkAction(settings), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new RestDeleteAction(), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new RestDeleteByQueryAction(), allowedIndexPatterns),
+
+ // api used for testing
+ new KibanaWrappedRestHandler(new RestUpdateSettingsAction(), allowedIndexPatterns),
+
+ // apis used specifically by reporting
+ new KibanaWrappedRestHandler(new RestGetIndicesAction(), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new RestIndexAction(), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new CreateHandler(), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new AutoIdHandler(nodesInCluster), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new RestUpdateAction(), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new RestSearchScrollAction(), allowedIndexPatterns),
+ new KibanaWrappedRestHandler(new RestClearScrollAction(), allowedIndexPatterns)
+ ));
+
+ }
+
+ @Override
+ public List> getSettings() {
+ return Collections.singletonList(KIBANA_INDEX_NAMES_SETTING);
+ }
+
+ static class KibanaWrappedRestHandler extends BaseRestHandler.Wrapper {
+
+ private final List allowedIndexPatterns;
+
+ KibanaWrappedRestHandler(BaseRestHandler delegate, List allowedIndexPatterns) {
+ super(delegate);
+ this.allowedIndexPatterns = allowedIndexPatterns;
+ }
+
+ @Override
+ public String getName() {
+ return "kibana_" + super.getName();
+ }
+
+ @Override
+ public List routes() {
+ return Collections.unmodifiableList(super.routes().stream()
+ .map(route -> new Route(route.getMethod(), "/_kibana" + route.getPath()))
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
+ client.threadPool().getThreadContext().allowSystemIndexAccess(allowedIndexPatterns);
+ return super.prepareRequest(request, client);
+ }
+ }
+}
diff --git a/modules/kibana/src/test/java/org/elasticsearch/kibana/KibanaPluginTests.java b/modules/kibana/src/test/java/org/elasticsearch/kibana/KibanaPluginTests.java
new file mode 100644
index 00000000000..1ea24d2ff16
--- /dev/null
+++ b/modules/kibana/src/test/java/org/elasticsearch/kibana/KibanaPluginTests.java
@@ -0,0 +1,47 @@
+
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.kibana;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.indices.SystemIndexDescriptor;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+
+public class KibanaPluginTests extends ESTestCase {
+
+ public void testKibanaIndexNames() {
+ assertThat(new KibanaPlugin().getSettings(), contains(KibanaPlugin.KIBANA_INDEX_NAMES_SETTING));
+ assertThat(new KibanaPlugin().getSystemIndexDescriptors(Settings.EMPTY).stream()
+ .map(SystemIndexDescriptor::getIndexPattern).collect(Collectors.toList()),
+ contains(".kibana*", ".reporting"));
+ final List names = Arrays.asList("." + randomAlphaOfLength(4), "." + randomAlphaOfLength(6));
+ final List namesFromDescriptors = new KibanaPlugin().getSystemIndexDescriptors(
+ Settings.builder().putList(KibanaPlugin.KIBANA_INDEX_NAMES_SETTING.getKey(), names).build()
+ ).stream().map(SystemIndexDescriptor::getIndexPattern).collect(Collectors.toList());
+ assertThat(namesFromDescriptors, is(names));
+ }
+}
diff --git a/modules/kibana/src/test/java/org/elasticsearch/kibana/KibanaSystemIndexIT.java b/modules/kibana/src/test/java/org/elasticsearch/kibana/KibanaSystemIndexIT.java
new file mode 100644
index 00000000000..f3901112e83
--- /dev/null
+++ b/modules/kibana/src/test/java/org/elasticsearch/kibana/KibanaSystemIndexIT.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.kibana;
+
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.test.rest.ESRestTestCase;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+
+public class KibanaSystemIndexIT extends ESRestTestCase {
+
+ public void testCreateIndex() throws IOException {
+ Request request = new Request("PUT", "/_kibana/.kibana-1");
+ Response response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+ }
+
+ public void testAliases() throws IOException {
+ Request request = new Request("PUT", "/_kibana/.kibana-1");
+ Response response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+
+ request = new Request("PUT", "/_kibana/.kibana-1/_alias/.kibana");
+ response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+
+ request = new Request("GET", "/_kibana/_aliases");
+ response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+ assertThat(EntityUtils.toString(response.getEntity()), containsString(".kibana"));
+ }
+
+ public void testBulkToKibanaIndex() throws IOException {
+ Request request = new Request("POST", "/_kibana/_bulk");
+ request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n");
+ Response response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+ }
+
+ public void testRefresh() throws IOException {
+ Request request = new Request("POST", "/_kibana/_bulk");
+ request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n");
+ Response response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+
+ request = new Request("GET", "/_kibana/.kibana/_refresh");
+ response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+
+ Request getRequest = new Request("GET", "/_kibana/.kibana/_doc/1");
+ Response getResponse = client().performRequest(getRequest);
+ assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
+ String responseBody = EntityUtils.toString(getResponse.getEntity());
+ assertThat(responseBody, containsString("foo"));
+ assertThat(responseBody, containsString("bar"));
+ }
+
+ public void testGetFromKibanaIndex() throws IOException {
+ Request request = new Request("POST", "/_kibana/_bulk");
+ request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n");
+ request.addParameter("refresh", "true");
+
+ Response response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+
+ Request getRequest = new Request("GET", "/_kibana/.kibana/_doc/1");
+ Response getResponse = client().performRequest(getRequest);
+ assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
+ String responseBody = EntityUtils.toString(getResponse.getEntity());
+ assertThat(responseBody, containsString("foo"));
+ assertThat(responseBody, containsString("bar"));
+ }
+
+ public void testMultiGetFromKibanaIndex() throws IOException {
+ Request request = new Request("POST", "/_kibana/_bulk");
+ request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
+ "{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n");
+ request.addParameter("refresh", "true");
+
+ Response response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+
+ Request getRequest = new Request("GET", "/_kibana/_mget");
+ getRequest.setJsonEntity("{ \"docs\" : [ { \"_index\" : \".kibana\", \"_id\" : \"1\" }, " +
+ "{ \"_index\" : \".kibana\", \"_id\" : \"2\" } ] }\n");
+ Response getResponse = client().performRequest(getRequest);
+ assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
+ String responseBody = EntityUtils.toString(getResponse.getEntity());
+ assertThat(responseBody, containsString("foo"));
+ assertThat(responseBody, containsString("bar"));
+ assertThat(responseBody, containsString("baz"));
+ assertThat(responseBody, containsString("tag"));
+ }
+
+ public void testSearchFromKibanaIndex() throws IOException {
+ Request request = new Request("POST", "/_kibana/_bulk");
+ request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
+ "{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n");
+ request.addParameter("refresh", "true");
+
+ Response response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+
+ Request searchRequest = new Request("GET", "/_kibana/.kibana/_search");
+ searchRequest.setJsonEntity("{ \"query\" : { \"match_all\" : {} } }\n");
+ Response getResponse = client().performRequest(searchRequest);
+ assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
+ String responseBody = EntityUtils.toString(getResponse.getEntity());
+ assertThat(responseBody, containsString("foo"));
+ assertThat(responseBody, containsString("bar"));
+ assertThat(responseBody, containsString("baz"));
+ assertThat(responseBody, containsString("tag"));
+ }
+
+ public void testDeleteFromKibanaIndex() throws IOException {
+ Request request = new Request("POST", "/_kibana/_bulk");
+ request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
+ "{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n");
+ request.addParameter("refresh", "true");
+
+ Response response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+
+ Request deleteRequest = new Request("DELETE", "/_kibana/.kibana/_doc/1");
+ Response deleteResponse = client().performRequest(deleteRequest);
+ assertThat(deleteResponse.getStatusLine().getStatusCode(), is(200));
+ }
+
+ public void testDeleteByQueryFromKibanaIndex() throws IOException {
+ Request request = new Request("POST", "/_kibana/_bulk");
+ request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
+ "{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n");
+ request.addParameter("refresh", "true");
+
+ Response response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+
+ Request dbqRequest = new Request("POST", "/_kibana/.kibana/_delete_by_query");
+ dbqRequest.setJsonEntity("{ \"query\" : { \"match_all\" : {} } }\n");
+ Response dbqResponse = client().performRequest(dbqRequest);
+ assertThat(dbqResponse.getStatusLine().getStatusCode(), is(200));
+ }
+
+ public void testUpdateIndexSettings() throws IOException {
+ Request request = new Request("PUT", "/_kibana/.kibana-1");
+ Response response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+
+ request = new Request("PUT", "/_kibana/.kibana-1/_settings");
+ request.setJsonEntity("{ \"index.blocks.read_only\" : false }");
+ response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+ }
+
+ public void testGetIndex() throws IOException {
+ Request request = new Request("PUT", "/_kibana/.kibana-1");
+ Response response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+
+ request = new Request("GET", "/_kibana/.kibana-1");
+ response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+ assertThat(EntityUtils.toString(response.getEntity()), containsString(".kibana-1"));
+ }
+
+ public void testIndexingAndUpdatingDocs() throws IOException {
+ Request request = new Request("PUT", "/_kibana/.kibana-1/_doc/1");
+ request.setJsonEntity("{ \"foo\" : \"bar\" }");
+ Response response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(201));
+
+ request = new Request("PUT", "/_kibana/.kibana-1/_create/2");
+ request.setJsonEntity("{ \"foo\" : \"bar\" }");
+ response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(201));
+
+ request = new Request("POST", "/_kibana/.kibana-1/_doc");
+ request.setJsonEntity("{ \"foo\" : \"bar\" }");
+ response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(201));
+
+ request = new Request("GET", "/_kibana/.kibana-1/_refresh");
+ response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+
+ request = new Request("POST", "/_kibana/.kibana-1/_update/1");
+ request.setJsonEntity("{ \"doc\" : { \"foo\" : \"baz\" } }");
+ response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+ }
+
+ public void testScrollingDocs() throws IOException {
+ Request request = new Request("POST", "/_kibana/_bulk");
+ request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
+ "{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n" +
+ "{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"3\" } }\n{ \"baz\" : \"tag\" }\n");
+ request.addParameter("refresh", "true");
+ Response response = client().performRequest(request);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+
+ Request searchRequest = new Request("GET", "/_kibana/.kibana/_search");
+ searchRequest.setJsonEntity("{ \"size\" : 1,\n\"query\" : { \"match_all\" : {} } }\n");
+ searchRequest.addParameter("scroll", "1m");
+ response = client().performRequest(searchRequest);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+ Map map = XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
+ assertNotNull(map.get("_scroll_id"));
+ String scrollId = (String) map.get("_scroll_id");
+
+ Request scrollRequest = new Request("POST", "/_kibana/_search/scroll");
+ scrollRequest.addParameter("scroll_id", scrollId);
+ scrollRequest.addParameter("scroll", "1m");
+ response = client().performRequest(scrollRequest);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+ map = XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
+ assertNotNull(map.get("_scroll_id"));
+ scrollId = (String) map.get("_scroll_id");
+
+ Request clearScrollRequest = new Request("DELETE", "/_kibana/_search/scroll");
+ clearScrollRequest.addParameter("scroll_id", scrollId);
+ response = client().performRequest(clearScrollRequest);
+ assertThat(response.getStatusLine().getStatusCode(), is(200));
+ }
+}
diff --git a/modules/tasks/src/main/java/org/elasticsearch/tasksplugin/TasksPlugin.java b/modules/tasks/src/main/java/org/elasticsearch/tasksplugin/TasksPlugin.java
index b7d63991877..0467b9419c7 100644
--- a/modules/tasks/src/main/java/org/elasticsearch/tasksplugin/TasksPlugin.java
+++ b/modules/tasks/src/main/java/org/elasticsearch/tasksplugin/TasksPlugin.java
@@ -19,6 +19,7 @@
package org.elasticsearch.tasksplugin;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
@@ -34,7 +35,7 @@ import static org.elasticsearch.tasks.TaskResultsService.TASK_INDEX;
public class TasksPlugin extends Plugin implements SystemIndexPlugin {
@Override
- public Collection getSystemIndexDescriptors() {
+ public Collection getSystemIndexDescriptors(Settings settings) {
return Collections.singletonList(new SystemIndexDescriptor(TASK_INDEX, this.getClass().getSimpleName()));
}
}
diff --git a/modules/tasks/src/test/java/org/elasticsearch/tasksplugin/TasksPluginTests.java b/modules/tasks/src/test/java/org/elasticsearch/tasksplugin/TasksPluginTests.java
index 48ec1e06098..23b873e377e 100644
--- a/modules/tasks/src/test/java/org/elasticsearch/tasksplugin/TasksPluginTests.java
+++ b/modules/tasks/src/test/java/org/elasticsearch/tasksplugin/TasksPluginTests.java
@@ -19,6 +19,7 @@
package org.elasticsearch.tasksplugin;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
@@ -27,6 +28,6 @@ public class TasksPluginTests extends ESTestCase {
public void testDummy() {
// This is a dummy test case to satisfy the conventions
TasksPlugin plugin = new TasksPlugin();
- assertThat(plugin.getSystemIndexDescriptors(), Matchers.hasSize(1));
+ assertThat(plugin.getSystemIndexDescriptors(Settings.EMPTY), Matchers.hasSize(1));
}
}
diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java
index 4ec1481811f..f0c223f75bd 100644
--- a/server/src/main/java/org/elasticsearch/action/ActionModule.java
+++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java
@@ -210,7 +210,6 @@ import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.NamedRegistry;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.TypeLiteral;
@@ -377,12 +376,11 @@ public class ActionModule extends AbstractModule {
private final RestController restController;
private final RequestValidators mappingRequestValidators;
private final RequestValidators indicesAliasesRequestRequestValidators;
- private final ClusterService clusterService;
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
ThreadPool threadPool, List actionPlugins, NodeClient nodeClient,
- CircuitBreakerService circuitBreakerService, UsageService usageService, ClusterService clusterService) {
+ CircuitBreakerService circuitBreakerService, UsageService usageService) {
this.transportClient = transportClient;
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;
@@ -390,7 +388,6 @@ public class ActionModule extends AbstractModule {
this.clusterSettings = clusterSettings;
this.settingsFilter = settingsFilter;
this.actionPlugins = actionPlugins;
- this.clusterService = clusterService;
actions = setupActions(actionPlugins);
actionFilters = setupActionFilters(actionPlugins);
autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
@@ -418,11 +415,12 @@ public class ActionModule extends AbstractModule {
if (transportClient) {
restController = null;
} else {
- restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
+ final boolean restrictSystemIndices = RestController.RESTRICT_SYSTEM_INDICES.get(settings);
+ restController =
+ new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService, restrictSystemIndices);
}
}
-
public Map> getActions() {
return actions;
}
@@ -641,7 +639,7 @@ public class ActionModule extends AbstractModule {
registerHandler.accept(new RestIndexAction());
registerHandler.accept(new CreateHandler());
- registerHandler.accept(new AutoIdHandler(clusterService));
+ registerHandler.accept(new AutoIdHandler(nodesInCluster));
registerHandler.accept(new RestGetAction());
registerHandler.accept(new RestGetSourceAction());
registerHandler.accept(new RestMultiGetAction(settings));
diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java
index 8bb936aacec..8e92754b392 100644
--- a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java
+++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java
@@ -187,7 +187,7 @@ public abstract class TransportClient extends AbstractClient {
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
ActionModule actionModule = new ActionModule(true, settings, null, settingsModule.getIndexScopedSettings(),
settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool,
- pluginsService.filterPlugins(ActionPlugin.class), null, null, null, null);
+ pluginsService.filterPlugins(ActionPlugin.class), null, null, null);
modules.add(actionModule);
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java
index 504d6be126d..82bdb495722 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java
@@ -364,8 +364,8 @@ public class PublicationTransportHandler {
public static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
final BytesStreamOutput bStream = new BytesStreamOutput();
+ bStream.setVersion(nodeVersion);
try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
- stream.setVersion(nodeVersion);
stream.writeBoolean(true);
clusterState.writeTo(stream);
}
@@ -374,8 +374,8 @@ public class PublicationTransportHandler {
public static BytesReference serializeDiffClusterState(Diff diff, Version nodeVersion) throws IOException {
final BytesStreamOutput bStream = new BytesStreamOutput();
+ bStream.setVersion(nodeVersion);
try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
- stream.setVersion(nodeVersion);
stream.writeBoolean(false);
diff.writeTo(stream);
}
@@ -385,12 +385,12 @@ public class PublicationTransportHandler {
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
final Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in = request.bytes().streamInput();
+ in.setVersion(request.version());
try {
if (compressor != null) {
in = compressor.streamInput(in);
}
in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
- in.setVersion(request.version());
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
final ClusterState incomingState;
diff --git a/server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java b/server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java
index 794a8db4960..646e6c61382 100644
--- a/server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java
+++ b/server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java
@@ -85,7 +85,7 @@ public class DeflateCompressor implements Compressor {
final Inflater inflater = new Inflater(nowrap);
InputStream decompressedIn = new InflaterInputStream(in, inflater, BUFFER_SIZE);
decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE);
- return new InputStreamStreamInput(decompressedIn) {
+ final InputStreamStreamInput inputStreamStreamInput = new InputStreamStreamInput(decompressedIn) {
final AtomicBoolean closed = new AtomicBoolean(false);
public void close() throws IOException {
@@ -99,6 +99,9 @@ public class DeflateCompressor implements Compressor {
}
}
};
+
+ inputStreamStreamInput.setVersion(in.getVersion());
+ return inputStreamStreamInput;
}
@Override
@@ -109,7 +112,7 @@ public class DeflateCompressor implements Compressor {
final boolean syncFlush = true;
DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE);
- return new OutputStreamStreamOutput(compressedOut) {
+ final OutputStreamStreamOutput outputStreamStreamOutput = new OutputStreamStreamOutput(compressedOut) {
final AtomicBoolean closed = new AtomicBoolean(false);
public void close() throws IOException {
@@ -123,5 +126,7 @@ public class DeflateCompressor implements Compressor {
}
}
};
+ outputStreamStreamOutput.setVersion(out.getVersion());
+ return outputStreamStreamOutput;
}
}
diff --git a/server/src/main/java/org/elasticsearch/common/io/Streams.java b/server/src/main/java/org/elasticsearch/common/io/Streams.java
index 222f94e65ef..3747c4d895a 100644
--- a/server/src/main/java/org/elasticsearch/common/io/Streams.java
+++ b/server/src/main/java/org/elasticsearch/common/io/Streams.java
@@ -19,6 +19,7 @@
package org.elasticsearch.common.io;
+import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -296,5 +297,15 @@ public abstract class Streams {
public BytesReference bytes() {
return delegate.bytes();
}
+
+ @Override
+ public Version getVersion() {
+ return delegate.getVersion();
+ }
+
+ @Override
+ public void setVersion(Version version) {
+ delegate.setVersion(version);
+ }
}
}
diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
index a8b3f6df35d..99c3d556805 100644
--- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
+++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
@@ -1145,6 +1145,23 @@ public abstract class StreamInput extends InputStream {
return readList(StreamInput::readString);
}
+ /**
+ * Reads an optional list of strings. The list is expected to have been written using
+ * {@link StreamOutput#writeOptionalStringCollection(Collection)}. If the returned list contains any entries it will be mutable.
+ * If it is empty it might be immutable.
+ *
+ * @return the list of strings
+ * @throws IOException if an I/O exception occurs reading the list
+ */
+ public List readOptionalStringList() throws IOException {
+ final boolean isPresent = readBoolean();
+ if (isPresent) {
+ return readList(StreamInput::readString);
+ } else {
+ return null;
+ }
+ }
+
/**
* Reads a set of objects. If the returned set contains any entries it will be mutable. If it is empty it might be immutable.
*/
diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
index 02cdaecb51a..dbdfe43877b 100644
--- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
+++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
@@ -1125,6 +1125,22 @@ public abstract class StreamOutput extends OutputStream {
writeCollection(collection, StreamOutput::writeString);
}
+ /**
+ * Writes an optional collection of a strings. The corresponding collection can be read from a stream input using
+ * {@link StreamInput#readList(Writeable.Reader)}.
+ *
+ * @param collection the collection of strings
+ * @throws IOException if an I/O exception occurs writing the collection
+ */
+ public void writeOptionalStringCollection(final Collection collection) throws IOException {
+ if (collection != null) {
+ writeBoolean(true);
+ writeCollection(collection, StreamOutput::writeString);
+ } else {
+ writeBoolean(false);
+ }
+ }
+
/**
* Writes a list of {@link NamedWriteable} objects.
*/
diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
index 585d009e246..2d91e5d5546 100644
--- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -19,6 +19,7 @@
package org.elasticsearch.common.settings;
import org.apache.logging.log4j.LogManager;
+import org.elasticsearch.Build;
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.search.TransportSearchAction;
@@ -104,6 +105,7 @@ import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestController;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchService;
@@ -189,7 +191,10 @@ public final class ClusterSettings extends AbstractScopedSettings {
}
}
- public static Set> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ public static final Set> BUILT_IN_CLUSTER_SETTINGS;
+
+ static {
+ final Set> settings = new HashSet<>(Arrays.asList(
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL,
TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT,
@@ -536,11 +541,16 @@ public final class ClusterSettings extends AbstractScopedSettings {
HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING,
HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING,
DiscoveryUpgradeService.BWC_PING_TIMEOUT_SETTING,
- DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING)));
+ DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING));
+
+ if (Build.CURRENT.isSnapshot()) {
+ settings.add(RestController.RESTRICT_SYSTEM_INDICES);
+ }
+ BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(settings);
+ }
public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
- SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
- SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY_UPGRADER,
- RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER));
-
+ SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
+ SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY_UPGRADER,
+ RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER));
}
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
index 5912cf792a9..1246d4ed2c0 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
@@ -20,9 +20,11 @@ package org.elasticsearch.common.util.concurrent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.elasticsearch.Version;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.collect.MapBuilder;
+import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -50,6 +52,7 @@ import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;
+import static java.util.Collections.emptyList;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE;
@@ -64,7 +67,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARN
* Consumers of ThreadContext usually don't need to interact with adding or stashing contexts. Every elasticsearch thread is managed by
* a thread pool or executor being responsible for stashing and restoring the threads context. For instance if a network request is
* received, all headers are deserialized from the network and directly added as the headers of the threads {@link ThreadContext}
- * (see {@link #readHeaders(StreamInput)}. In order to not modify the context that is currently active on this thread the network code
+ * (see {@link #readFrom(StreamInput)}. In order to not modify the context that is currently active on this thread the network code
* uses a try/with pattern to stash it's current context, read headers into a fresh one and once the request is handled or a handler thread
* is forked (which in turn inherits the context) it restores the previous context. For instance:
*
@@ -234,17 +237,18 @@ public final class ThreadContext implements Writeable {
}
/**
- * Reads the headers from the stream into the current context
+ * Reads the values from the stream into the current context
*/
- public void readHeaders(StreamInput in) throws IOException {
+ public void readFrom(StreamInput in) throws IOException {
final Tuple