Revert "Introduce system index APIs for Kibana (#53035)" (#53992)

This reverts commit c610e0893d.

backport of #53912
This commit is contained in:
Ryan Ernst 2020-03-23 10:29:35 -07:00 committed by GitHub
parent c4260ba3c7
commit 960d1fb578
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 113 additions and 949 deletions

View File

@@ -1,31 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// Build configuration for the Kibana module, which packages the plugin that
// exposes /_kibana-prefixed REST APIs backed by Kibana's system indices.
esplugin {
    description 'Plugin exposing APIs for Kibana system indices'
    classname 'org.elasticsearch.kibana.KibanaPlugin'
}

// The plugin reuses reindex functionality (e.g. delete-by-query) at runtime.
dependencies {
    compile project(path: ':modules:reindex', configuration: 'runtime')
}

// Integration tests need the reindex module installed in the test cluster so
// the wrapped delete-by-query endpoint is available.
testClusters.integTest {
    module file(project(':modules:reindex').tasks.bundlePlugin.archiveFile)
}

View File

@@ -1,148 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.kibana;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.index.reindex.RestDeleteByQueryAction;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.admin.indices.RestCreateIndexAction;
import org.elasticsearch.rest.action.admin.indices.RestGetAliasesAction;
import org.elasticsearch.rest.action.admin.indices.RestGetIndicesAction;
import org.elasticsearch.rest.action.admin.indices.RestIndexPutAliasAction;
import org.elasticsearch.rest.action.admin.indices.RestRefreshAction;
import org.elasticsearch.rest.action.admin.indices.RestUpdateSettingsAction;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.rest.action.document.RestDeleteAction;
import org.elasticsearch.rest.action.document.RestGetAction;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.rest.action.document.RestIndexAction.AutoIdHandler;
import org.elasticsearch.rest.action.document.RestIndexAction.CreateHandler;
import org.elasticsearch.rest.action.document.RestMultiGetAction;
import org.elasticsearch.rest.action.document.RestUpdateAction;
import org.elasticsearch.rest.action.search.RestClearScrollAction;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
 * Plugin that registers Kibana's indices (by default {@code .kibana*} and {@code .reporting})
 * as system indices, and exposes a {@code /_kibana}-prefixed copy of a fixed subset of the
 * core REST API for Kibana to manage those indices.
 */
public class KibanaPlugin extends Plugin implements SystemIndexPlugin {

    // Node-scoped setting listing the index name patterns treated as Kibana system indices.
    public static final Setting<List<String>> KIBANA_INDEX_NAMES_SETTING = Setting.listSetting("kibana.system_indices",
        Collections.unmodifiableList(Arrays.asList(".kibana*", ".reporting")), Function.identity(), Property.NodeScope);

    /**
     * Returns one {@link SystemIndexDescriptor} per configured index pattern so the
     * cluster treats those indices as owned by Kibana.
     */
    @Override
    public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
        return Collections.unmodifiableList(KIBANA_INDEX_NAMES_SETTING.get(settings).stream()
            .map(pattern -> new SystemIndexDescriptor(pattern, "System index used by kibana"))
            .collect(Collectors.toList()));
    }

    /**
     * Registers wrapped versions of selected core REST handlers under the {@code /_kibana}
     * route prefix (see {@link KibanaWrappedRestHandler}). The wrapped handlers are grouped
     * by the Kibana use case each one serves.
     */
    @Override
    public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
                                             IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
                                             IndexNameExpressionResolver indexNameExpressionResolver,
                                             Supplier<DiscoveryNodes> nodesInCluster) {
        // TODO need to figure out what subset of system indices Kibana should have access to via these APIs
        final List<String> allowedIndexPatterns = Collections.emptyList();
        return Collections.unmodifiableList(Arrays.asList(
            // Based on https://github.com/elastic/kibana/issues/49764
            // apis needed to perform migrations... ideally these will go away
            new KibanaWrappedRestHandler(new RestCreateIndexAction(), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new RestGetAliasesAction(), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new RestIndexPutAliasAction(), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new RestRefreshAction(), allowedIndexPatterns),

            // apis needed to access saved objects
            new KibanaWrappedRestHandler(new RestGetAction(), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new RestMultiGetAction(settings), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new RestSearchAction(), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new RestBulkAction(settings), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new RestDeleteAction(), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new RestDeleteByQueryAction(), allowedIndexPatterns),

            // api used for testing
            new KibanaWrappedRestHandler(new RestUpdateSettingsAction(), allowedIndexPatterns),

            // apis used specifically by reporting
            new KibanaWrappedRestHandler(new RestGetIndicesAction(), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new RestIndexAction(), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new CreateHandler(), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new AutoIdHandler(nodesInCluster), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new RestUpdateAction(), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new RestSearchScrollAction(), allowedIndexPatterns),
            new KibanaWrappedRestHandler(new RestClearScrollAction(), allowedIndexPatterns)
        ));
    }

    @Override
    public List<Setting<?>> getSettings() {
        return Collections.singletonList(KIBANA_INDEX_NAMES_SETTING);
    }

    /**
     * Wraps a core REST handler so it is served under the {@code /_kibana} route prefix
     * with a {@code kibana_}-prefixed name, and grants the request thread system index
     * access for the configured patterns before delegating to the wrapped handler.
     */
    static class KibanaWrappedRestHandler extends BaseRestHandler.Wrapper {

        // Index patterns this handler is allowed to access as system indices.
        private final List<String> allowedIndexPatterns;

        KibanaWrappedRestHandler(BaseRestHandler delegate, List<String> allowedIndexPatterns) {
            super(delegate);
            this.allowedIndexPatterns = allowedIndexPatterns;
        }

        /** Prefixes the delegate handler's name with {@code kibana_}. */
        @Override
        public String getName() {
            return "kibana_" + super.getName();
        }

        /** Re-homes each of the delegate's routes under the {@code /_kibana} prefix. */
        @Override
        public List<Route> routes() {
            return Collections.unmodifiableList(super.routes().stream()
                .map(route -> new Route(route.getMethod(), "/_kibana" + route.getPath()))
                .collect(Collectors.toList()));
        }

        /** Enables system index access for the allowed patterns on this thread, then delegates. */
        @Override
        protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
            client.threadPool().getThreadContext().allowSystemIndexAccess(allowedIndexPatterns);
            return super.prepareRequest(request, client);
        }
    }
}

View File

@@ -1,47 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.kibana;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
/**
 * Unit tests for {@link KibanaPlugin}: verifies the registered settings and that the
 * system index descriptors reflect the configured index name patterns.
 */
public class KibanaPluginTests extends ESTestCase {

    public void testKibanaIndexNames() {
        final KibanaPlugin plugin = new KibanaPlugin();

        // The plugin must expose exactly its index-names setting.
        assertThat(plugin.getSettings(), contains(KibanaPlugin.KIBANA_INDEX_NAMES_SETTING));

        // With default settings the descriptors cover the well-known Kibana patterns.
        assertThat(descriptorPatterns(plugin, Settings.EMPTY), contains(".kibana*", ".reporting"));

        // Custom patterns supplied through the setting are reflected one-to-one.
        final List<String> customNames = Arrays.asList("." + randomAlphaOfLength(4), "." + randomAlphaOfLength(6));
        final Settings customSettings =
            Settings.builder().putList(KibanaPlugin.KIBANA_INDEX_NAMES_SETTING.getKey(), customNames).build();
        assertThat(descriptorPatterns(plugin, customSettings), is(customNames));
    }

    // Collects the index patterns of all system index descriptors produced for the given settings.
    private static List<String> descriptorPatterns(KibanaPlugin plugin, Settings settings) {
        return plugin.getSystemIndexDescriptors(settings).stream()
            .map(SystemIndexDescriptor::getIndexPattern)
            .collect(Collectors.toList());
    }
}

View File

