Introduce system index APIs for Kibana (#53035)
This commit introduces a module for Kibana that exposes REST APIs that will be used by Kibana for access to its system indices. These APIs are wrapped versions of the existing REST endpoints. A new setting is also introduced since the Kibana system indices' names are allowed to be changed by a user in case multiple instances of Kibana use the same instance of Elasticsearch. Additionally, the ThreadContext has been extended to indicate that the use of system indices may be allowed in a request. This will be built upon in the future for the protection of system indices. Backport of #52385
This commit is contained in:
parent
7339427af5
commit
c610e0893d
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
esplugin {
|
||||
description 'Plugin exposing APIs for Kibana system indices'
|
||||
classname 'org.elasticsearch.kibana.KibanaPlugin'
|
||||
}
|
||||
|
||||
dependencies {
|
||||
compile project(path: ':modules:reindex', configuration: 'runtime')
|
||||
}
|
||||
|
||||
testClusters.integTest {
|
||||
module file(project(':modules:reindex').tasks.bundlePlugin.archiveFile)
|
||||
}
|
|
@ -0,0 +1,148 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.kibana;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
import org.elasticsearch.index.reindex.RestDeleteByQueryAction;
|
||||
import org.elasticsearch.indices.SystemIndexDescriptor;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.plugins.SystemIndexPlugin;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestHandler;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.admin.indices.RestCreateIndexAction;
|
||||
import org.elasticsearch.rest.action.admin.indices.RestGetAliasesAction;
|
||||
import org.elasticsearch.rest.action.admin.indices.RestGetIndicesAction;
|
||||
import org.elasticsearch.rest.action.admin.indices.RestIndexPutAliasAction;
|
||||
import org.elasticsearch.rest.action.admin.indices.RestRefreshAction;
|
||||
import org.elasticsearch.rest.action.admin.indices.RestUpdateSettingsAction;
|
||||
import org.elasticsearch.rest.action.document.RestBulkAction;
|
||||
import org.elasticsearch.rest.action.document.RestDeleteAction;
|
||||
import org.elasticsearch.rest.action.document.RestGetAction;
|
||||
import org.elasticsearch.rest.action.document.RestIndexAction;
|
||||
import org.elasticsearch.rest.action.document.RestIndexAction.AutoIdHandler;
|
||||
import org.elasticsearch.rest.action.document.RestIndexAction.CreateHandler;
|
||||
import org.elasticsearch.rest.action.document.RestMultiGetAction;
|
||||
import org.elasticsearch.rest.action.document.RestUpdateAction;
|
||||
import org.elasticsearch.rest.action.search.RestClearScrollAction;
|
||||
import org.elasticsearch.rest.action.search.RestSearchAction;
|
||||
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class KibanaPlugin extends Plugin implements SystemIndexPlugin {
|
||||
|
||||
public static final Setting<List<String>> KIBANA_INDEX_NAMES_SETTING = Setting.listSetting("kibana.system_indices",
|
||||
Collections.unmodifiableList(Arrays.asList(".kibana*", ".reporting")), Function.identity(), Property.NodeScope);
|
||||
|
||||
@Override
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
|
||||
return Collections.unmodifiableList(KIBANA_INDEX_NAMES_SETTING.get(settings).stream()
|
||||
.map(pattern -> new SystemIndexDescriptor(pattern, "System index used by kibana"))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
|
||||
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
Supplier<DiscoveryNodes> nodesInCluster) {
|
||||
// TODO need to figure out what subset of system indices Kibana should have access to via these APIs
|
||||
final List<String> allowedIndexPatterns = Collections.emptyList();
|
||||
return Collections.unmodifiableList(Arrays.asList(
|
||||
// Based on https://github.com/elastic/kibana/issues/49764
|
||||
// apis needed to perform migrations... ideally these will go away
|
||||
new KibanaWrappedRestHandler(new RestCreateIndexAction(), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new RestGetAliasesAction(), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new RestIndexPutAliasAction(), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new RestRefreshAction(), allowedIndexPatterns),
|
||||
|
||||
// apis needed to access saved objects
|
||||
new KibanaWrappedRestHandler(new RestGetAction(), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new RestMultiGetAction(settings), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new RestSearchAction(), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new RestBulkAction(settings), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new RestDeleteAction(), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new RestDeleteByQueryAction(), allowedIndexPatterns),
|
||||
|
||||
// api used for testing
|
||||
new KibanaWrappedRestHandler(new RestUpdateSettingsAction(), allowedIndexPatterns),
|
||||
|
||||
// apis used specifically by reporting
|
||||
new KibanaWrappedRestHandler(new RestGetIndicesAction(), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new RestIndexAction(), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new CreateHandler(), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new AutoIdHandler(nodesInCluster), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new RestUpdateAction(), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new RestSearchScrollAction(), allowedIndexPatterns),
|
||||
new KibanaWrappedRestHandler(new RestClearScrollAction(), allowedIndexPatterns)
|
||||
));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Collections.singletonList(KIBANA_INDEX_NAMES_SETTING);
|
||||
}
|
||||
|
||||
static class KibanaWrappedRestHandler extends BaseRestHandler.Wrapper {
|
||||
|
||||
private final List<String> allowedIndexPatterns;
|
||||
|
||||
KibanaWrappedRestHandler(BaseRestHandler delegate, List<String> allowedIndexPatterns) {
|
||||
super(delegate);
|
||||
this.allowedIndexPatterns = allowedIndexPatterns;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "kibana_" + super.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Route> routes() {
|
||||
return Collections.unmodifiableList(super.routes().stream()
|
||||
.map(route -> new Route(route.getMethod(), "/_kibana" + route.getPath()))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
|
||||
client.threadPool().getThreadContext().allowSystemIndexAccess(allowedIndexPatterns);
|
||||
return super.prepareRequest(request, client);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.kibana;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.indices.SystemIndexDescriptor;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class KibanaPluginTests extends ESTestCase {
|
||||
|
||||
public void testKibanaIndexNames() {
|
||||
assertThat(new KibanaPlugin().getSettings(), contains(KibanaPlugin.KIBANA_INDEX_NAMES_SETTING));
|
||||
assertThat(new KibanaPlugin().getSystemIndexDescriptors(Settings.EMPTY).stream()
|
||||
.map(SystemIndexDescriptor::getIndexPattern).collect(Collectors.toList()),
|
||||
contains(".kibana*", ".reporting"));
|
||||
final List<String> names = Arrays.asList("." + randomAlphaOfLength(4), "." + randomAlphaOfLength(6));
|
||||
final List<String> namesFromDescriptors = new KibanaPlugin().getSystemIndexDescriptors(
|
||||
Settings.builder().putList(KibanaPlugin.KIBANA_INDEX_NAMES_SETTING.getKey(), names).build()
|
||||
).stream().map(SystemIndexDescriptor::getIndexPattern).collect(Collectors.toList());
|
||||
assertThat(namesFromDescriptors, is(names));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,249 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.kibana;
|
||||
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class KibanaSystemIndexIT extends ESRestTestCase {
|
||||
|
||||
public void testCreateIndex() throws IOException {
|
||||
Request request = new Request("PUT", "/_kibana/.kibana-1");
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
}
|
||||
|
||||
public void testAliases() throws IOException {
|
||||
Request request = new Request("PUT", "/_kibana/.kibana-1");
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
|
||||
request = new Request("PUT", "/_kibana/.kibana-1/_alias/.kibana");
|
||||
response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
|
||||
request = new Request("GET", "/_kibana/_aliases");
|
||||
response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
assertThat(EntityUtils.toString(response.getEntity()), containsString(".kibana"));
|
||||
}
|
||||
|
||||
public void testBulkToKibanaIndex() throws IOException {
|
||||
Request request = new Request("POST", "/_kibana/_bulk");
|
||||
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n");
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
}
|
||||
|
||||
public void testRefresh() throws IOException {
|
||||
Request request = new Request("POST", "/_kibana/_bulk");
|
||||
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n");
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
|
||||
request = new Request("GET", "/_kibana/.kibana/_refresh");
|
||||
response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
|
||||
Request getRequest = new Request("GET", "/_kibana/.kibana/_doc/1");
|
||||
Response getResponse = client().performRequest(getRequest);
|
||||
assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
|
||||
String responseBody = EntityUtils.toString(getResponse.getEntity());
|
||||
assertThat(responseBody, containsString("foo"));
|
||||
assertThat(responseBody, containsString("bar"));
|
||||
}
|
||||
|
||||
public void testGetFromKibanaIndex() throws IOException {
|
||||
Request request = new Request("POST", "/_kibana/_bulk");
|
||||
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n");
|
||||
request.addParameter("refresh", "true");
|
||||
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
|
||||
Request getRequest = new Request("GET", "/_kibana/.kibana/_doc/1");
|
||||
Response getResponse = client().performRequest(getRequest);
|
||||
assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
|
||||
String responseBody = EntityUtils.toString(getResponse.getEntity());
|
||||
assertThat(responseBody, containsString("foo"));
|
||||
assertThat(responseBody, containsString("bar"));
|
||||
}
|
||||
|
||||
public void testMultiGetFromKibanaIndex() throws IOException {
|
||||
Request request = new Request("POST", "/_kibana/_bulk");
|
||||
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
|
||||
"{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n");
|
||||
request.addParameter("refresh", "true");
|
||||
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
|
||||
Request getRequest = new Request("GET", "/_kibana/_mget");
|
||||
getRequest.setJsonEntity("{ \"docs\" : [ { \"_index\" : \".kibana\", \"_id\" : \"1\" }, " +
|
||||
"{ \"_index\" : \".kibana\", \"_id\" : \"2\" } ] }\n");
|
||||
Response getResponse = client().performRequest(getRequest);
|
||||
assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
|
||||
String responseBody = EntityUtils.toString(getResponse.getEntity());
|
||||
assertThat(responseBody, containsString("foo"));
|
||||
assertThat(responseBody, containsString("bar"));
|
||||
assertThat(responseBody, containsString("baz"));
|
||||
assertThat(responseBody, containsString("tag"));
|
||||
}
|
||||
|
||||
public void testSearchFromKibanaIndex() throws IOException {
|
||||
Request request = new Request("POST", "/_kibana/_bulk");
|
||||
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
|
||||
"{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n");
|
||||
request.addParameter("refresh", "true");
|
||||
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
|
||||
Request searchRequest = new Request("GET", "/_kibana/.kibana/_search");
|
||||
searchRequest.setJsonEntity("{ \"query\" : { \"match_all\" : {} } }\n");
|
||||
Response getResponse = client().performRequest(searchRequest);
|
||||
assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
|
||||
String responseBody = EntityUtils.toString(getResponse.getEntity());
|
||||
assertThat(responseBody, containsString("foo"));
|
||||
assertThat(responseBody, containsString("bar"));
|
||||
assertThat(responseBody, containsString("baz"));
|
||||
assertThat(responseBody, containsString("tag"));
|
||||
}
|
||||
|
||||
public void testDeleteFromKibanaIndex() throws IOException {
|
||||
Request request = new Request("POST", "/_kibana/_bulk");
|
||||
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
|
||||
"{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n");
|
||||
request.addParameter("refresh", "true");
|
||||
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
|
||||
Request deleteRequest = new Request("DELETE", "/_kibana/.kibana/_doc/1");
|
||||
Response deleteResponse = client().performRequest(deleteRequest);
|
||||
assertThat(deleteResponse.getStatusLine().getStatusCode(), is(200));
|
||||
}
|
||||
|
||||
public void testDeleteByQueryFromKibanaIndex() throws IOException {
|
||||
Request request = new Request("POST", "/_kibana/_bulk");
|
||||
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
|
||||
"{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n");
|
||||
request.addParameter("refresh", "true");
|
||||
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
|
||||
Request dbqRequest = new Request("POST", "/_kibana/.kibana/_delete_by_query");
|
||||
dbqRequest.setJsonEntity("{ \"query\" : { \"match_all\" : {} } }\n");
|
||||
Response dbqResponse = client().performRequest(dbqRequest);
|
||||
assertThat(dbqResponse.getStatusLine().getStatusCode(), is(200));
|
||||
}
|
||||
|
||||
public void testUpdateIndexSettings() throws IOException {
|
||||
Request request = new Request("PUT", "/_kibana/.kibana-1");
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
|
||||
request = new Request("PUT", "/_kibana/.kibana-1/_settings");
|
||||
request.setJsonEntity("{ \"index.blocks.read_only\" : false }");
|
||||
response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
}
|
||||
|
||||
public void testGetIndex() throws IOException {
|
||||
Request request = new Request("PUT", "/_kibana/.kibana-1");
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
|
||||
request = new Request("GET", "/_kibana/.kibana-1");
|
||||
response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
assertThat(EntityUtils.toString(response.getEntity()), containsString(".kibana-1"));
|
||||
}
|
||||
|
||||
public void testIndexingAndUpdatingDocs() throws IOException {
|
||||
Request request = new Request("PUT", "/_kibana/.kibana-1/_doc/1");
|
||||
request.setJsonEntity("{ \"foo\" : \"bar\" }");
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(201));
|
||||
|
||||
request = new Request("PUT", "/_kibana/.kibana-1/_create/2");
|
||||
request.setJsonEntity("{ \"foo\" : \"bar\" }");
|
||||
response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(201));
|
||||
|
||||
request = new Request("POST", "/_kibana/.kibana-1/_doc");
|
||||
request.setJsonEntity("{ \"foo\" : \"bar\" }");
|
||||
response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(201));
|
||||
|
||||
request = new Request("GET", "/_kibana/.kibana-1/_refresh");
|
||||
response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
|
||||
request = new Request("POST", "/_kibana/.kibana-1/_update/1");
|
||||
request.setJsonEntity("{ \"doc\" : { \"foo\" : \"baz\" } }");
|
||||
response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
}
|
||||
|
||||
public void testScrollingDocs() throws IOException {
|
||||
Request request = new Request("POST", "/_kibana/_bulk");
|
||||
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
|
||||
"{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n" +
|
||||
"{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"3\" } }\n{ \"baz\" : \"tag\" }\n");
|
||||
request.addParameter("refresh", "true");
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
|
||||
Request searchRequest = new Request("GET", "/_kibana/.kibana/_search");
|
||||
searchRequest.setJsonEntity("{ \"size\" : 1,\n\"query\" : { \"match_all\" : {} } }\n");
|
||||
searchRequest.addParameter("scroll", "1m");
|
||||
response = client().performRequest(searchRequest);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
Map<String, Object> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
|
||||
assertNotNull(map.get("_scroll_id"));
|
||||
String scrollId = (String) map.get("_scroll_id");
|
||||
|
||||
Request scrollRequest = new Request("POST", "/_kibana/_search/scroll");
|
||||
scrollRequest.addParameter("scroll_id", scrollId);
|
||||
scrollRequest.addParameter("scroll", "1m");
|
||||
response = client().performRequest(scrollRequest);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
map = XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
|
||||
assertNotNull(map.get("_scroll_id"));
|
||||
scrollId = (String) map.get("_scroll_id");
|
||||
|
||||
Request clearScrollRequest = new Request("DELETE", "/_kibana/_search/scroll");
|
||||
clearScrollRequest.addParameter("scroll_id", scrollId);
|
||||
response = client().performRequest(clearScrollRequest);
|
||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.tasksplugin;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.indices.SystemIndexDescriptor;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.plugins.SystemIndexPlugin;
|
||||
|
@ -34,7 +35,7 @@ import static org.elasticsearch.tasks.TaskResultsService.TASK_INDEX;
|
|||
public class TasksPlugin extends Plugin implements SystemIndexPlugin {
|
||||
|
||||
@Override
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
|
||||
return Collections.singletonList(new SystemIndexDescriptor(TASK_INDEX, this.getClass().getSimpleName()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.tasksplugin;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
|
@ -27,6 +28,6 @@ public class TasksPluginTests extends ESTestCase {
|
|||
public void testDummy() {
|
||||
// This is a dummy test case to satisfy the conventions
|
||||
TasksPlugin plugin = new TasksPlugin();
|
||||
assertThat(plugin.getSystemIndexDescriptors(), Matchers.hasSize(1));
|
||||
assertThat(plugin.getSystemIndexDescriptors(Settings.EMPTY), Matchers.hasSize(1));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -210,7 +210,6 @@ import org.elasticsearch.action.update.UpdateAction;
|
|||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.NamedRegistry;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.TypeLiteral;
|
||||
|
@ -377,12 +376,11 @@ public class ActionModule extends AbstractModule {
|
|||
private final RestController restController;
|
||||
private final RequestValidators<PutMappingRequest> mappingRequestValidators;
|
||||
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
|
||||
private final ClusterService clusterService;
|
||||
|
||||
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
|
||||
ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
|
||||
CircuitBreakerService circuitBreakerService, UsageService usageService, ClusterService clusterService) {
|
||||
CircuitBreakerService circuitBreakerService, UsageService usageService) {
|
||||
this.transportClient = transportClient;
|
||||
this.settings = settings;
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
|
@ -390,7 +388,6 @@ public class ActionModule extends AbstractModule {
|
|||
this.clusterSettings = clusterSettings;
|
||||
this.settingsFilter = settingsFilter;
|
||||
this.actionPlugins = actionPlugins;
|
||||
this.clusterService = clusterService;
|
||||
actions = setupActions(actionPlugins);
|
||||
actionFilters = setupActionFilters(actionPlugins);
|
||||
autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
|
||||
|
@ -418,11 +415,12 @@ public class ActionModule extends AbstractModule {
|
|||
if (transportClient) {
|
||||
restController = null;
|
||||
} else {
|
||||
restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
|
||||
final boolean restrictSystemIndices = RestController.RESTRICT_SYSTEM_INDICES.get(settings);
|
||||
restController =
|
||||
new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService, restrictSystemIndices);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public Map<String, ActionHandler<?, ?>> getActions() {
|
||||
return actions;
|
||||
}
|
||||
|
@ -641,7 +639,7 @@ public class ActionModule extends AbstractModule {
|
|||
|
||||
registerHandler.accept(new RestIndexAction());
|
||||
registerHandler.accept(new CreateHandler());
|
||||
registerHandler.accept(new AutoIdHandler(clusterService));
|
||||
registerHandler.accept(new AutoIdHandler(nodesInCluster));
|
||||
registerHandler.accept(new RestGetAction());
|
||||
registerHandler.accept(new RestGetSourceAction());
|
||||
registerHandler.accept(new RestMultiGetAction(settings));
|
||||
|
|
|
@ -187,7 +187,7 @@ public abstract class TransportClient extends AbstractClient {
|
|||
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
|
||||
ActionModule actionModule = new ActionModule(true, settings, null, settingsModule.getIndexScopedSettings(),
|
||||
settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool,
|
||||
pluginsService.filterPlugins(ActionPlugin.class), null, null, null, null);
|
||||
pluginsService.filterPlugins(ActionPlugin.class), null, null, null);
|
||||
modules.add(actionModule);
|
||||
|
||||
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
|
||||
|
|
|
@ -364,8 +364,8 @@ public class PublicationTransportHandler {
|
|||
|
||||
public static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
|
||||
final BytesStreamOutput bStream = new BytesStreamOutput();
|
||||
bStream.setVersion(nodeVersion);
|
||||
try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
|
||||
stream.setVersion(nodeVersion);
|
||||
stream.writeBoolean(true);
|
||||
clusterState.writeTo(stream);
|
||||
}
|
||||
|
@ -374,8 +374,8 @@ public class PublicationTransportHandler {
|
|||
|
||||
public static BytesReference serializeDiffClusterState(Diff diff, Version nodeVersion) throws IOException {
|
||||
final BytesStreamOutput bStream = new BytesStreamOutput();
|
||||
bStream.setVersion(nodeVersion);
|
||||
try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
|
||||
stream.setVersion(nodeVersion);
|
||||
stream.writeBoolean(false);
|
||||
diff.writeTo(stream);
|
||||
}
|
||||
|
@ -385,12 +385,12 @@ public class PublicationTransportHandler {
|
|||
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
|
||||
final Compressor compressor = CompressorFactory.compressor(request.bytes());
|
||||
StreamInput in = request.bytes().streamInput();
|
||||
in.setVersion(request.version());
|
||||
try {
|
||||
if (compressor != null) {
|
||||
in = compressor.streamInput(in);
|
||||
}
|
||||
in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
|
||||
in.setVersion(request.version());
|
||||
// If true we received full cluster state - otherwise diffs
|
||||
if (in.readBoolean()) {
|
||||
final ClusterState incomingState;
|
||||
|
|
|
@ -85,7 +85,7 @@ public class DeflateCompressor implements Compressor {
|
|||
final Inflater inflater = new Inflater(nowrap);
|
||||
InputStream decompressedIn = new InflaterInputStream(in, inflater, BUFFER_SIZE);
|
||||
decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE);
|
||||
return new InputStreamStreamInput(decompressedIn) {
|
||||
final InputStreamStreamInput inputStreamStreamInput = new InputStreamStreamInput(decompressedIn) {
|
||||
final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
public void close() throws IOException {
|
||||
|
@ -99,6 +99,9 @@ public class DeflateCompressor implements Compressor {
|
|||
}
|
||||
}
|
||||
};
|
||||
|
||||
inputStreamStreamInput.setVersion(in.getVersion());
|
||||
return inputStreamStreamInput;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -109,7 +112,7 @@ public class DeflateCompressor implements Compressor {
|
|||
final boolean syncFlush = true;
|
||||
DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
|
||||
OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE);
|
||||
return new OutputStreamStreamOutput(compressedOut) {
|
||||
final OutputStreamStreamOutput outputStreamStreamOutput = new OutputStreamStreamOutput(compressedOut) {
|
||||
final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
public void close() throws IOException {
|
||||
|
@ -123,5 +126,7 @@ public class DeflateCompressor implements Compressor {
|
|||
}
|
||||
}
|
||||
};
|
||||
outputStreamStreamOutput.setVersion(out.getVersion());
|
||||
return outputStreamStreamOutput;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.common.io;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.BytesStream;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
|
@ -296,5 +297,15 @@ public abstract class Streams {
|
|||
public BytesReference bytes() {
|
||||
return delegate.bytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Version getVersion() {
|
||||
return delegate.getVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setVersion(Version version) {
|
||||
delegate.setVersion(version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1145,6 +1145,23 @@ public abstract class StreamInput extends InputStream {
|
|||
return readList(StreamInput::readString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads an optional list of strings. The list is expected to have been written using
|
||||
* {@link StreamOutput#writeOptionalStringCollection(Collection)}. If the returned list contains any entries it will be mutable.
|
||||
* If it is empty it might be immutable.
|
||||
*
|
||||
* @return the list of strings
|
||||
* @throws IOException if an I/O exception occurs reading the list
|
||||
*/
|
||||
public List<String> readOptionalStringList() throws IOException {
|
||||
final boolean isPresent = readBoolean();
|
||||
if (isPresent) {
|
||||
return readList(StreamInput::readString);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a set of objects. If the returned set contains any entries it will be mutable. If it is empty it might be immutable.
|
||||
*/
|
||||
|
|
|
@ -1125,6 +1125,22 @@ public abstract class StreamOutput extends OutputStream {
|
|||
writeCollection(collection, StreamOutput::writeString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes an optional collection of a strings. The corresponding collection can be read from a stream input using
|
||||
* {@link StreamInput#readList(Writeable.Reader)}.
|
||||
*
|
||||
* @param collection the collection of strings
|
||||
* @throws IOException if an I/O exception occurs writing the collection
|
||||
*/
|
||||
public void writeOptionalStringCollection(final Collection<String> collection) throws IOException {
|
||||
if (collection != null) {
|
||||
writeBoolean(true);
|
||||
writeCollection(collection, StreamOutput::writeString);
|
||||
} else {
|
||||
writeBoolean(false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a list of {@link NamedWriteable} objects.
|
||||
*/
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.elasticsearch.common.settings;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.elasticsearch.Build;
|
||||
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
|
||||
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
|
||||
import org.elasticsearch.action.search.TransportSearchAction;
|
||||
|
@ -104,6 +105,7 @@ import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
|
|||
import org.elasticsearch.plugins.PluginsService;
|
||||
import org.elasticsearch.repositories.fs.FsRepository;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
|
@ -189,7 +191,10 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
}
|
||||
}
|
||||
|
||||
public static Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
|
||||
public static final Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS;
|
||||
|
||||
static {
|
||||
final Set<Setting<?>> settings = new HashSet<>(Arrays.asList(
|
||||
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
|
||||
TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL,
|
||||
TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT,
|
||||
|
@ -536,11 +541,16 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING,
|
||||
HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING,
|
||||
DiscoveryUpgradeService.BWC_PING_TIMEOUT_SETTING,
|
||||
DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING)));
|
||||
DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING));
|
||||
|
||||
if (Build.CURRENT.isSnapshot()) {
|
||||
settings.add(RestController.RESTRICT_SYSTEM_INDICES);
|
||||
}
|
||||
BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(settings);
|
||||
}
|
||||
|
||||
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
|
||||
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
|
||||
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY_UPGRADER,
|
||||
RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER));
|
||||
|
||||
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
|
||||
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY_UPGRADER,
|
||||
RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER));
|
||||
}
|
||||
|
|
|
@ -20,9 +20,11 @@ package org.elasticsearch.common.util.concurrent;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
||||
import org.elasticsearch.client.OriginSettingClient;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -50,6 +52,7 @@ import java.util.function.Supplier;
|
|||
import java.util.stream.Collector;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE;
|
||||
|
||||
|
@ -64,7 +67,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARN
|
|||
* Consumers of ThreadContext usually don't need to interact with adding or stashing contexts. Every elasticsearch thread is managed by
|
||||
* a thread pool or executor being responsible for stashing and restoring the threads context. For instance if a network request is
|
||||
* received, all headers are deserialized from the network and directly added as the headers of the threads {@link ThreadContext}
|
||||
* (see {@link #readHeaders(StreamInput)}. In order to not modify the context that is currently active on this thread the network code
|
||||
* (see {@link #readFrom(StreamInput)}. In order to not modify the context that is currently active on this thread the network code
|
||||
* uses a try/with pattern to stash it's current context, read headers into a fresh one and once the request is handled or a handler thread
|
||||
* is forked (which in turn inherits the context) it restores the previous context. For instance:
|
||||
* </p>
|
||||
|
@ -234,17 +237,18 @@ public final class ThreadContext implements Writeable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Reads the headers from the stream into the current context
|
||||
* Reads the values from the stream into the current context
|
||||
*/
|
||||
public void readHeaders(StreamInput in) throws IOException {
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
final Tuple<Map<String, String>, Map<String, Set<String>>> streamTuple = readHeadersFromStream(in);
|
||||
final Map<String, String> requestHeaders = streamTuple.v1();
|
||||
final Map<String, Set<String>> responseHeaders = streamTuple.v2();
|
||||
final List<String> allowedSystemIndices = readAllowedSystemIndices(in);
|
||||
final ThreadContextStruct struct;
|
||||
if (requestHeaders.isEmpty() && responseHeaders.isEmpty()) {
|
||||
if (requestHeaders.isEmpty() && responseHeaders.isEmpty() && allowedSystemIndices.isEmpty()) {
|
||||
struct = ThreadContextStruct.EMPTY;
|
||||
} else {
|
||||
struct = new ThreadContextStruct(requestHeaders, responseHeaders, Collections.emptyMap(), false);
|
||||
struct = new ThreadContextStruct(requestHeaders, responseHeaders, Collections.emptyMap(), allowedSystemIndices, false, 0L);
|
||||
}
|
||||
threadLocal.set(struct);
|
||||
}
|
||||
|
@ -271,6 +275,14 @@ public final class ThreadContext implements Writeable {
|
|||
return new Tuple<>(requestHeaders, responseHeaders);
|
||||
}
|
||||
|
||||
public static List<String> readAllowedSystemIndices(StreamInput in) throws IOException {
|
||||
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
|
||||
return in.readOptionalStringList();
|
||||
} else {
|
||||
return emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the header for the given key or <code>null</code> if not present
|
||||
*/
|
||||
|
@ -414,6 +426,36 @@ public final class ThreadContext implements Writeable {
|
|||
return threadLocal.get().isSystemContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if a request made within this context can access system indices
|
||||
*/
|
||||
public boolean isSystemIndexAccessAllowed() {
|
||||
return threadLocal.get().allowedSystemIndexPatterns != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the context to disallow access to system indices
|
||||
*/
|
||||
public void disallowSystemIndexAccess() {
|
||||
threadLocal.set(threadLocal.get().setAllowSystemIndices(null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the context to allow access to system indices
|
||||
*/
|
||||
public void allowSystemIndexAccess(List<String> patterns) {
|
||||
threadLocal.set(threadLocal.get().setAllowSystemIndices(patterns));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the list of allowed system index patterns or {@code null} if none are allowed. An
|
||||
* empty list indicates that all system indices are allowed to be accessed.
|
||||
*/
|
||||
@Nullable
|
||||
public List<String> allowedSystemIndexPatterns() {
|
||||
return threadLocal.get().allowedSystemIndexPatterns;
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public interface StoredContext extends AutoCloseable {
|
||||
@Override
|
||||
|
@ -445,6 +487,7 @@ public final class ThreadContext implements Writeable {
|
|||
private final Map<String, String> requestHeaders;
|
||||
private final Map<String, Object> transientHeaders;
|
||||
private final Map<String, Set<String>> responseHeaders;
|
||||
private final List<String> allowedSystemIndexPatterns;
|
||||
private final boolean isSystemContext;
|
||||
//saving current warning headers' size not to recalculate the size with every new warning header
|
||||
private final long warningHeadersSize;
|
||||
|
@ -459,29 +502,41 @@ public final class ThreadContext implements Writeable {
|
|||
private ThreadContextStruct(Map<String, String> requestHeaders,
|
||||
Map<String, Set<String>> responseHeaders,
|
||||
Map<String, Object> transientHeaders, boolean isSystemContext) {
|
||||
this.requestHeaders = requestHeaders;
|
||||
this.responseHeaders = responseHeaders;
|
||||
this.transientHeaders = transientHeaders;
|
||||
this.isSystemContext = isSystemContext;
|
||||
this.warningHeadersSize = 0L;
|
||||
this(requestHeaders, responseHeaders, transientHeaders, emptyList(), isSystemContext, 0L);
|
||||
}
|
||||
|
||||
private ThreadContextStruct(Map<String, String> requestHeaders,
|
||||
Map<String, Set<String>> responseHeaders,
|
||||
Map<String, Object> transientHeaders, boolean isSystemContext,
|
||||
long warningHeadersSize) {
|
||||
this(requestHeaders, responseHeaders, transientHeaders, emptyList(), isSystemContext, warningHeadersSize);
|
||||
}
|
||||
|
||||
private ThreadContextStruct(Map<String, String> requestHeaders,
|
||||
Map<String, Set<String>> responseHeaders,
|
||||
Map<String, Object> transientHeaders,
|
||||
List<String> allowedSystemIndexPatterns,
|
||||
boolean isSystemContext,
|
||||
long warningHeadersSize) {
|
||||
this.requestHeaders = requestHeaders;
|
||||
this.responseHeaders = responseHeaders;
|
||||
this.transientHeaders = transientHeaders;
|
||||
this.isSystemContext = isSystemContext;
|
||||
this.warningHeadersSize = warningHeadersSize;
|
||||
this.allowedSystemIndexPatterns = allowedSystemIndexPatterns;
|
||||
}
|
||||
|
||||
/**
|
||||
* This represents the default context and it should only ever be called by {@link #DEFAULT_CONTEXT}.
|
||||
*/
|
||||
private ThreadContextStruct() {
|
||||
this(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), false);
|
||||
this(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), emptyList(), false, 0L);
|
||||
}
|
||||
|
||||
private ThreadContextStruct setAllowSystemIndices(List<String> allowedSystemIndexPatterns) {
|
||||
final List<String> copy =
|
||||
allowedSystemIndexPatterns == null ? null : Collections.unmodifiableList(new ArrayList<>(allowedSystemIndexPatterns));
|
||||
return new ThreadContextStruct(requestHeaders, responseHeaders, transientHeaders, copy, isSystemContext, warningHeadersSize);
|
||||
}
|
||||
|
||||
private ThreadContextStruct putRequest(String key, String value) {
|
||||
|
@ -525,7 +580,8 @@ public final class ThreadContext implements Writeable {
|
|||
newResponseHeaders.put(key, entry.getValue());
|
||||
}
|
||||
}
|
||||
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext);
|
||||
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, allowedSystemIndexPatterns,
|
||||
isSystemContext, 0L);
|
||||
}
|
||||
|
||||
private ThreadContextStruct putResponse(final String key, final String value, final Function<String, String> uniqueValue,
|
||||
|
@ -575,7 +631,8 @@ public final class ThreadContext implements Writeable {
|
|||
return this;
|
||||
}
|
||||
}
|
||||
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext, newWarningHeaderSize);
|
||||
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, allowedSystemIndexPatterns,
|
||||
isSystemContext, newWarningHeaderSize);
|
||||
}
|
||||
|
||||
|
||||
|
@ -611,6 +668,9 @@ public final class ThreadContext implements Writeable {
|
|||
}
|
||||
|
||||
out.writeMap(responseHeaders, StreamOutput::writeString, StreamOutput::writeStringCollection);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_7_0)) { // TODO update version on backport
|
||||
out.writeOptionalStringCollection(allowedSystemIndexPatterns);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -628,7 +688,7 @@ public final class ThreadContext implements Writeable {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
try (ThreadContext.StoredContext ignore = stashContext()){
|
||||
try (ThreadContext.StoredContext ignore = stashContext()) {
|
||||
ctx.restore();
|
||||
in.run();
|
||||
}
|
||||
|
|
|
@ -443,7 +443,7 @@ public class Node implements Closeable {
|
|||
.stream()
|
||||
.collect(Collectors.toMap(
|
||||
plugin -> plugin.getClass().getSimpleName(),
|
||||
plugin -> plugin.getSystemIndexDescriptors())));
|
||||
plugin -> plugin.getSystemIndexDescriptors(settings))));
|
||||
SystemIndexDescriptor.checkForOverlappingPatterns(systemIndexDescriptorMap);
|
||||
|
||||
final List<SystemIndexDescriptor> systemIndexDescriptors = systemIndexDescriptorMap.values().stream()
|
||||
|
@ -479,7 +479,7 @@ public class Node implements Closeable {
|
|||
|
||||
ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
|
||||
settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
|
||||
threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, clusterService);
|
||||
threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
|
||||
modules.add(actionModule);
|
||||
|
||||
final RestController restController = actionModule.getRestController();
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.plugins;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.indices.SystemIndexDescriptor;
|
||||
|
||||
import java.util.Collection;
|
||||
|
@ -33,9 +34,10 @@ public interface SystemIndexPlugin extends ActionPlugin {
|
|||
/**
|
||||
* Returns a {@link Collection} of {@link SystemIndexDescriptor}s that describe this plugin's system indices, including
|
||||
* name, mapping, and settings.
|
||||
* @param settings The node's settings
|
||||
* @return Descriptions of the system indices managed by this plugin.
|
||||
*/
|
||||
default Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
|
||||
default Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -197,4 +197,57 @@ public abstract class BaseRestHandler implements RestHandler {
|
|||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
public static class Wrapper extends BaseRestHandler {
|
||||
|
||||
protected final BaseRestHandler delegate;
|
||||
|
||||
public Wrapper(BaseRestHandler delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return delegate.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Route> routes() {
|
||||
return delegate.routes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<DeprecatedRoute> deprecatedRoutes() {
|
||||
return delegate.deprecatedRoutes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ReplacedRoute> replacedRoutes() {
|
||||
return delegate.replacedRoutes();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
|
||||
return delegate.prepareRequest(request, client);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Set<String> responseParams() {
|
||||
return delegate.responseParams();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return delegate.canTripCircuitBreaker();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportsContentStream() {
|
||||
return delegate.supportsContentStream();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean allowsUnsafeBuffers() {
|
||||
return delegate.allowsUnsafeBuffers();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,8 @@ import org.elasticsearch.common.bytes.BytesArray;
|
|||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.path.PathTrie;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
@ -66,6 +68,10 @@ public class RestController implements HttpServerTransport.Dispatcher {
|
|||
private static final Logger logger = LogManager.getLogger(RestController.class);
|
||||
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
|
||||
|
||||
// TODO once we are ready, this should default to true
|
||||
public static final Setting<Boolean> RESTRICT_SYSTEM_INDICES =
|
||||
Setting.boolSetting("rest.restrict_system_indices", false, Property.NodeScope);
|
||||
|
||||
private final PathTrie<MethodHandlers> handlers = new PathTrie<>(RestUtils.REST_DECODER);
|
||||
|
||||
private final UnaryOperator<RestHandler> handlerWrapper;
|
||||
|
@ -77,9 +83,10 @@ public class RestController implements HttpServerTransport.Dispatcher {
|
|||
/** Rest headers that are copied to internal requests made during a rest request. */
|
||||
private final Set<RestHeaderDefinition> headersToCopy;
|
||||
private final UsageService usageService;
|
||||
private final boolean restrictSystemIndices;
|
||||
|
||||
public RestController(Set<RestHeaderDefinition> headersToCopy, UnaryOperator<RestHandler> handlerWrapper,
|
||||
NodeClient client, CircuitBreakerService circuitBreakerService, UsageService usageService) {
|
||||
NodeClient client, CircuitBreakerService circuitBreakerService, UsageService usageService, boolean restrictSystemIndices) {
|
||||
this.headersToCopy = headersToCopy;
|
||||
this.usageService = usageService;
|
||||
if (handlerWrapper == null) {
|
||||
|
@ -88,6 +95,7 @@ public class RestController implements HttpServerTransport.Dispatcher {
|
|||
this.handlerWrapper = handlerWrapper;
|
||||
this.client = client;
|
||||
this.circuitBreakerService = circuitBreakerService;
|
||||
this.restrictSystemIndices = restrictSystemIndices;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -172,6 +180,13 @@ public class RestController implements HttpServerTransport.Dispatcher {
|
|||
handleFavicon(request.method(), request.uri(), channel);
|
||||
return;
|
||||
}
|
||||
|
||||
if (restrictSystemIndices) {
|
||||
threadContext.disallowSystemIndexAccess();
|
||||
} else {
|
||||
assert threadContext.isSystemIndexAccessAllowed();
|
||||
}
|
||||
|
||||
try {
|
||||
tryAllHandlers(request, channel, threadContext);
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -38,6 +38,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
|
||||
|
@ -85,28 +86,41 @@ public class RestCancellableNodeClient extends FilterClient {
|
|||
@Override
|
||||
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
|
||||
ActionType<Response> action, Request request, ActionListener<Response> listener) {
|
||||
CloseListener closeListener = httpChannels.computeIfAbsent(httpChannel, channel -> new CloseListener());
|
||||
final AtomicBoolean created = new AtomicBoolean(false);
|
||||
CloseListener closeListener = httpChannels.computeIfAbsent(httpChannel, channel -> {
|
||||
created.set(true);
|
||||
return new CloseListener();
|
||||
});
|
||||
TaskHolder taskHolder = new TaskHolder();
|
||||
Task task = client.executeLocally(action, request,
|
||||
new ActionListener<Response>() {
|
||||
@Override
|
||||
public void onResponse(Response response) {
|
||||
try {
|
||||
closeListener.unregisterTask(taskHolder);
|
||||
} finally {
|
||||
listener.onResponse(response);
|
||||
final Task task;
|
||||
boolean success = false;
|
||||
try {
|
||||
task = client.executeLocally(action, request,
|
||||
new ActionListener<Response>() {
|
||||
@Override
|
||||
public void onResponse(Response response) {
|
||||
try {
|
||||
closeListener.unregisterTask(taskHolder);
|
||||
} finally {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
closeListener.unregisterTask(taskHolder);
|
||||
} finally {
|
||||
listener.onFailure(e);
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
closeListener.unregisterTask(taskHolder);
|
||||
} finally {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false && created.get()) {
|
||||
httpChannels.remove(httpChannel);
|
||||
}
|
||||
}
|
||||
final TaskId taskId = new TaskId(client.getLocalNodeId(), task.getId());
|
||||
closeListener.registerTask(taskHolder, taskId);
|
||||
closeListener.maybeRegisterChannel(httpChannel);
|
||||
|
|
|
@ -24,8 +24,8 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
|
@ -36,6 +36,7 @@ import org.elasticsearch.rest.action.RestStatusToXContentListener;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.unmodifiableList;
|
||||
|
@ -95,10 +96,10 @@ public class RestIndexAction extends BaseRestHandler {
|
|||
|
||||
public static final class AutoIdHandler extends RestIndexAction {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
private final Supplier<DiscoveryNodes> nodesInCluster;
|
||||
|
||||
public AutoIdHandler(ClusterService clusterService) {
|
||||
this.clusterService = clusterService;
|
||||
public AutoIdHandler(Supplier<DiscoveryNodes> nodesInCluster) {
|
||||
this.nodesInCluster = nodesInCluster;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -116,7 +117,7 @@ public class RestIndexAction extends BaseRestHandler {
|
|||
@Override
|
||||
public RestChannelConsumer prepareRequest(RestRequest request, final NodeClient client) throws IOException {
|
||||
assert request.params().get("id") == null : "non-null id: " + request.params().get("id");
|
||||
if (request.params().get("op_type") == null && clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_7_5_0)) {
|
||||
if (request.params().get("op_type") == null && nodesInCluster.get().getMinNodeVersion().onOrAfter(Version.V_7_5_0)) {
|
||||
// default to op_type create
|
||||
request.params().put("op_type", "create");
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
|
@ -102,4 +103,14 @@ final class CompressibleBytesOutputStream extends StreamOutput {
|
|||
public void reset() throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Version getVersion() {
|
||||
return stream.getVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setVersion(Version version) {
|
||||
stream.setVersion(version);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,7 +65,8 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
|
|||
try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
|
||||
long requestId = streamInput.readLong();
|
||||
byte status = streamInput.readByte();
|
||||
Version remoteVersion = Version.fromId(streamInput.readInt());
|
||||
final Version remoteVersion = Version.fromId(streamInput.readInt());
|
||||
streamInput.setVersion(remoteVersion);
|
||||
final boolean isHandshake = TransportStatus.isHandshake(status);
|
||||
ensureVersionCompatibility(remoteVersion, version, isHandshake);
|
||||
|
||||
|
@ -73,10 +74,11 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
|
|||
// Consume the variable header size
|
||||
streamInput.readInt();
|
||||
} else {
|
||||
streamInput = decompressingStream(status, remoteVersion, streamInput);
|
||||
streamInput = decompressingStream(status, streamInput);
|
||||
assertRemoteVersion(streamInput, remoteVersion);
|
||||
}
|
||||
|
||||
threadContext.readHeaders(streamInput);
|
||||
threadContext.readFrom(streamInput);
|
||||
|
||||
InboundMessage message;
|
||||
if (TransportStatus.isRequest(status)) {
|
||||
|
@ -94,15 +96,19 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
|
|||
final String action = streamInput.readString();
|
||||
|
||||
if (remoteVersion.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
|
||||
streamInput = decompressingStream(status, remoteVersion, streamInput);
|
||||
streamInput = decompressingStream(status, streamInput);
|
||||
assertRemoteVersion(streamInput, remoteVersion);
|
||||
}
|
||||
streamInput = namedWriteableStream(streamInput, remoteVersion);
|
||||
streamInput = namedWriteableStream(streamInput);
|
||||
assertRemoteVersion(streamInput, remoteVersion);
|
||||
message = new Request(threadContext, remoteVersion, status, requestId, action, features, streamInput);
|
||||
} else {
|
||||
if (remoteVersion.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
|
||||
streamInput = decompressingStream(status, remoteVersion, streamInput);
|
||||
streamInput = decompressingStream(status, streamInput);
|
||||
assertRemoteVersion(streamInput, remoteVersion);
|
||||
}
|
||||
streamInput = namedWriteableStream(streamInput, remoteVersion);
|
||||
streamInput = namedWriteableStream(streamInput);
|
||||
assertRemoteVersion(streamInput, remoteVersion);
|
||||
message = new Response(threadContext, remoteVersion, status, requestId, streamInput);
|
||||
}
|
||||
success = true;
|
||||
|
@ -114,12 +120,10 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
|
|||
}
|
||||
}
|
||||
|
||||
static StreamInput decompressingStream(byte status, Version remoteVersion, StreamInput streamInput) throws IOException {
|
||||
static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException {
|
||||
if (TransportStatus.isCompress(status) && streamInput.available() > 0) {
|
||||
try {
|
||||
StreamInput decompressor = CompressorFactory.COMPRESSOR.streamInput(streamInput);
|
||||
decompressor.setVersion(remoteVersion);
|
||||
return decompressor;
|
||||
return CompressorFactory.COMPRESSOR.streamInput(streamInput);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new IllegalStateException("stream marked as compressed, but is missing deflate header");
|
||||
}
|
||||
|
@ -128,10 +132,12 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
|
|||
}
|
||||
}
|
||||
|
||||
private StreamInput namedWriteableStream(StreamInput delegate, Version remoteVersion) {
|
||||
NamedWriteableAwareStreamInput streamInput = new NamedWriteableAwareStreamInput(delegate, namedWriteableRegistry);
|
||||
streamInput.setVersion(remoteVersion);
|
||||
return streamInput;
|
||||
private StreamInput namedWriteableStream(StreamInput delegate) {
|
||||
return new NamedWriteableAwareStreamInput(delegate, namedWriteableRegistry);
|
||||
}
|
||||
|
||||
static void assertRemoteVersion(StreamInput in, Version version) {
|
||||
assert version.equals(in.getVersion()) : "Stream version [" + in.getVersion() + "] does not match version [" + version + "]";
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -55,9 +55,9 @@ abstract class OutboundMessage extends NetworkMessage {
|
|||
}
|
||||
|
||||
try (CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bytesStream, TransportStatus.isCompress(status))) {
|
||||
stream.setVersion(version);
|
||||
assert stream.getVersion().equals(version) :
|
||||
"Stream version [" + stream.getVersion() + "] does not match version [" + version + "]";
|
||||
stream.setFeatures(bytesStream.getFeatures());
|
||||
|
||||
if (variableHeaderLength == -1) {
|
||||
writeVariableHeader(stream);
|
||||
}
|
||||
|
|
|
@ -28,6 +28,8 @@ import org.elasticsearch.core.internal.io.IOUtils;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.transport.InboundMessage.Reader.assertRemoteVersion;
|
||||
|
||||
public final class TransportLogger {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(TransportLogger.class);
|
||||
|
@ -75,7 +77,8 @@ public final class TransportLogger {
|
|||
final byte status = streamInput.readByte();
|
||||
final boolean isRequest = TransportStatus.isRequest(status);
|
||||
final String type = isRequest ? "request" : "response";
|
||||
Version version = Version.fromId(streamInput.readInt());
|
||||
final Version version = Version.fromId(streamInput.readInt());
|
||||
streamInput.setVersion(version);
|
||||
sb.append(" [length: ").append(messageLengthWithHeader);
|
||||
sb.append(", request id: ").append(requestId);
|
||||
sb.append(", type: ").append(type);
|
||||
|
@ -84,11 +87,18 @@ public final class TransportLogger {
|
|||
if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
|
||||
sb.append(", header size: ").append(streamInput.readInt()).append('B');
|
||||
} else {
|
||||
streamInput = InboundMessage.Reader.decompressingStream(status, version, streamInput);
|
||||
streamInput = InboundMessage.Reader.decompressingStream(status, streamInput);
|
||||
assertRemoteVersion(streamInput, version);
|
||||
}
|
||||
|
||||
// read and discard headers
|
||||
// TODO (jaymode) Need a better way to deal with this. In one aspect,
|
||||
// changes were made to ThreadContext to allocate less internally, yet we have this
|
||||
// ugliness needed to move past the threadcontext data in the stream and discard it
|
||||
// Could we have an alternative that essentially just seeks through the stream with
|
||||
// minimal allocation?
|
||||
// read and discard thread context data
|
||||
ThreadContext.readHeadersFromStream(streamInput);
|
||||
ThreadContext.readAllowedSystemIndices(streamInput);
|
||||
|
||||
if (isRequest) {
|
||||
if (streamInput.getVersion().onOrAfter(Version.V_6_3_0)) {
|
||||
|
|
|
@ -109,7 +109,7 @@ public class ActionModuleTests extends ESTestCase {
|
|||
UsageService usageService = new UsageService();
|
||||
ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(),
|
||||
settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), null, emptyList(), null,
|
||||
null, usageService, null);
|
||||
null, usageService);
|
||||
actionModule.initRestHandlers(null);
|
||||
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () ->
|
||||
|
@ -141,7 +141,7 @@ public class ActionModuleTests extends ESTestCase {
|
|||
UsageService usageService = new UsageService();
|
||||
ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(),
|
||||
settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), threadPool,
|
||||
singletonList(dupsMainAction), null, null, usageService, null);
|
||||
singletonList(dupsMainAction), null, null, usageService);
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null));
|
||||
assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/] for method: GET"));
|
||||
} finally {
|
||||
|
@ -175,7 +175,7 @@ public class ActionModuleTests extends ESTestCase {
|
|||
UsageService usageService = new UsageService();
|
||||
ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(),
|
||||
settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), threadPool,
|
||||
singletonList(registersFakeHandler), null, null, usageService, null);
|
||||
singletonList(registersFakeHandler), null, null, usageService);
|
||||
actionModule.initRestHandlers(null);
|
||||
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () ->
|
||||
|
|
|
@ -21,11 +21,13 @@ package org.elasticsearch.common.compress;
|
|||
|
||||
import org.apache.lucene.util.LineFileDocs;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
|
||||
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
|
@ -388,9 +390,12 @@ public class DeflateCompressTests extends ESTestCase {
|
|||
StreamInput rawIn = new ByteBufferStreamInput(bb);
|
||||
Compressor c = compressor;
|
||||
|
||||
final Version version = VersionUtils.randomVersion(random());
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
OutputStreamStreamOutput rawOs = new OutputStreamStreamOutput(bos);
|
||||
rawOs.setVersion(version);
|
||||
StreamOutput os = c.streamOutput(rawOs);
|
||||
assertEquals(version, os.getVersion());
|
||||
|
||||
Random r = random();
|
||||
int bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000);
|
||||
|
@ -410,7 +415,9 @@ public class DeflateCompressTests extends ESTestCase {
|
|||
byte compressed[] = bos.toByteArray();
|
||||
ByteBuffer bb2 = ByteBuffer.wrap(compressed);
|
||||
StreamInput compressedIn = new ByteBufferStreamInput(bb2);
|
||||
compressedIn.setVersion(version);
|
||||
StreamInput in = c.streamInput(compressedIn);
|
||||
assertEquals(version, in.getVersion());
|
||||
|
||||
// randomize constants again
|
||||
bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000);
|
||||
|
|
|
@ -29,9 +29,13 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class ThreadContextTests extends ESTestCase {
|
||||
|
@ -254,6 +258,7 @@ public class ThreadContextTests extends ESTestCase {
|
|||
threadContext.addResponseHeader("Warning", "123456");
|
||||
}
|
||||
threadContext.addResponseHeader("Warning", "234567");
|
||||
threadContext.disallowSystemIndexAccess();
|
||||
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
threadContext.writeTo(out);
|
||||
|
@ -262,8 +267,9 @@ public class ThreadContextTests extends ESTestCase {
|
|||
assertNull(threadContext.getTransient("ctx.foo"));
|
||||
assertTrue(threadContext.getResponseHeaders().isEmpty());
|
||||
assertEquals("1", threadContext.getHeader("default"));
|
||||
assertTrue(threadContext.isSystemIndexAccessAllowed());
|
||||
|
||||
threadContext.readHeaders(out.bytes().streamInput());
|
||||
threadContext.readFrom(out.bytes().streamInput());
|
||||
assertEquals("bar", threadContext.getHeader("foo"));
|
||||
assertNull(threadContext.getTransient("ctx.foo"));
|
||||
|
||||
|
@ -274,10 +280,57 @@ public class ThreadContextTests extends ESTestCase {
|
|||
assertThat(warnings, hasSize(2));
|
||||
assertThat(warnings, hasItem(equalTo("123456")));
|
||||
assertThat(warnings, hasItem(equalTo("234567")));
|
||||
|
||||
assertFalse(threadContext.isSystemIndexAccessAllowed());
|
||||
}
|
||||
assertEquals("bar", threadContext.getHeader("foo"));
|
||||
assertEquals(Integer.valueOf(1), threadContext.getTransient("ctx.foo"));
|
||||
assertEquals("1", threadContext.getHeader("default"));
|
||||
assertFalse(threadContext.isSystemIndexAccessAllowed());
|
||||
}
|
||||
|
||||
public void testSerializeWithAllowedSystemIndexPatterns() throws IOException {
|
||||
Settings build = Settings.builder().put("request.headers.default", "1").build();
|
||||
ThreadContext threadContext = new ThreadContext(build);
|
||||
threadContext.putHeader("foo", "bar");
|
||||
threadContext.putTransient("ctx.foo", 1);
|
||||
threadContext.addResponseHeader("Warning", "123456");
|
||||
if (rarely()) {
|
||||
threadContext.addResponseHeader("Warning", "123456");
|
||||
}
|
||||
threadContext.addResponseHeader("Warning", "234567");
|
||||
final List<String> allowed = randomList(1, 8, () -> randomAlphaOfLengthBetween(2, 8));
|
||||
threadContext.allowSystemIndexAccess(allowed);
|
||||
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
threadContext.writeTo(out);
|
||||
try (ThreadContext.StoredContext ctx = threadContext.stashContext()) {
|
||||
assertNull(threadContext.getHeader("foo"));
|
||||
assertNull(threadContext.getTransient("ctx.foo"));
|
||||
assertTrue(threadContext.getResponseHeaders().isEmpty());
|
||||
assertEquals("1", threadContext.getHeader("default"));
|
||||
assertTrue(threadContext.isSystemIndexAccessAllowed());
|
||||
|
||||
threadContext.readFrom(out.bytes().streamInput());
|
||||
assertEquals("bar", threadContext.getHeader("foo"));
|
||||
assertNull(threadContext.getTransient("ctx.foo"));
|
||||
|
||||
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
||||
final List<String> warnings = responseHeaders.get("Warning");
|
||||
|
||||
assertThat(responseHeaders.keySet(), hasSize(1));
|
||||
assertThat(warnings, hasSize(2));
|
||||
assertThat(warnings, hasItem(equalTo("123456")));
|
||||
assertThat(warnings, hasItem(equalTo("234567")));
|
||||
|
||||
assertTrue(threadContext.isSystemIndexAccessAllowed());
|
||||
assertThat(threadContext.allowedSystemIndexPatterns(), equalTo(allowed));
|
||||
}
|
||||
assertEquals("bar", threadContext.getHeader("foo"));
|
||||
assertEquals(Integer.valueOf(1), threadContext.getTransient("ctx.foo"));
|
||||
assertEquals("1", threadContext.getHeader("default"));
|
||||
assertTrue(threadContext.isSystemIndexAccessAllowed());
|
||||
assertThat(threadContext.allowedSystemIndexPatterns(), equalTo(allowed));
|
||||
}
|
||||
|
||||
public void testSerializeInDifferentContext() throws IOException {
|
||||
|
@ -292,17 +345,19 @@ public class ThreadContextTests extends ESTestCase {
|
|||
threadContext.addResponseHeader("Warning", "123456");
|
||||
}
|
||||
threadContext.addResponseHeader("Warning", "234567");
|
||||
threadContext.disallowSystemIndexAccess();
|
||||
|
||||
assertEquals("bar", threadContext.getHeader("foo"));
|
||||
assertNotNull(threadContext.getTransient("ctx.foo"));
|
||||
assertEquals("1", threadContext.getHeader("default"));
|
||||
assertThat(threadContext.getResponseHeaders().keySet(), hasSize(1));
|
||||
assertFalse(threadContext.isSystemIndexAccessAllowed());
|
||||
threadContext.writeTo(out);
|
||||
}
|
||||
{
|
||||
Settings otherSettings = Settings.builder().put("request.headers.default", "5").build();
|
||||
ThreadContext otherThreadContext = new ThreadContext(otherSettings);
|
||||
otherThreadContext.readHeaders(out.bytes().streamInput());
|
||||
otherThreadContext.readFrom(out.bytes().streamInput());
|
||||
|
||||
assertEquals("bar", otherThreadContext.getHeader("foo"));
|
||||
assertNull(otherThreadContext.getTransient("ctx.foo"));
|
||||
|
@ -315,6 +370,53 @@ public class ThreadContextTests extends ESTestCase {
|
|||
assertThat(warnings, hasSize(2));
|
||||
assertThat(warnings, hasItem(equalTo("123456")));
|
||||
assertThat(warnings, hasItem(equalTo("234567")));
|
||||
|
||||
assertFalse(otherThreadContext.isSystemIndexAccessAllowed());
|
||||
}
|
||||
}
|
||||
|
||||
public void testSerializeInDifferentContextWithAllowedSystemIndices() throws IOException {
|
||||
final List<String> allowed = randomList(1, 8, () -> randomAlphaOfLengthBetween(2, 8));
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
{
|
||||
Settings build = Settings.builder().put("request.headers.default", "1").build();
|
||||
ThreadContext threadContext = new ThreadContext(build);
|
||||
threadContext.putHeader("foo", "bar");
|
||||
threadContext.putTransient("ctx.foo", 1);
|
||||
threadContext.addResponseHeader("Warning", "123456");
|
||||
if (rarely()) {
|
||||
threadContext.addResponseHeader("Warning", "123456");
|
||||
}
|
||||
threadContext.addResponseHeader("Warning", "234567");
|
||||
threadContext.allowSystemIndexAccess(allowed);
|
||||
|
||||
assertEquals("bar", threadContext.getHeader("foo"));
|
||||
assertNotNull(threadContext.getTransient("ctx.foo"));
|
||||
assertEquals("1", threadContext.getHeader("default"));
|
||||
assertThat(threadContext.getResponseHeaders().keySet(), hasSize(1));
|
||||
assertTrue(threadContext.isSystemIndexAccessAllowed());
|
||||
assertThat(threadContext.allowedSystemIndexPatterns(), equalTo(allowed));
|
||||
threadContext.writeTo(out);
|
||||
}
|
||||
{
|
||||
Settings otherSettings = Settings.builder().put("request.headers.default", "5").build();
|
||||
ThreadContext otherThreadContext = new ThreadContext(otherSettings);
|
||||
otherThreadContext.readFrom(out.bytes().streamInput());
|
||||
|
||||
assertEquals("bar", otherThreadContext.getHeader("foo"));
|
||||
assertNull(otherThreadContext.getTransient("ctx.foo"));
|
||||
assertEquals("1", otherThreadContext.getHeader("default"));
|
||||
|
||||
final Map<String, List<String>> responseHeaders = otherThreadContext.getResponseHeaders();
|
||||
final List<String> warnings = responseHeaders.get("Warning");
|
||||
|
||||
assertThat(responseHeaders.keySet(), hasSize(1));
|
||||
assertThat(warnings, hasSize(2));
|
||||
assertThat(warnings, hasItem(equalTo("123456")));
|
||||
assertThat(warnings, hasItem(equalTo("234567")));
|
||||
|
||||
assertTrue(otherThreadContext.isSystemIndexAccessAllowed());
|
||||
assertThat(otherThreadContext.allowedSystemIndexPatterns(), equalTo(allowed));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -333,7 +435,7 @@ public class ThreadContextTests extends ESTestCase {
|
|||
{
|
||||
Settings otherSettings = Settings.builder().put("request.headers.default", "5").build();
|
||||
ThreadContext otherhreadContext = new ThreadContext(otherSettings);
|
||||
otherhreadContext.readHeaders(out.bytes().streamInput());
|
||||
otherhreadContext.readFrom(out.bytes().streamInput());
|
||||
|
||||
assertEquals("bar", otherhreadContext.getHeader("foo"));
|
||||
assertNull(otherhreadContext.getTransient("ctx.foo"));
|
||||
|
@ -613,6 +715,31 @@ public class ThreadContextTests extends ESTestCase {
|
|||
assertEquals("value for key [foo] already present", e.getMessage());
|
||||
}
|
||||
|
||||
public void testSystemIndexAccessAllowed() {
|
||||
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
assertTrue(threadContext.isSystemIndexAccessAllowed());
|
||||
assertThat(threadContext.allowedSystemIndexPatterns(), empty());
|
||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||
assertTrue(threadContext.isSystemIndexAccessAllowed());
|
||||
threadContext.disallowSystemIndexAccess();
|
||||
assertFalse(threadContext.isSystemIndexAccessAllowed());
|
||||
assertThat(threadContext.allowedSystemIndexPatterns(), nullValue());
|
||||
}
|
||||
assertTrue(threadContext.isSystemIndexAccessAllowed());
|
||||
assertThat(threadContext.allowedSystemIndexPatterns(), empty());
|
||||
|
||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||
threadContext.disallowSystemIndexAccess();
|
||||
final List<String> allowed = randomList(1, 8, () -> randomAlphaOfLengthBetween(2, 8));
|
||||
threadContext.allowSystemIndexAccess(allowed);
|
||||
assertTrue(threadContext.isSystemIndexAccessAllowed());
|
||||
assertThat(threadContext.allowedSystemIndexPatterns(), not(sameInstance(allowed)));
|
||||
assertThat(threadContext.allowedSystemIndexPatterns(), equalTo(allowed));
|
||||
}
|
||||
assertTrue(threadContext.isSystemIndexAccessAllowed());
|
||||
assertThat(threadContext.allowedSystemIndexPatterns(), empty());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sometimes wraps a Runnable in an AbstractRunnable.
|
||||
*/
|
||||
|
|
|
@ -90,7 +90,7 @@ public class RestControllerTests extends ESTestCase {
|
|||
inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
|
||||
|
||||
HttpServerTransport httpServerTransport = new TestHttpServerTransport();
|
||||
restController = new RestController(Collections.emptySet(), null, null, circuitBreakerService, usageService);
|
||||
restController = new RestController(Collections.emptySet(), null, null, circuitBreakerService, usageService, randomBoolean());
|
||||
restController.registerHandler(RestRequest.Method.GET, "/",
|
||||
(request, channel, client) -> channel.sendResponse(
|
||||
new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)));
|
||||
|
@ -105,7 +105,7 @@ public class RestControllerTests extends ESTestCase {
|
|||
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
Set<RestHeaderDefinition> headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition("header.1", true),
|
||||
new RestHeaderDefinition("header.2", true)));
|
||||
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService);
|
||||
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService, randomBoolean());
|
||||
Map<String, List<String>> restHeaders = new HashMap<>();
|
||||
restHeaders.put("header.1", Collections.singletonList("true"));
|
||||
restHeaders.put("header.2", Collections.singletonList("true"));
|
||||
|
@ -141,7 +141,7 @@ public class RestControllerTests extends ESTestCase {
|
|||
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
Set<RestHeaderDefinition> headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition("header.1", true),
|
||||
new RestHeaderDefinition("header.2", false)));
|
||||
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService);
|
||||
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService, randomBoolean());
|
||||
Map<String, List<String>> restHeaders = new HashMap<>();
|
||||
restHeaders.put("header.1", Collections.singletonList("boo"));
|
||||
restHeaders.put("header.2", Arrays.asList("foo", "bar"));
|
||||
|
@ -155,7 +155,7 @@ public class RestControllerTests extends ESTestCase {
|
|||
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
Set<RestHeaderDefinition> headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition("header.1", true),
|
||||
new RestHeaderDefinition("header.2", false)));
|
||||
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService);
|
||||
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService, randomBoolean());
|
||||
Map<String, List<String>> restHeaders = new HashMap<>();
|
||||
restHeaders.put("header.1", Collections.singletonList("boo"));
|
||||
restHeaders.put("header.2", Arrays.asList("foo", "foo"));
|
||||
|
@ -209,7 +209,7 @@ public class RestControllerTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testRegisterSecondMethodWithDifferentNamedWildcard() {
|
||||
final RestController restController = new RestController(null, null, null, circuitBreakerService, usageService);
|
||||
final RestController restController = new RestController(null, null, null, circuitBreakerService, usageService, randomBoolean());
|
||||
|
||||
RestRequest.Method firstMethod = randomFrom(RestRequest.Method.values());
|
||||
RestRequest.Method secondMethod =
|
||||
|
@ -236,7 +236,7 @@ public class RestControllerTests extends ESTestCase {
|
|||
h -> {
|
||||
assertSame(handler, h);
|
||||
return (RestRequest request, RestChannel channel, NodeClient client) -> wrapperCalled.set(true);
|
||||
}, null, circuitBreakerService, usageService);
|
||||
}, null, circuitBreakerService, usageService, randomBoolean());
|
||||
restController.registerHandler(RestRequest.Method.GET, "/wrapped", handler);
|
||||
RestRequest request = testRestRequest("/wrapped", "{}", XContentType.JSON);
|
||||
AssertingChannel channel = new AssertingChannel(request, true, RestStatus.BAD_REQUEST);
|
||||
|
@ -299,7 +299,7 @@ public class RestControllerTests extends ESTestCase {
|
|||
String content = randomAlphaOfLength((int) Math.round(BREAKER_LIMIT.getBytes() / inFlightRequestsBreaker.getOverhead()));
|
||||
RestRequest request = testRestRequest("/", content, null);
|
||||
AssertingChannel channel = new AssertingChannel(request, true, RestStatus.NOT_ACCEPTABLE);
|
||||
restController = new RestController(Collections.emptySet(), null, null, circuitBreakerService, usageService);
|
||||
restController = new RestController(Collections.emptySet(), null, null, circuitBreakerService, usageService, randomBoolean());
|
||||
restController.registerHandler(RestRequest.Method.GET, "/",
|
||||
(r, c, client) -> c.sendResponse(
|
||||
new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)));
|
||||
|
@ -570,6 +570,25 @@ public class RestControllerTests extends ESTestCase {
|
|||
assertThat(channel.getRestResponse().getHeaders().get("Allow"), hasItem(equalTo(RestRequest.Method.GET.toString())));
|
||||
}
|
||||
|
||||
public void testDispatchRestrictSystemIndices() {
|
||||
restController = new RestController(Collections.emptySet(), null, null, circuitBreakerService, usageService, true);
|
||||
restController.registerHandler(RestRequest.Method.GET, "/",
|
||||
(request, channel, client) -> channel.sendResponse(
|
||||
new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)));
|
||||
restController.registerHandler(RestRequest.Method.GET, "/error", (request, channel, client) -> {
|
||||
throw new IllegalArgumentException("test error");
|
||||
});
|
||||
|
||||
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).build();
|
||||
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.OK);
|
||||
|
||||
assertFalse(channel.getSendResponseCalled());
|
||||
ThreadContext context = new ThreadContext(Settings.EMPTY);
|
||||
assertTrue(context.isSystemIndexAccessAllowed());
|
||||
restController.dispatchRequest(fakeRestRequest, channel, context);
|
||||
assertTrue(channel.getSendResponseCalled());
|
||||
assertFalse(context.isSystemIndexAccessAllowed());
|
||||
}
|
||||
|
||||
private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements
|
||||
HttpServerTransport {
|
||||
|
|
|
@ -89,7 +89,7 @@ public class RestHttpResponseHeadersTests extends ESTestCase {
|
|||
final Settings settings = Settings.EMPTY;
|
||||
UsageService usageService = new UsageService();
|
||||
RestController restController = new RestController(Collections.emptySet(),
|
||||
null, null, circuitBreakerService, usageService);
|
||||
null, null, circuitBreakerService, usageService, randomBoolean());
|
||||
|
||||
// A basic RestHandler handles requests to the endpoint
|
||||
RestHandler restHandler = new RestHandler() {
|
||||
|
|
|
@ -60,7 +60,7 @@ public class RestValidateQueryActionTests extends AbstractSearchTestCase {
|
|||
|
||||
private static UsageService usageService = new UsageService();
|
||||
private static RestController controller = new RestController(emptySet(), null, client,
|
||||
new NoneCircuitBreakerService(), usageService);
|
||||
new NoneCircuitBreakerService(), usageService, false);
|
||||
private static RestValidateQueryAction action = new RestValidateQueryAction();
|
||||
|
||||
/**
|
||||
|
|
|
@ -127,7 +127,8 @@ public class RestIndicesActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
final RestController restController = new RestController(Collections.emptySet(), null, null, null, new UsageService());
|
||||
final RestController restController =
|
||||
new RestController(Collections.emptySet(), null, null, null, new UsageService(), randomBoolean());
|
||||
final RestIndicesAction action = new RestIndicesAction();
|
||||
restController.registerHandler(action);
|
||||
final Table table = action.buildTable(new FakeRestRequest(), indicesSettings, indicesHealths, indicesStats, indicesMetaDatas);
|
||||
|
|
|
@ -55,7 +55,7 @@ public class RestRecoveryActionTests extends ESTestCase {
|
|||
public void testRestRecoveryAction() {
|
||||
final Settings settings = Settings.EMPTY;
|
||||
UsageService usageService = new UsageService();
|
||||
final RestController restController = new RestController(Collections.emptySet(), null, null, null, usageService);
|
||||
final RestController restController = new RestController(Collections.emptySet(), null, null, null, usageService, randomBoolean());
|
||||
final RestCatRecoveryAction action = new RestCatRecoveryAction();
|
||||
restController.registerHandler(action);
|
||||
final int totalShards = randomIntBetween(1, 32);
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.cluster.ClusterName;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
|
@ -43,9 +42,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class RestIndexActionTests extends RestActionTestCase {
|
||||
|
||||
|
@ -53,11 +50,9 @@ public class RestIndexActionTests extends RestActionTestCase {
|
|||
|
||||
@Before
|
||||
public void setUpAction() {
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
when(clusterService.state()).thenAnswer(invocationOnMock -> clusterStateSupplier.get());
|
||||
controller().registerHandler(new RestIndexAction());
|
||||
controller().registerHandler(new CreateHandler());
|
||||
controller().registerHandler(new AutoIdHandler(clusterService));
|
||||
controller().registerHandler(new AutoIdHandler(() -> clusterStateSupplier.get().nodes()));
|
||||
}
|
||||
|
||||
public void testTypeInPath() {
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.BytesStream;
|
|||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
|
@ -33,7 +34,11 @@ public class CompressibleBytesOutputStreamTests extends ESTestCase {
|
|||
|
||||
public void testStreamWithoutCompression() throws IOException {
|
||||
BytesStream bStream = new ZeroOutOnCloseStream();
|
||||
if (randomBoolean()) {
|
||||
bStream.setVersion(VersionUtils.randomVersion(random()));
|
||||
}
|
||||
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, false);
|
||||
assertEquals(bStream.getVersion(), stream.getVersion());
|
||||
|
||||
byte[] expectedBytes = randomBytes(randomInt(30));
|
||||
stream.write(expectedBytes);
|
||||
|
@ -61,7 +66,11 @@ public class CompressibleBytesOutputStreamTests extends ESTestCase {
|
|||
|
||||
public void testStreamWithCompression() throws IOException {
|
||||
BytesStream bStream = new ZeroOutOnCloseStream();
|
||||
if (randomBoolean()) {
|
||||
bStream.setVersion(VersionUtils.randomVersion(random()));
|
||||
}
|
||||
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true);
|
||||
assertEquals(bStream.getVersion(), stream.getVersion());
|
||||
|
||||
byte[] expectedBytes = randomBytes(randomInt(30));
|
||||
stream.write(expectedBytes);
|
||||
|
@ -88,7 +97,11 @@ public class CompressibleBytesOutputStreamTests extends ESTestCase {
|
|||
|
||||
public void testCompressionWithCallingMaterializeFails() throws IOException {
|
||||
BytesStream bStream = new ZeroOutOnCloseStream();
|
||||
if (randomBoolean()) {
|
||||
bStream.setVersion(VersionUtils.randomVersion(random()));
|
||||
}
|
||||
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true);
|
||||
assertEquals(bStream.getVersion(), stream.getVersion());
|
||||
|
||||
byte[] expectedBytes = randomBytes(between(1, 30));
|
||||
stream.write(expectedBytes);
|
||||
|
|
|
@ -47,7 +47,7 @@ public abstract class RestActionTestCase extends ESTestCase {
|
|||
controller = new RestController(Collections.emptySet(), null,
|
||||
nodeClient,
|
||||
new NoneCircuitBreakerService(),
|
||||
new UsageService());
|
||||
new UsageService(), randomBoolean());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -263,7 +263,7 @@ public class EnrichPlugin extends Plugin implements SystemIndexPlugin, IngestPlu
|
|||
}
|
||||
|
||||
@Override
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
|
||||
return Collections.singletonList(
|
||||
new SystemIndexDescriptor(ENRICH_INDEX_PATTERN, "Contains data to support enrich ingest processors.")
|
||||
);
|
||||
|
|
|
@ -72,7 +72,7 @@ public class Logstash extends Plugin implements SystemIndexPlugin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
|
||||
return Collections.singletonList(new SystemIndexDescriptor(LOGSTASH_CONCRETE_INDEX_NAME,
|
||||
"Contains data for Logstash Central Management"));
|
||||
}
|
||||
|
|
|
@ -945,7 +945,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
|
|||
}
|
||||
|
||||
@Override
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
|
||||
return Collections.unmodifiableList(Arrays.asList(
|
||||
new SystemIndexDescriptor(MlMetaIndex.INDEX_NAME, "Contains scheduling and anomaly tracking metadata"),
|
||||
new SystemIndexDescriptor(AnomalyDetectorsIndexFields.CONFIG_INDEX, "Contains ML configuration data"),
|
||||
|
|
|
@ -1114,7 +1114,7 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
|
|||
}
|
||||
|
||||
@Override
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
|
||||
return Collections.unmodifiableList(Arrays.asList(
|
||||
new SystemIndexDescriptor(SECURITY_MAIN_ALIAS, "Contains Security configuration"),
|
||||
new SystemIndexDescriptor(RestrictedIndicesNames.INTERNAL_SECURITY_MAIN_INDEX_6, "Contains Security configuration"),
|
||||
|
|
|
@ -713,7 +713,7 @@ public class AuthenticationServiceTests extends ESTestCase {
|
|||
threadContext2.writeTo(output);
|
||||
StreamInput input = output.bytes().streamInput();
|
||||
threadContext2 = new ThreadContext(Settings.EMPTY);
|
||||
threadContext2.readHeaders(input);
|
||||
threadContext2.readFrom(input);
|
||||
header = threadContext2.getHeader(AuthenticationField.AUTHENTICATION_KEY);
|
||||
}
|
||||
|
||||
|
|
|
@ -383,7 +383,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
|
|||
}
|
||||
|
||||
@Override
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
|
||||
return Collections.singletonList(
|
||||
new SystemIndexDescriptor(TransformInternalIndexConstants.INDEX_NAME_PATTERN, "Contains Transform configuration data")
|
||||
);
|
||||
|
|
|
@ -702,7 +702,7 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin,
|
|||
}
|
||||
|
||||
@Override
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
|
||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
|
||||
return Collections.unmodifiableList(Arrays.asList(
|
||||
new SystemIndexDescriptor(Watch.INDEX, "Contains Watch definitions"),
|
||||
new SystemIndexDescriptor(TriggeredWatchStoreField.INDEX_NAME, "Used to track current and queued Watch execution")
|
||||
|
|
Loading…
Reference in New Issue