@@ -1,249 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.kibana;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import java.io.IOException;
import java.util.Map;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
public class KibanaSystemIndexIT extends ESRestTestCase {
public void testCreateIndex() throws IOException {
Request request = new Request("PUT", "/_kibana/.kibana-1");
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
}
public void testAliases() throws IOException {
Request request = new Request("PUT", "/_kibana/.kibana-1");
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
request = new Request("PUT", "/_kibana/.kibana-1/_alias/.kibana");
response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
request = new Request("GET", "/_kibana/_aliases");
response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
assertThat(EntityUtils.toString(response.getEntity()), containsString(".kibana"));
}
public void testBulkToKibanaIndex() throws IOException {
Request request = new Request("POST", "/_kibana/_bulk");
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n");
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
}
public void testRefresh() throws IOException {
Request request = new Request("POST", "/_kibana/_bulk");
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n");
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
request = new Request("GET", "/_kibana/.kibana/_refresh");
response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
Request getRequest = new Request("GET", "/_kibana/.kibana/_doc/1");
Response getResponse = client().performRequest(getRequest);
assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
String responseBody = EntityUtils.toString(getResponse.getEntity());
assertThat(responseBody, containsString("foo"));
assertThat(responseBody, containsString("bar"));
}
public void testGetFromKibanaIndex() throws IOException {
Request request = new Request("POST", "/_kibana/_bulk");
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n");
request.addParameter("refresh", "true");
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
Request getRequest = new Request("GET", "/_kibana/.kibana/_doc/1");
Response getResponse = client().performRequest(getRequest);
assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
String responseBody = EntityUtils.toString(getResponse.getEntity());
assertThat(responseBody, containsString("foo"));
assertThat(responseBody, containsString("bar"));
}
public void testMultiGetFromKibanaIndex() throws IOException {
Request request = new Request("POST", "/_kibana/_bulk");
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
"{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n");
request.addParameter("refresh", "true");
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
Request getRequest = new Request("GET", "/_kibana/_mget");
getRequest.setJsonEntity("{ \"docs\" : [ { \"_index\" : \".kibana\", \"_id\" : \"1\" }, " +
"{ \"_index\" : \".kibana\", \"_id\" : \"2\" } ] }\n");
Response getResponse = client().performRequest(getRequest);
assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
String responseBody = EntityUtils.toString(getResponse.getEntity());
assertThat(responseBody, containsString("foo"));
assertThat(responseBody, containsString("bar"));
assertThat(responseBody, containsString("baz"));
assertThat(responseBody, containsString("tag"));
}
public void testSearchFromKibanaIndex() throws IOException {
Request request = new Request("POST", "/_kibana/_bulk");
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
"{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n");
request.addParameter("refresh", "true");
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
Request searchRequest = new Request("GET", "/_kibana/.kibana/_search");
searchRequest.setJsonEntity("{ \"query\" : { \"match_all\" : {} } }\n");
Response getResponse = client().performRequest(searchRequest);
assertThat(getResponse.getStatusLine().getStatusCode(), is(200));
String responseBody = EntityUtils.toString(getResponse.getEntity());
assertThat(responseBody, containsString("foo"));
assertThat(responseBody, containsString("bar"));
assertThat(responseBody, containsString("baz"));
assertThat(responseBody, containsString("tag"));
}
public void testDeleteFromKibanaIndex() throws IOException {
Request request = new Request("POST", "/_kibana/_bulk");
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
"{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n");
request.addParameter("refresh", "true");
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
Request deleteRequest = new Request("DELETE", "/_kibana/.kibana/_doc/1");
Response deleteResponse = client().performRequest(deleteRequest);
assertThat(deleteResponse.getStatusLine().getStatusCode(), is(200));
}
public void testDeleteByQueryFromKibanaIndex() throws IOException {
Request request = new Request("POST", "/_kibana/_bulk");
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
"{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n");
request.addParameter("refresh", "true");
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
Request dbqRequest = new Request("POST", "/_kibana/.kibana/_delete_by_query");
dbqRequest.setJsonEntity("{ \"query\" : { \"match_all\" : {} } }\n");
Response dbqResponse = client().performRequest(dbqRequest);
assertThat(dbqResponse.getStatusLine().getStatusCode(), is(200));
}
public void testUpdateIndexSettings() throws IOException {
Request request = new Request("PUT", "/_kibana/.kibana-1");
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
request = new Request("PUT", "/_kibana/.kibana-1/_settings");
request.setJsonEntity("{ \"index.blocks.read_only\" : false }");
response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
}
public void testGetIndex() throws IOException {
Request request = new Request("PUT", "/_kibana/.kibana-1");
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
request = new Request("GET", "/_kibana/.kibana-1");
response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
assertThat(EntityUtils.toString(response.getEntity()), containsString(".kibana-1"));
}
public void testIndexingAndUpdatingDocs() throws IOException {
Request request = new Request("PUT", "/_kibana/.kibana-1/_doc/1");
request.setJsonEntity("{ \"foo\" : \"bar\" }");
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(201));
request = new Request("PUT", "/_kibana/.kibana-1/_create/2");
request.setJsonEntity("{ \"foo\" : \"bar\" }");
response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(201));
request = new Request("POST", "/_kibana/.kibana-1/_doc");
request.setJsonEntity("{ \"foo\" : \"bar\" }");
response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(201));
request = new Request("GET", "/_kibana/.kibana-1/_refresh");
response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
request = new Request("POST", "/_kibana/.kibana-1/_update/1");
request.setJsonEntity("{ \"doc\" : { \"foo\" : \"baz\" } }");
response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
}
public void testScrollingDocs() throws IOException {
Request request = new Request("POST", "/_kibana/_bulk");
request.setJsonEntity("{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"1\" } }\n{ \"foo\" : \"bar\" }\n" +
"{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"2\" } }\n{ \"baz\" : \"tag\" }\n" +
"{ \"index\" : { \"_index\" : \".kibana\", \"_id\" : \"3\" } }\n{ \"baz\" : \"tag\" }\n");
request.addParameter("refresh", "true");
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), is(200));
Request searchRequest = new Request("GET", "/_kibana/.kibana/_search");
searchRequest.setJsonEntity("{ \"size\" : 1,\n\"query\" : { \"match_all\" : {} } }\n");
searchRequest.addParameter("scroll", "1m");
response = client().performRequest(searchRequest);
assertThat(response.getStatusLine().getStatusCode(), is(200));
Map<String, Object> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
assertNotNull(map.get("_scroll_id"));
String scrollId = (String) map.get("_scroll_id");
Request scrollRequest = new Request("POST", "/_kibana/_search/scroll");
scrollRequest.addParameter("scroll_id", scrollId);
scrollRequest.addParameter("scroll", "1m");
response = client().performRequest(scrollRequest);
assertThat(response.getStatusLine().getStatusCode(), is(200));
map = XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
assertNotNull(map.get("_scroll_id"));
scrollId = (String) map.get("_scroll_id");
Request clearScrollRequest = new Request("DELETE", "/_kibana/_search/scroll");
clearScrollRequest.addParameter("scroll_id", scrollId);
response = client().performRequest(clearScrollRequest);
assertThat(response.getStatusLine().getStatusCode(), is(200));
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.tasksplugin;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
@ -35,7 +34,7 @@ import static org.elasticsearch.tasks.TaskResultsService.TASK_INDEX;
public class TasksPlugin extends Plugin implements SystemIndexPlugin {
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
return Collections.singletonList(new SystemIndexDescriptor(TASK_INDEX, this.getClass().getSimpleName()));
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.tasksplugin;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
@ -28,6 +27,6 @@ public class TasksPluginTests extends ESTestCase {
public void testDummy() {
// This is a dummy test case to satisfy the conventions
TasksPlugin plugin = new TasksPlugin();
assertThat(plugin.getSystemIndexDescriptors(Settings.EMPTY), Matchers.hasSize(1));
assertThat(plugin.getSystemIndexDescriptors(), Matchers.hasSize(1));
}
}

View File

@ -220,6 +220,7 @@ import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.NamedRegistry;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.TypeLiteral;
@ -425,11 +426,12 @@ public class ActionModule extends AbstractModule {
private final RestController restController;
private final RequestValidators<PutMappingRequest> mappingRequestValidators;
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
private final ClusterService clusterService;
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
CircuitBreakerService circuitBreakerService, UsageService usageService) {
CircuitBreakerService circuitBreakerService, UsageService usageService, ClusterService clusterService) {
this.transportClient = transportClient;
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;
@ -437,6 +439,7 @@ public class ActionModule extends AbstractModule {
this.clusterSettings = clusterSettings;
this.settingsFilter = settingsFilter;
this.actionPlugins = actionPlugins;
this.clusterService = clusterService;
actions = setupActions(actionPlugins);
actionFilters = setupActionFilters(actionPlugins);
autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
@ -464,12 +467,11 @@ public class ActionModule extends AbstractModule {
if (transportClient) {
restController = null;
} else {
final boolean restrictSystemIndices = RestController.RESTRICT_SYSTEM_INDICES.get(settings);
restController =
new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService, restrictSystemIndices);
restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
}
}
public Map<String, ActionHandler<?, ?>> getActions() {
return actions;
}
@ -705,7 +707,7 @@ public class ActionModule extends AbstractModule {
registerHandler.accept(new RestIndexAction());
registerHandler.accept(new CreateHandler());
registerHandler.accept(new AutoIdHandler(nodesInCluster));
registerHandler.accept(new AutoIdHandler(clusterService));
registerHandler.accept(new RestGetAction());
registerHandler.accept(new RestGetSourceAction());
registerHandler.accept(new RestMultiGetAction(settings));

View File

@ -187,7 +187,7 @@ public abstract class TransportClient extends AbstractClient {
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
ActionModule actionModule = new ActionModule(true, settings, null, settingsModule.getIndexScopedSettings(),
settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool,
pluginsService.filterPlugins(ActionPlugin.class), null, null, null);
pluginsService.filterPlugins(ActionPlugin.class), null, null, null, null);
modules.add(actionModule);
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),

View File

@ -366,8 +366,8 @@ public class PublicationTransportHandler {
public static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
final BytesStreamOutput bStream = new BytesStreamOutput();
bStream.setVersion(nodeVersion);
try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
stream.setVersion(nodeVersion);
stream.writeBoolean(true);
clusterState.writeTo(stream);
}
@ -376,8 +376,8 @@ public class PublicationTransportHandler {
public static BytesReference serializeDiffClusterState(Diff diff, Version nodeVersion) throws IOException {
final BytesStreamOutput bStream = new BytesStreamOutput();
bStream.setVersion(nodeVersion);
try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
stream.setVersion(nodeVersion);
stream.writeBoolean(false);
diff.writeTo(stream);
}
@ -387,12 +387,12 @@ public class PublicationTransportHandler {
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
final Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in = request.bytes().streamInput();
in.setVersion(request.version());
try {
if (compressor != null) {
in = compressor.streamInput(in);
}
in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
in.setVersion(request.version());
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
final ClusterState incomingState;

View File

@ -85,7 +85,7 @@ public class DeflateCompressor implements Compressor {
final Inflater inflater = new Inflater(nowrap);
InputStream decompressedIn = new InflaterInputStream(in, inflater, BUFFER_SIZE);
decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE);
final InputStreamStreamInput inputStreamStreamInput = new InputStreamStreamInput(decompressedIn) {
return new InputStreamStreamInput(decompressedIn) {
final AtomicBoolean closed = new AtomicBoolean(false);
public void close() throws IOException {
@ -99,9 +99,6 @@ public class DeflateCompressor implements Compressor {
}
}
};
inputStreamStreamInput.setVersion(in.getVersion());
return inputStreamStreamInput;
}
@Override
@ -112,7 +109,7 @@ public class DeflateCompressor implements Compressor {
final boolean syncFlush = true;
DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE);
final OutputStreamStreamOutput outputStreamStreamOutput = new OutputStreamStreamOutput(compressedOut) {
return new OutputStreamStreamOutput(compressedOut) {
final AtomicBoolean closed = new AtomicBoolean(false);
public void close() throws IOException {
@ -126,7 +123,5 @@ public class DeflateCompressor implements Compressor {
}
}
};
outputStreamStreamOutput.setVersion(out.getVersion());
return outputStreamStreamOutput;
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.common.io;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -297,15 +296,5 @@ public abstract class Streams {
public BytesReference bytes() {
return delegate.bytes();
}
@Override
public Version getVersion() {
return delegate.getVersion();
}
@Override
public void setVersion(Version version) {
delegate.setVersion(version);
}
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.common.settings;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.Build;
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.search.TransportSearchAction;
@ -105,7 +104,6 @@ import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchService;
@ -191,10 +189,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
}
}
public static final Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS;
static {
final Set<Setting<?>> settings = new HashSet<>(Arrays.asList(
public static Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL,
TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT,
@ -544,16 +539,11 @@ public final class ClusterSettings extends AbstractScopedSettings {
HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING,
HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING,
DiscoveryUpgradeService.BWC_PING_TIMEOUT_SETTING,
DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING));
if (Build.CURRENT.isSnapshot()) {
settings.add(RestController.RESTRICT_SYSTEM_INDICES);
}
BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(settings);
}
DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING)));
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY_UPGRADER,
RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER));
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY_UPGRADER,
RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER));
}

View File

@ -20,11 +20,9 @@ package org.elasticsearch.common.util.concurrent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -52,7 +50,6 @@ import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;
import static java.util.Collections.emptyList;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE;
@ -67,7 +64,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARN
* Consumers of ThreadContext usually don't need to interact with adding or stashing contexts. Every elasticsearch thread is managed by
* a thread pool or executor being responsible for stashing and restoring the threads context. For instance if a network request is
* received, all headers are deserialized from the network and directly added as the headers of the threads {@link ThreadContext}
* (see {@link #readFrom(StreamInput)}. In order to not modify the context that is currently active on this thread the network code
* (see {@link #readHeaders(StreamInput)}. In order to not modify the context that is currently active on this thread the network code
* uses a try/with pattern to stash it's current context, read headers into a fresh one and once the request is handled or a handler thread
* is forked (which in turn inherits the context) it restores the previous context. For instance:
* </p>
@ -237,18 +234,17 @@ public final class ThreadContext implements Writeable {
}
/**
* Reads the values from the stream into the current context
* Reads the headers from the stream into the current context
*/
public void readFrom(StreamInput in) throws IOException {
public void readHeaders(StreamInput in) throws IOException {
final Tuple<Map<String, String>, Map<String, Set<String>>> streamTuple = readHeadersFromStream(in);
final Map<String, String> requestHeaders = streamTuple.v1();
final Map<String, Set<String>> responseHeaders = streamTuple.v2();
final List<String> allowedSystemIndices = readAllowedSystemIndices(in);
final ThreadContextStruct struct;
if (requestHeaders.isEmpty() && responseHeaders.isEmpty() && allowedSystemIndices.isEmpty()) {
if (requestHeaders.isEmpty() && responseHeaders.isEmpty()) {
struct = ThreadContextStruct.EMPTY;
} else {
struct = new ThreadContextStruct(requestHeaders, responseHeaders, Collections.emptyMap(), allowedSystemIndices, false, 0L);
struct = new ThreadContextStruct(requestHeaders, responseHeaders, Collections.emptyMap(), false);
}
threadLocal.set(struct);
}
@ -275,14 +271,6 @@ public final class ThreadContext implements Writeable {
return new Tuple<>(requestHeaders, responseHeaders);
}
public static List<String> readAllowedSystemIndices(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
return in.readOptionalStringList();
} else {
return emptyList();
}
}
/**
* Returns the header for the given key or <code>null</code> if not present
*/
@ -426,36 +414,6 @@ public final class ThreadContext implements Writeable {
return threadLocal.get().isSystemContext;
}
/**
* Returns <code>true</code> if a request made within this context can access system indices
*/
public boolean isSystemIndexAccessAllowed() {
return threadLocal.get().allowedSystemIndexPatterns != null;
}
/**
* Sets the context to disallow access to system indices
*/
public void disallowSystemIndexAccess() {
threadLocal.set(threadLocal.get().setAllowSystemIndices(null));
}
/**
* Sets the context to allow access to system indices
*/
public void allowSystemIndexAccess(List<String> patterns) {
threadLocal.set(threadLocal.get().setAllowSystemIndices(patterns));
}
/**
* Returns the list of allowed system index patterns or {@code null} if none are allowed. An
* empty list indicates that all system indices are allowed to be accessed.
*/
@Nullable
public List<String> allowedSystemIndexPatterns() {
return threadLocal.get().allowedSystemIndexPatterns;
}
@FunctionalInterface
public interface StoredContext extends AutoCloseable {
@Override
@ -487,7 +445,6 @@ public final class ThreadContext implements Writeable {
private final Map<String, String> requestHeaders;
private final Map<String, Object> transientHeaders;
private final Map<String, Set<String>> responseHeaders;
private final List<String> allowedSystemIndexPatterns;
private final boolean isSystemContext;
//saving current warning headers' size not to recalculate the size with every new warning header
private final long warningHeadersSize;
@ -502,41 +459,29 @@ public final class ThreadContext implements Writeable {
private ThreadContextStruct(Map<String, String> requestHeaders,
Map<String, Set<String>> responseHeaders,
Map<String, Object> transientHeaders, boolean isSystemContext) {
this(requestHeaders, responseHeaders, transientHeaders, emptyList(), isSystemContext, 0L);
this.requestHeaders = requestHeaders;
this.responseHeaders = responseHeaders;
this.transientHeaders = transientHeaders;
this.isSystemContext = isSystemContext;
this.warningHeadersSize = 0L;
}
private ThreadContextStruct(Map<String, String> requestHeaders,
Map<String, Set<String>> responseHeaders,
Map<String, Object> transientHeaders, boolean isSystemContext,
long warningHeadersSize) {
this(requestHeaders, responseHeaders, transientHeaders, emptyList(), isSystemContext, warningHeadersSize);
}
private ThreadContextStruct(Map<String, String> requestHeaders,
Map<String, Set<String>> responseHeaders,
Map<String, Object> transientHeaders,
List<String> allowedSystemIndexPatterns,
boolean isSystemContext,
long warningHeadersSize) {
this.requestHeaders = requestHeaders;
this.responseHeaders = responseHeaders;
this.transientHeaders = transientHeaders;
this.isSystemContext = isSystemContext;
this.warningHeadersSize = warningHeadersSize;
this.allowedSystemIndexPatterns = allowedSystemIndexPatterns;
}
/**
* This represents the default context and it should only ever be called by {@link #DEFAULT_CONTEXT}.
*/
private ThreadContextStruct() {
this(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), emptyList(), false, 0L);
}
private ThreadContextStruct setAllowSystemIndices(List<String> allowedSystemIndexPatterns) {
final List<String> copy =
allowedSystemIndexPatterns == null ? null : Collections.unmodifiableList(new ArrayList<>(allowedSystemIndexPatterns));
return new ThreadContextStruct(requestHeaders, responseHeaders, transientHeaders, copy, isSystemContext, warningHeadersSize);
this(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), false);
}
private ThreadContextStruct putRequest(String key, String value) {
@ -580,8 +525,7 @@ public final class ThreadContext implements Writeable {
newResponseHeaders.put(key, entry.getValue());
}
}
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, allowedSystemIndexPatterns,
isSystemContext, 0L);
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext);
}
private ThreadContextStruct putResponse(final String key, final String value, final Function<String, String> uniqueValue,
@ -631,8 +575,7 @@ public final class ThreadContext implements Writeable {
return this;
}
}
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, allowedSystemIndexPatterns,
isSystemContext, newWarningHeaderSize);
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext, newWarningHeaderSize);
}
@ -668,9 +611,6 @@ public final class ThreadContext implements Writeable {
}
out.writeMap(responseHeaders, StreamOutput::writeString, StreamOutput::writeStringCollection);
if (out.getVersion().onOrAfter(Version.V_7_7_0)) { // TODO update version on backport
out.writeOptionalStringCollection(allowedSystemIndexPatterns);
}
}
}
@ -688,7 +628,7 @@ public final class ThreadContext implements Writeable {
@Override
public void run() {
try (ThreadContext.StoredContext ignore = stashContext()) {
try (ThreadContext.StoredContext ignore = stashContext()){
ctx.restore();
in.run();
}

View File

@ -443,7 +443,7 @@ public class Node implements Closeable {
.stream()
.collect(Collectors.toMap(
plugin -> plugin.getClass().getSimpleName(),
plugin -> plugin.getSystemIndexDescriptors(settings))));
plugin -> plugin.getSystemIndexDescriptors())));
SystemIndexDescriptor.checkForOverlappingPatterns(systemIndexDescriptorMap);
final List<SystemIndexDescriptor> systemIndexDescriptors = systemIndexDescriptorMap.values().stream()
@ -479,7 +479,7 @@ public class Node implements Closeable {
ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, clusterService);
modules.add(actionModule);
final RestController restController = actionModule.getRestController();

View File

@ -19,7 +19,6 @@
package org.elasticsearch.plugins;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.SystemIndexDescriptor;
import java.util.Collection;
@ -34,10 +33,9 @@ public interface SystemIndexPlugin extends ActionPlugin {
/**
* Returns a {@link Collection} of {@link SystemIndexDescriptor}s that describe this plugin's system indices, including
* name, mapping, and settings.
* @param settings The node's settings
* @return Descriptions of the system indices managed by this plugin.
*/
default Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
default Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
return Collections.emptyList();
}
}

View File

@ -197,57 +197,4 @@ public abstract class BaseRestHandler implements RestHandler {
return Collections.emptySet();
}
public static class Wrapper extends BaseRestHandler {
protected final BaseRestHandler delegate;
public Wrapper(BaseRestHandler delegate) {
this.delegate = delegate;
}
@Override
public String getName() {
return delegate.getName();
}
@Override
public List<Route> routes() {
return delegate.routes();
}
@Override
public List<DeprecatedRoute> deprecatedRoutes() {
return delegate.deprecatedRoutes();
}
@Override
public List<ReplacedRoute> replacedRoutes() {
return delegate.replacedRoutes();
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
return delegate.prepareRequest(request, client);
}
@Override
protected Set<String> responseParams() {
return delegate.responseParams();
}
@Override
public boolean canTripCircuitBreaker() {
return delegate.canTripCircuitBreaker();
}
@Override
public boolean supportsContentStream() {
return delegate.supportsContentStream();
}
@Override
public boolean allowsUnsafeBuffers() {
return delegate.allowsUnsafeBuffers();
}
}
}

View File

@ -31,8 +31,6 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
@ -68,10 +66,6 @@ public class RestController implements HttpServerTransport.Dispatcher {
private static final Logger logger = LogManager.getLogger(RestController.class);
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
// TODO once we are ready, this should default to true
public static final Setting<Boolean> RESTRICT_SYSTEM_INDICES =
Setting.boolSetting("rest.restrict_system_indices", false, Property.NodeScope);
private final PathTrie<MethodHandlers> handlers = new PathTrie<>(RestUtils.REST_DECODER);
private final UnaryOperator<RestHandler> handlerWrapper;
@ -83,10 +77,9 @@ public class RestController implements HttpServerTransport.Dispatcher {
/** Rest headers that are copied to internal requests made during a rest request. */
private final Set<RestHeaderDefinition> headersToCopy;
private final UsageService usageService;
private final boolean restrictSystemIndices;
public RestController(Set<RestHeaderDefinition> headersToCopy, UnaryOperator<RestHandler> handlerWrapper,
NodeClient client, CircuitBreakerService circuitBreakerService, UsageService usageService, boolean restrictSystemIndices) {
NodeClient client, CircuitBreakerService circuitBreakerService, UsageService usageService) {
this.headersToCopy = headersToCopy;
this.usageService = usageService;
if (handlerWrapper == null) {
@ -95,7 +88,6 @@ public class RestController implements HttpServerTransport.Dispatcher {
this.handlerWrapper = handlerWrapper;
this.client = client;
this.circuitBreakerService = circuitBreakerService;
this.restrictSystemIndices = restrictSystemIndices;
}
/**
@ -180,13 +172,6 @@ public class RestController implements HttpServerTransport.Dispatcher {
handleFavicon(request.method(), request.uri(), channel);
return;
}
if (restrictSystemIndices) {
threadContext.disallowSystemIndexAccess();
} else {
assert threadContext.isSystemIndexAccessAllowed();
}
try {
tryAllHandlers(request, channel, threadContext);
} catch (Exception e) {

View File

@ -38,7 +38,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
@ -86,41 +85,28 @@ public class RestCancellableNodeClient extends FilterClient {
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action, Request request, ActionListener<Response> listener) {
final AtomicBoolean created = new AtomicBoolean(false);
CloseListener closeListener = httpChannels.computeIfAbsent(httpChannel, channel -> {
created.set(true);
return new CloseListener();
});
CloseListener closeListener = httpChannels.computeIfAbsent(httpChannel, channel -> new CloseListener());
TaskHolder taskHolder = new TaskHolder();
final Task task;
boolean success = false;
try {
task = client.executeLocally(action, request,
new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
closeListener.unregisterTask(taskHolder);
} finally {
listener.onResponse(response);
}
Task task = client.executeLocally(action, request,
new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
closeListener.unregisterTask(taskHolder);
} finally {
listener.onResponse(response);
}
}
@Override
public void onFailure(Exception e) {
try {
closeListener.unregisterTask(taskHolder);
} finally {
listener.onFailure(e);
}
@Override
public void onFailure(Exception e) {
try {
closeListener.unregisterTask(taskHolder);
} finally {
listener.onFailure(e);
}
});
success = true;
} finally {
if (success == false && created.get()) {
httpChannels.remove(httpChannel);
}
}
}
});
final TaskId taskId = new TaskId(client.getLocalNodeId(), task.getId());
closeListener.registerTask(taskHolder, taskId);
closeListener.maybeRegisterChannel(httpChannel);

View File

@ -24,8 +24,8 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.rest.BaseRestHandler;
@ -36,7 +36,6 @@ import org.elasticsearch.rest.action.RestStatusToXContentListener;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.function.Supplier;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
@ -96,10 +95,10 @@ public class RestIndexAction extends BaseRestHandler {
public static final class AutoIdHandler extends RestIndexAction {
private final Supplier<DiscoveryNodes> nodesInCluster;
private final ClusterService clusterService;
public AutoIdHandler(Supplier<DiscoveryNodes> nodesInCluster) {
this.nodesInCluster = nodesInCluster;
public AutoIdHandler(ClusterService clusterService) {
this.clusterService = clusterService;
}
@Override
@ -117,7 +116,7 @@ public class RestIndexAction extends BaseRestHandler {
@Override
public RestChannelConsumer prepareRequest(RestRequest request, final NodeClient client) throws IOException {
assert request.params().get("id") == null : "non-null id: " + request.params().get("id");
if (request.params().get("op_type") == null && nodesInCluster.get().getMinNodeVersion().onOrAfter(Version.V_7_5_0)) {
if (request.params().get("op_type") == null && clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_7_5_0)) {
// default to op_type create
request.params().put("op_type", "create");
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
@ -103,14 +102,4 @@ final class CompressibleBytesOutputStream extends StreamOutput {
public void reset() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public Version getVersion() {
return stream.getVersion();
}
@Override
public void setVersion(Version version) {
stream.setVersion(version);
}
}

View File

@ -65,8 +65,7 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
long requestId = streamInput.readLong();
byte status = streamInput.readByte();
final Version remoteVersion = Version.fromId(streamInput.readInt());
streamInput.setVersion(remoteVersion);
Version remoteVersion = Version.fromId(streamInput.readInt());
final boolean isHandshake = TransportStatus.isHandshake(status);
ensureVersionCompatibility(remoteVersion, version, isHandshake);
@ -74,11 +73,10 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
// Consume the variable header size
streamInput.readInt();
} else {
streamInput = decompressingStream(status, streamInput);
assertRemoteVersion(streamInput, remoteVersion);
streamInput = decompressingStream(status, remoteVersion, streamInput);
}
threadContext.readFrom(streamInput);
threadContext.readHeaders(streamInput);
InboundMessage message;
if (TransportStatus.isRequest(status)) {
@ -96,19 +94,15 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
final String action = streamInput.readString();
if (remoteVersion.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
streamInput = decompressingStream(status, streamInput);
assertRemoteVersion(streamInput, remoteVersion);
streamInput = decompressingStream(status, remoteVersion, streamInput);
}
streamInput = namedWriteableStream(streamInput);
assertRemoteVersion(streamInput, remoteVersion);
streamInput = namedWriteableStream(streamInput, remoteVersion);
message = new Request(threadContext, remoteVersion, status, requestId, action, features, streamInput);
} else {
if (remoteVersion.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
streamInput = decompressingStream(status, streamInput);
assertRemoteVersion(streamInput, remoteVersion);
streamInput = decompressingStream(status, remoteVersion, streamInput);
}
streamInput = namedWriteableStream(streamInput);
assertRemoteVersion(streamInput, remoteVersion);
streamInput = namedWriteableStream(streamInput, remoteVersion);
message = new Response(threadContext, remoteVersion, status, requestId, streamInput);
}
success = true;
@ -120,10 +114,12 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
}
}
static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException {
static StreamInput decompressingStream(byte status, Version remoteVersion, StreamInput streamInput) throws IOException {
if (TransportStatus.isCompress(status) && streamInput.available() > 0) {
try {
return CompressorFactory.COMPRESSOR.streamInput(streamInput);
StreamInput decompressor = CompressorFactory.COMPRESSOR.streamInput(streamInput);
decompressor.setVersion(remoteVersion);
return decompressor;
} catch (IllegalArgumentException e) {
throw new IllegalStateException("stream marked as compressed, but is missing deflate header");
}
@ -132,12 +128,10 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
}
}
private StreamInput namedWriteableStream(StreamInput delegate) {
return new NamedWriteableAwareStreamInput(delegate, namedWriteableRegistry);
}
static void assertRemoteVersion(StreamInput in, Version version) {
assert version.equals(in.getVersion()) : "Stream version [" + in.getVersion() + "] does not match version [" + version + "]";
private StreamInput namedWriteableStream(StreamInput delegate, Version remoteVersion) {
NamedWriteableAwareStreamInput streamInput = new NamedWriteableAwareStreamInput(delegate, namedWriteableRegistry);
streamInput.setVersion(remoteVersion);
return streamInput;
}
}

View File

@ -55,9 +55,9 @@ abstract class OutboundMessage extends NetworkMessage {
}
try (CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bytesStream, TransportStatus.isCompress(status))) {
assert stream.getVersion().equals(version) :
"Stream version [" + stream.getVersion() + "] does not match version [" + version + "]";
stream.setVersion(version);
stream.setFeatures(bytesStream.getFeatures());
if (variableHeaderLength == -1) {
writeVariableHeader(stream);
}

View File

@ -28,8 +28,6 @@ import org.elasticsearch.core.internal.io.IOUtils;
import java.io.IOException;
import static org.elasticsearch.transport.InboundMessage.Reader.assertRemoteVersion;
public final class TransportLogger {
private static final Logger logger = LogManager.getLogger(TransportLogger.class);
@ -77,8 +75,7 @@ public final class TransportLogger {
final byte status = streamInput.readByte();
final boolean isRequest = TransportStatus.isRequest(status);
final String type = isRequest ? "request" : "response";
final Version version = Version.fromId(streamInput.readInt());
streamInput.setVersion(version);
Version version = Version.fromId(streamInput.readInt());
sb.append(" [length: ").append(messageLengthWithHeader);
sb.append(", request id: ").append(requestId);
sb.append(", type: ").append(type);
@ -87,18 +84,11 @@ public final class TransportLogger {
if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
sb.append(", header size: ").append(streamInput.readInt()).append('B');
} else {
streamInput = InboundMessage.Reader.decompressingStream(status, streamInput);
assertRemoteVersion(streamInput, version);
streamInput = InboundMessage.Reader.decompressingStream(status, version, streamInput);
}
// TODO (jaymode) Need a better way to deal with this. In one aspect,
// changes were made to ThreadContext to allocate less internally, yet we have this
// ugliness needed to move past the threadcontext data in the stream and discard it
// Could we have an alternative that essentially just seeks through the stream with
// minimal allocation?
// read and discard thread context data
// read and discard headers
ThreadContext.readHeadersFromStream(streamInput);
ThreadContext.readAllowedSystemIndices(streamInput);
if (isRequest) {
if (streamInput.getVersion().onOrAfter(Version.V_6_3_0)) {

View File

@ -109,7 +109,7 @@ public class ActionModuleTests extends ESTestCase {
UsageService usageService = new UsageService();
ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(),
settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), null, emptyList(), null,
null, usageService);
null, usageService, null);
actionModule.initRestHandlers(null);
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
Exception e = expectThrows(IllegalArgumentException.class, () ->
@ -141,7 +141,7 @@ public class ActionModuleTests extends ESTestCase {
UsageService usageService = new UsageService();
ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(),
settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), threadPool,
singletonList(dupsMainAction), null, null, usageService);
singletonList(dupsMainAction), null, null, usageService, null);
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null));
assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/] for method: GET"));
} finally {
@ -175,7 +175,7 @@ public class ActionModuleTests extends ESTestCase {
UsageService usageService = new UsageService();
ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(),
settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), threadPool,
singletonList(registersFakeHandler), null, null, usageService);
singletonList(registersFakeHandler), null, null, usageService, null);
actionModule.initRestHandlers(null);
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
Exception e = expectThrows(IllegalArgumentException.class, () ->

View File

@ -21,13 +21,11 @@ package org.elasticsearch.common.compress;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -390,12 +388,9 @@ public class DeflateCompressTests extends ESTestCase {
StreamInput rawIn = new ByteBufferStreamInput(bb);
Compressor c = compressor;
final Version version = VersionUtils.randomVersion(random());
ByteArrayOutputStream bos = new ByteArrayOutputStream();
OutputStreamStreamOutput rawOs = new OutputStreamStreamOutput(bos);
rawOs.setVersion(version);
StreamOutput os = c.streamOutput(rawOs);
assertEquals(version, os.getVersion());
Random r = random();
int bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000);
@ -415,9 +410,7 @@ public class DeflateCompressTests extends ESTestCase {
byte compressed[] = bos.toByteArray();
ByteBuffer bb2 = ByteBuffer.wrap(compressed);
StreamInput compressedIn = new ByteBufferStreamInput(bb2);
compressedIn.setVersion(version);
StreamInput in = c.streamInput(compressedIn);
assertEquals(version, in.getVersion());
// randomize constants again
bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000);

View File

@ -29,13 +29,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
public class ThreadContextTests extends ESTestCase {
@ -258,7 +254,6 @@ public class ThreadContextTests extends ESTestCase {
threadContext.addResponseHeader("Warning", "123456");
}
threadContext.addResponseHeader("Warning", "234567");
threadContext.disallowSystemIndexAccess();
BytesStreamOutput out = new BytesStreamOutput();
threadContext.writeTo(out);
@ -267,9 +262,8 @@ public class ThreadContextTests extends ESTestCase {
assertNull(threadContext.getTransient("ctx.foo"));
assertTrue(threadContext.getResponseHeaders().isEmpty());
assertEquals("1", threadContext.getHeader("default"));
assertTrue(threadContext.isSystemIndexAccessAllowed());
threadContext.readFrom(out.bytes().streamInput());
threadContext.readHeaders(out.bytes().streamInput());
assertEquals("bar", threadContext.getHeader("foo"));
assertNull(threadContext.getTransient("ctx.foo"));
@ -280,57 +274,10 @@ public class ThreadContextTests extends ESTestCase {
assertThat(warnings, hasSize(2));
assertThat(warnings, hasItem(equalTo("123456")));
assertThat(warnings, hasItem(equalTo("234567")));
assertFalse(threadContext.isSystemIndexAccessAllowed());
}
assertEquals("bar", threadContext.getHeader("foo"));
assertEquals(Integer.valueOf(1), threadContext.getTransient("ctx.foo"));
assertEquals("1", threadContext.getHeader("default"));
assertFalse(threadContext.isSystemIndexAccessAllowed());
}
public void testSerializeWithAllowedSystemIndexPatterns() throws IOException {
Settings build = Settings.builder().put("request.headers.default", "1").build();
ThreadContext threadContext = new ThreadContext(build);
threadContext.putHeader("foo", "bar");
threadContext.putTransient("ctx.foo", 1);
threadContext.addResponseHeader("Warning", "123456");
if (rarely()) {
threadContext.addResponseHeader("Warning", "123456");
}
threadContext.addResponseHeader("Warning", "234567");
final List<String> allowed = randomList(1, 8, () -> randomAlphaOfLengthBetween(2, 8));
threadContext.allowSystemIndexAccess(allowed);
BytesStreamOutput out = new BytesStreamOutput();
threadContext.writeTo(out);
try (ThreadContext.StoredContext ctx = threadContext.stashContext()) {
assertNull(threadContext.getHeader("foo"));
assertNull(threadContext.getTransient("ctx.foo"));
assertTrue(threadContext.getResponseHeaders().isEmpty());
assertEquals("1", threadContext.getHeader("default"));
assertTrue(threadContext.isSystemIndexAccessAllowed());
threadContext.readFrom(out.bytes().streamInput());
assertEquals("bar", threadContext.getHeader("foo"));
assertNull(threadContext.getTransient("ctx.foo"));
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
final List<String> warnings = responseHeaders.get("Warning");
assertThat(responseHeaders.keySet(), hasSize(1));
assertThat(warnings, hasSize(2));
assertThat(warnings, hasItem(equalTo("123456")));
assertThat(warnings, hasItem(equalTo("234567")));
assertTrue(threadContext.isSystemIndexAccessAllowed());
assertThat(threadContext.allowedSystemIndexPatterns(), equalTo(allowed));
}
assertEquals("bar", threadContext.getHeader("foo"));
assertEquals(Integer.valueOf(1), threadContext.getTransient("ctx.foo"));
assertEquals("1", threadContext.getHeader("default"));
assertTrue(threadContext.isSystemIndexAccessAllowed());
assertThat(threadContext.allowedSystemIndexPatterns(), equalTo(allowed));
}
public void testSerializeInDifferentContext() throws IOException {
@ -345,19 +292,17 @@ public class ThreadContextTests extends ESTestCase {
threadContext.addResponseHeader("Warning", "123456");
}
threadContext.addResponseHeader("Warning", "234567");
threadContext.disallowSystemIndexAccess();
assertEquals("bar", threadContext.getHeader("foo"));
assertNotNull(threadContext.getTransient("ctx.foo"));
assertEquals("1", threadContext.getHeader("default"));
assertThat(threadContext.getResponseHeaders().keySet(), hasSize(1));
assertFalse(threadContext.isSystemIndexAccessAllowed());
threadContext.writeTo(out);
}
{
Settings otherSettings = Settings.builder().put("request.headers.default", "5").build();
ThreadContext otherThreadContext = new ThreadContext(otherSettings);
otherThreadContext.readFrom(out.bytes().streamInput());
otherThreadContext.readHeaders(out.bytes().streamInput());
assertEquals("bar", otherThreadContext.getHeader("foo"));
assertNull(otherThreadContext.getTransient("ctx.foo"));
@ -370,53 +315,6 @@ public class ThreadContextTests extends ESTestCase {
assertThat(warnings, hasSize(2));
assertThat(warnings, hasItem(equalTo("123456")));
assertThat(warnings, hasItem(equalTo("234567")));
assertFalse(otherThreadContext.isSystemIndexAccessAllowed());
}
}
public void testSerializeInDifferentContextWithAllowedSystemIndices() throws IOException {
final List<String> allowed = randomList(1, 8, () -> randomAlphaOfLengthBetween(2, 8));
BytesStreamOutput out = new BytesStreamOutput();
{
Settings build = Settings.builder().put("request.headers.default", "1").build();
ThreadContext threadContext = new ThreadContext(build);
threadContext.putHeader("foo", "bar");
threadContext.putTransient("ctx.foo", 1);
threadContext.addResponseHeader("Warning", "123456");
if (rarely()) {
threadContext.addResponseHeader("Warning", "123456");
}
threadContext.addResponseHeader("Warning", "234567");
threadContext.allowSystemIndexAccess(allowed);
assertEquals("bar", threadContext.getHeader("foo"));
assertNotNull(threadContext.getTransient("ctx.foo"));
assertEquals("1", threadContext.getHeader("default"));
assertThat(threadContext.getResponseHeaders().keySet(), hasSize(1));
assertTrue(threadContext.isSystemIndexAccessAllowed());
assertThat(threadContext.allowedSystemIndexPatterns(), equalTo(allowed));
threadContext.writeTo(out);
}
{
Settings otherSettings = Settings.builder().put("request.headers.default", "5").build();
ThreadContext otherThreadContext = new ThreadContext(otherSettings);
otherThreadContext.readFrom(out.bytes().streamInput());
assertEquals("bar", otherThreadContext.getHeader("foo"));
assertNull(otherThreadContext.getTransient("ctx.foo"));
assertEquals("1", otherThreadContext.getHeader("default"));
final Map<String, List<String>> responseHeaders = otherThreadContext.getResponseHeaders();
final List<String> warnings = responseHeaders.get("Warning");
assertThat(responseHeaders.keySet(), hasSize(1));
assertThat(warnings, hasSize(2));
assertThat(warnings, hasItem(equalTo("123456")));
assertThat(warnings, hasItem(equalTo("234567")));
assertTrue(otherThreadContext.isSystemIndexAccessAllowed());
assertThat(otherThreadContext.allowedSystemIndexPatterns(), equalTo(allowed));
}
}
@ -435,7 +333,7 @@ public class ThreadContextTests extends ESTestCase {
{
Settings otherSettings = Settings.builder().put("request.headers.default", "5").build();
ThreadContext otherhreadContext = new ThreadContext(otherSettings);
otherhreadContext.readFrom(out.bytes().streamInput());
otherhreadContext.readHeaders(out.bytes().streamInput());
assertEquals("bar", otherhreadContext.getHeader("foo"));
assertNull(otherhreadContext.getTransient("ctx.foo"));
@ -715,31 +613,6 @@ public class ThreadContextTests extends ESTestCase {
assertEquals("value for key [foo] already present", e.getMessage());
}
public void testSystemIndexAccessAllowed() {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
assertTrue(threadContext.isSystemIndexAccessAllowed());
assertThat(threadContext.allowedSystemIndexPatterns(), empty());
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
assertTrue(threadContext.isSystemIndexAccessAllowed());
threadContext.disallowSystemIndexAccess();
assertFalse(threadContext.isSystemIndexAccessAllowed());
assertThat(threadContext.allowedSystemIndexPatterns(), nullValue());
}
assertTrue(threadContext.isSystemIndexAccessAllowed());
assertThat(threadContext.allowedSystemIndexPatterns(), empty());
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.disallowSystemIndexAccess();
final List<String> allowed = randomList(1, 8, () -> randomAlphaOfLengthBetween(2, 8));
threadContext.allowSystemIndexAccess(allowed);
assertTrue(threadContext.isSystemIndexAccessAllowed());
assertThat(threadContext.allowedSystemIndexPatterns(), not(sameInstance(allowed)));
assertThat(threadContext.allowedSystemIndexPatterns(), equalTo(allowed));
}
assertTrue(threadContext.isSystemIndexAccessAllowed());
assertThat(threadContext.allowedSystemIndexPatterns(), empty());
}
/**
* Sometimes wraps a Runnable in an AbstractRunnable.
*/

View File

@ -90,7 +90,7 @@ public class RestControllerTests extends ESTestCase {
inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
HttpServerTransport httpServerTransport = new TestHttpServerTransport();
restController = new RestController(Collections.emptySet(), null, null, circuitBreakerService, usageService, randomBoolean());
restController = new RestController(Collections.emptySet(), null, null, circuitBreakerService, usageService);
restController.registerHandler(RestRequest.Method.GET, "/",
(request, channel, client) -> channel.sendResponse(
new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)));
@ -105,7 +105,7 @@ public class RestControllerTests extends ESTestCase {
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Set<RestHeaderDefinition> headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition("header.1", true),
new RestHeaderDefinition("header.2", true)));
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService, randomBoolean());
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService);
Map<String, List<String>> restHeaders = new HashMap<>();
restHeaders.put("header.1", Collections.singletonList("true"));
restHeaders.put("header.2", Collections.singletonList("true"));
@ -141,7 +141,7 @@ public class RestControllerTests extends ESTestCase {
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Set<RestHeaderDefinition> headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition("header.1", true),
new RestHeaderDefinition("header.2", false)));
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService, randomBoolean());
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService);
Map<String, List<String>> restHeaders = new HashMap<>();
restHeaders.put("header.1", Collections.singletonList("boo"));
restHeaders.put("header.2", Arrays.asList("foo", "bar"));
@ -155,7 +155,7 @@ public class RestControllerTests extends ESTestCase {
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Set<RestHeaderDefinition> headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition("header.1", true),
new RestHeaderDefinition("header.2", false)));
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService, randomBoolean());
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService);
Map<String, List<String>> restHeaders = new HashMap<>();
restHeaders.put("header.1", Collections.singletonList("boo"));
restHeaders.put("header.2", Arrays.asList("foo", "foo"));
@ -209,7 +209,7 @@ public class RestControllerTests extends ESTestCase {
}
public void testRegisterSecondMethodWithDifferentNamedWildcard() {
final RestController restController = new RestController(null, null, null, circuitBreakerService, usageService, randomBoolean());
final RestController restController = new RestController(null, null, null, circuitBreakerService, usageService);
RestRequest.Method firstMethod = randomFrom(RestRequest.Method.values());
RestRequest.Method secondMethod =
@ -236,7 +236,7 @@ public class RestControllerTests extends ESTestCase {
h -> {
assertSame(handler, h);
return (RestRequest request, RestChannel channel, NodeClient client) -> wrapperCalled.set(true);
}, null, circuitBreakerService, usageService, randomBoolean());
}, null, circuitBreakerService, usageService);
restController.registerHandler(RestRequest.Method.GET, "/wrapped", handler);
RestRequest request = testRestRequest("/wrapped", "{}", XContentType.JSON);
AssertingChannel channel = new AssertingChannel(request, true, RestStatus.BAD_REQUEST);
@ -299,7 +299,7 @@ public class RestControllerTests extends ESTestCase {
String content = randomAlphaOfLength((int) Math.round(BREAKER_LIMIT.getBytes() / inFlightRequestsBreaker.getOverhead()));
RestRequest request = testRestRequest("/", content, null);
AssertingChannel channel = new AssertingChannel(request, true, RestStatus.NOT_ACCEPTABLE);
restController = new RestController(Collections.emptySet(), null, null, circuitBreakerService, usageService, randomBoolean());
restController = new RestController(Collections.emptySet(), null, null, circuitBreakerService, usageService);
restController.registerHandler(RestRequest.Method.GET, "/",
(r, c, client) -> c.sendResponse(
new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)));
@ -570,25 +570,6 @@ public class RestControllerTests extends ESTestCase {
assertThat(channel.getRestResponse().getHeaders().get("Allow"), hasItem(equalTo(RestRequest.Method.GET.toString())));
}
public void testDispatchRestrictSystemIndices() {
restController = new RestController(Collections.emptySet(), null, null, circuitBreakerService, usageService, true);
restController.registerHandler(RestRequest.Method.GET, "/",
(request, channel, client) -> channel.sendResponse(
new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)));
restController.registerHandler(RestRequest.Method.GET, "/error", (request, channel, client) -> {
throw new IllegalArgumentException("test error");
});
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).build();
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.OK);
assertFalse(channel.getSendResponseCalled());
ThreadContext context = new ThreadContext(Settings.EMPTY);
assertTrue(context.isSystemIndexAccessAllowed());
restController.dispatchRequest(fakeRestRequest, channel, context);
assertTrue(channel.getSendResponseCalled());
assertFalse(context.isSystemIndexAccessAllowed());
}
private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements
HttpServerTransport {

View File

@ -89,7 +89,7 @@ public class RestHttpResponseHeadersTests extends ESTestCase {
final Settings settings = Settings.EMPTY;
UsageService usageService = new UsageService();
RestController restController = new RestController(Collections.emptySet(),
null, null, circuitBreakerService, usageService, randomBoolean());
null, null, circuitBreakerService, usageService);
// A basic RestHandler handles requests to the endpoint
RestHandler restHandler = new RestHandler() {

View File

@ -60,7 +60,7 @@ public class RestValidateQueryActionTests extends AbstractSearchTestCase {
private static UsageService usageService = new UsageService();
private static RestController controller = new RestController(emptySet(), null, client,
new NoneCircuitBreakerService(), usageService, false);
new NoneCircuitBreakerService(), usageService);
private static RestValidateQueryAction action = new RestValidateQueryAction();
/**

View File

@ -127,8 +127,7 @@ public class RestIndicesActionTests extends ESTestCase {
}
}
final RestController restController =
new RestController(Collections.emptySet(), null, null, null, new UsageService(), randomBoolean());
final RestController restController = new RestController(Collections.emptySet(), null, null, null, new UsageService());
final RestIndicesAction action = new RestIndicesAction();
restController.registerHandler(action);
final Table table = action.buildTable(new FakeRestRequest(), indicesSettings, indicesHealths, indicesStats, indicesMetaDatas);

View File

@ -55,7 +55,7 @@ public class RestRecoveryActionTests extends ESTestCase {
public void testRestRecoveryAction() {
final Settings settings = Settings.EMPTY;
UsageService usageService = new UsageService();
final RestController restController = new RestController(Collections.emptySet(), null, null, null, usageService, randomBoolean());
final RestController restController = new RestController(Collections.emptySet(), null, null, null, usageService);
final RestCatRecoveryAction action = new RestCatRecoveryAction();
restController.registerHandler(action);
final int totalShards = randomIntBetween(1, 32);

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestRequest;
@ -42,7 +43,9 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class RestIndexActionTests extends RestActionTestCase {
@ -50,9 +53,11 @@ public class RestIndexActionTests extends RestActionTestCase {
@Before
public void setUpAction() {
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenAnswer(invocationOnMock -> clusterStateSupplier.get());
controller().registerHandler(new RestIndexAction());
controller().registerHandler(new CreateHandler());
controller().registerHandler(new AutoIdHandler(() -> clusterStateSupplier.get().nodes()));
controller().registerHandler(new AutoIdHandler(clusterService));
}
public void testTypeInPath() {

View File

@ -25,7 +25,6 @@ import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import java.io.EOFException;
import java.io.IOException;
@ -34,11 +33,7 @@ public class CompressibleBytesOutputStreamTests extends ESTestCase {
public void testStreamWithoutCompression() throws IOException {
BytesStream bStream = new ZeroOutOnCloseStream();
if (randomBoolean()) {
bStream.setVersion(VersionUtils.randomVersion(random()));
}
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, false);
assertEquals(bStream.getVersion(), stream.getVersion());
byte[] expectedBytes = randomBytes(randomInt(30));
stream.write(expectedBytes);
@ -66,11 +61,7 @@ public class CompressibleBytesOutputStreamTests extends ESTestCase {
public void testStreamWithCompression() throws IOException {
BytesStream bStream = new ZeroOutOnCloseStream();
if (randomBoolean()) {
bStream.setVersion(VersionUtils.randomVersion(random()));
}
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true);
assertEquals(bStream.getVersion(), stream.getVersion());
byte[] expectedBytes = randomBytes(randomInt(30));
stream.write(expectedBytes);
@ -97,11 +88,7 @@ public class CompressibleBytesOutputStreamTests extends ESTestCase {
public void testCompressionWithCallingMaterializeFails() throws IOException {
BytesStream bStream = new ZeroOutOnCloseStream();
if (randomBoolean()) {
bStream.setVersion(VersionUtils.randomVersion(random()));
}
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true);
assertEquals(bStream.getVersion(), stream.getVersion());
byte[] expectedBytes = randomBytes(between(1, 30));
stream.write(expectedBytes);

View File

@ -47,7 +47,7 @@ public abstract class RestActionTestCase extends ESTestCase {
controller = new RestController(Collections.emptySet(), null,
nodeClient,
new NoneCircuitBreakerService(),
new UsageService(), randomBoolean());
new UsageService());
}
/**

View File

@ -263,7 +263,7 @@ public class EnrichPlugin extends Plugin implements SystemIndexPlugin, IngestPlu
}
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
return Collections.singletonList(
new SystemIndexDescriptor(ENRICH_INDEX_PATTERN, "Contains data to support enrich ingest processors.")
);

View File

@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
@ -61,7 +60,7 @@ public class Logstash extends Plugin implements SystemIndexPlugin {
}
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
return Collections.singletonList(
new SystemIndexDescriptor(LOGSTASH_CONCRETE_INDEX_NAME, "Contains data for Logstash Central Management")
);

View File

@ -950,7 +950,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
}
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
return Collections.unmodifiableList(Arrays.asList(
new SystemIndexDescriptor(MlMetaIndex.INDEX_NAME, "Contains scheduling and anomaly tracking metadata"),
new SystemIndexDescriptor(AnomalyDetectorsIndexFields.CONFIG_INDEX, "Contains ML configuration data"),

View File

@ -1120,7 +1120,7 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
}
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
return Collections.unmodifiableList(Arrays.asList(
new SystemIndexDescriptor(SECURITY_MAIN_ALIAS, "Contains Security configuration"),
new SystemIndexDescriptor(RestrictedIndicesNames.INTERNAL_SECURITY_MAIN_INDEX_6, "Contains Security configuration"),

View File

@ -725,7 +725,7 @@ public class AuthenticationServiceTests extends ESTestCase {
threadContext2.writeTo(output);
StreamInput input = output.bytes().streamInput();
threadContext2 = new ThreadContext(Settings.EMPTY);
threadContext2.readFrom(input);
threadContext2.readHeaders(input);
header = threadContext2.getHeader(AuthenticationField.AUTHENTICATION_KEY);
}

View File

@ -401,7 +401,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
}
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
return Collections.singletonList(
new SystemIndexDescriptor(TransformInternalIndexConstants.INDEX_NAME_PATTERN, "Contains Transform configuration data")
);

View File

@ -702,7 +702,7 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin,
}
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors() {
return Collections.unmodifiableList(Arrays.asList(
new SystemIndexDescriptor(Watch.INDEX, "Contains Watch definitions"),
new SystemIndexDescriptor(TriggeredWatchStoreField.INDEX_NAME, "Used to track current and queued Watch execution")