Merge branch 'master' into pr-10624
This commit is contained in:
commit
930eacd457
|
@ -59,6 +59,9 @@
|
|||
|
||||
* http://searchbox-io.github.com/wp-elasticsearch/[Wp-Elasticsearch]:
|
||||
Elasticsearch WordPress Plugin
|
||||
|
||||
* https://github.com/wallmanderco/elasticsearch-indexer[Elasticsearch Indexer]:
|
||||
Elasticsearch WordPress Plugin
|
||||
|
||||
* https://github.com/OlegKunitsyn/eslogd[eslogd]:
|
||||
Linux daemon that replicates events to a central Elasticsearch server in real-time
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
Basic support for hunspell stemming. Hunspell dictionaries will be
|
||||
picked up from a dedicated hunspell directory on the filesystem
|
||||
(defaults to `<path.conf>/hunspell`). Each dictionary is expected to
|
||||
(`<path.conf>/hunspell`). Each dictionary is expected to
|
||||
have its own directory named after its associated locale (language).
|
||||
This dictionary directory is expected to hold a single `*.aff` and
|
||||
one or more `*.dic` files (all of which will automatically be picked up).
|
||||
|
@ -19,10 +19,6 @@ following directory layout will define the `en_US` dictionary:
|
|||
| | |-- en_US.aff
|
||||
--------------------------------------------------
|
||||
|
||||
The location of the hunspell directory can be configured using the
|
||||
`indices.analysis.hunspell.dictionary.location` settings in
|
||||
_elasticsearch.yml_.
|
||||
|
||||
Each dictionary can be configured with one setting:
|
||||
|
||||
`ignore_case`::
|
||||
|
@ -91,9 +87,9 @@ the stemming is determined by the quality of the dictionary.
|
|||
[float]
|
||||
==== Dictionary loading
|
||||
|
||||
By default, the configured (`indices.analysis.hunspell.dictionary.location`)
|
||||
or default Hunspell directory (`config/hunspell/`) is checked for dictionaries
|
||||
when the node starts up, and any dictionaries are automatically loaded.
|
||||
By default, the default Hunspell directory (`config/hunspell/`) is checked
|
||||
for dictionaries when the node starts up, and any dictionaries are
|
||||
automatically loaded.
|
||||
|
||||
Dictionary loading can be deferred until they are actually used by setting
|
||||
`indices.analysis.hunspell.dictionary.lazy` to `true`in the config file.
|
||||
|
|
|
@ -458,3 +458,8 @@ there is not enough disk space to complete this migration, the upgrade will be
|
|||
cancelled and can only be resumed once enough disk space is made available.
|
||||
|
||||
The `index.store.distributor` setting has also been removed.
|
||||
|
||||
=== Hunspell dictionary configuration
|
||||
|
||||
The parameter `indices.analysis.hunspell.dictionary.location` has been removed,
|
||||
and `<path.conf>/hunspell` is always used.
|
||||
|
|
|
@ -121,6 +121,20 @@ This syntax applies to Elasticsearch 1.0 and later:
|
|||
|
||||
* Repeat this process for all remaining nodes.
|
||||
|
||||
[IMPORTANT]
|
||||
====================================================
|
||||
During a rolling upgrade, primary shards assigned to a node with the higher
|
||||
version will never have their replicas assigned to a node with the lower
|
||||
version, because the newer version may have a different data format which is
|
||||
not understood by the older version.
|
||||
|
||||
If it is not possible to assign the replica shards to another node with the
|
||||
higher version -- e.g. if there is only one node with the higher version in
|
||||
the cluster -- then the replica shards will remain unassigned, i.e. the
|
||||
cluster health will be status `yellow`. As soon as another node with the
|
||||
higher version joins the cluster, the replicas should be assigned and the
|
||||
cluster health will reach status `green`.
|
||||
====================================================
|
||||
|
||||
It may be possible to perform the upgrade by installing the new software while the service is running. This would reduce downtime by ensuring the service was ready to run on the new version as soon as it is stopped on the node being upgraded. This can be done by installing the new version in its own directory and using the symbolic link method outlined above. It is important to test this procedure first to be sure that site-specific configuration data and production indices will not be overwritten during the upgrade process.
|
||||
|
||||
|
|
|
@ -30,8 +30,6 @@ import java.io.IOException;
|
|||
*/
|
||||
public abstract class ActionRequest<T extends ActionRequest> extends TransportRequest {
|
||||
|
||||
private boolean listenerThreaded = false;
|
||||
|
||||
protected ActionRequest() {
|
||||
super();
|
||||
}
|
||||
|
@ -43,25 +41,6 @@ public abstract class ActionRequest<T extends ActionRequest> extends TransportRe
|
|||
//this.listenerThreaded = request.listenerThreaded();
|
||||
}
|
||||
|
||||
/**
|
||||
* Should the response listener be executed on a thread or not.
|
||||
* <p/>
|
||||
* <p>When not executing on a thread, it will either be executed on the calling thread, or
|
||||
* on an expensive, IO based, thread.
|
||||
*/
|
||||
public final boolean listenerThreaded() {
|
||||
return this.listenerThreaded;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets if the response listener be executed on a thread or not.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final T listenerThreaded(boolean listenerThreaded) {
|
||||
this.listenerThreaded = listenerThreaded;
|
||||
return (T) this;
|
||||
}
|
||||
|
||||
public abstract ActionRequestValidationException validate();
|
||||
|
||||
@Override
|
||||
|
|
|
@ -48,12 +48,6 @@ public abstract class ActionRequestBuilder<Request extends ActionRequest, Respon
|
|||
return this.request;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public final RequestBuilder setListenerThreaded(boolean listenerThreaded) {
|
||||
request.listenerThreaded(listenerThreaded);
|
||||
return (RequestBuilder) this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public final RequestBuilder putHeader(String key, Object value) {
|
||||
request.putHeader(key, value);
|
||||
|
@ -61,7 +55,7 @@ public abstract class ActionRequestBuilder<Request extends ActionRequest, Respon
|
|||
}
|
||||
|
||||
public ListenableActionFuture<Response> execute() {
|
||||
PlainListenableActionFuture<Response> future = new PlainListenableActionFuture<>(request.listenerThreaded(), threadPool);
|
||||
PlainListenableActionFuture<Response> future = new PlainListenableActionFuture<>(threadPool);
|
||||
execute(future);
|
||||
return future;
|
||||
}
|
||||
|
|
|
@ -24,20 +24,15 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BaseTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.*;
|
||||
|
||||
/**
|
||||
* A generic proxy that will execute the given action against a specific node.
|
||||
*/
|
||||
public class TransportActionNodeProxy<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
|
||||
|
||||
protected final TransportService transportService;
|
||||
|
||||
private final TransportService transportService;
|
||||
private final GenericAction<Request, Response> action;
|
||||
|
||||
private final TransportRequestOptions transportOptions;
|
||||
|
||||
@Inject
|
||||
|
@ -48,36 +43,17 @@ public class TransportActionNodeProxy<Request extends ActionRequest, Response ex
|
|||
this.transportOptions = action.transportOptions(settings);
|
||||
}
|
||||
|
||||
public void execute(DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
|
||||
public void execute(final DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
|
||||
ActionRequestValidationException validationException = request.validate();
|
||||
if (validationException != null) {
|
||||
listener.onFailure(validationException);
|
||||
return;
|
||||
}
|
||||
transportService.sendRequest(node, action.name(), request, transportOptions, new BaseTransportResponseHandler<Response>() {
|
||||
transportService.sendRequest(node, action.name(), request, transportOptions, new ActionListenerResponseHandler<Response>(listener) {
|
||||
@Override
|
||||
public Response newInstance() {
|
||||
return action.newResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
if (request.listenerThreaded()) {
|
||||
return ThreadPool.Names.LISTENER;
|
||||
}
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(Response response) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
listener.onFailure(exp);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -64,8 +64,6 @@ public class TransportGetFieldMappingsAction extends HandledTransportAction<GetF
|
|||
boolean probablySingleFieldRequest = concreteIndices.length == 1 && request.types().length == 1 && request.fields().length == 1;
|
||||
for (final String index : concreteIndices) {
|
||||
GetFieldMappingsIndexRequest shardRequest = new GetFieldMappingsIndexRequest(request, index, probablySingleFieldRequest);
|
||||
// no threading needed, all is done on the index replication one
|
||||
shardRequest.listenerThreaded(false);
|
||||
shardAction.execute(shardRequest, new ActionListener<GetFieldMappingsResponse>() {
|
||||
@Override
|
||||
public void onResponse(GetFieldMappingsResponse result) {
|
||||
|
|
|
@ -119,7 +119,6 @@ public class TransportMoreLikeThisAction extends HandledTransportAction<MoreLike
|
|||
.type(request.type())
|
||||
.id(request.id())
|
||||
.routing(request.routing())
|
||||
.listenerThreaded(true)
|
||||
.operationThreaded(true);
|
||||
|
||||
getAction.execute(getRequest, new ActionListener<GetResponse>() {
|
||||
|
@ -197,8 +196,7 @@ public class TransportMoreLikeThisAction extends HandledTransportAction<MoreLike
|
|||
SearchRequest searchRequest = new SearchRequest(request).indices(searchIndices)
|
||||
.types(searchTypes)
|
||||
.searchType(request.searchType())
|
||||
.scroll(request.searchScroll())
|
||||
.listenerThreaded(request.listenerThreaded());
|
||||
.scroll(request.searchScroll());
|
||||
|
||||
SearchSourceBuilder extraSource = searchSource().query(boolBuilder);
|
||||
if (request.searchFrom() != 0) {
|
||||
|
|
|
@ -38,14 +38,6 @@ public class SearchScrollRequestBuilder extends ActionRequestBuilder<SearchScrol
|
|||
super(client, new SearchScrollRequest(scrollId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Should the listener be called on a separate thread if needed.
|
||||
*/
|
||||
public SearchScrollRequestBuilder listenerThreaded(boolean threadedListener) {
|
||||
request.listenerThreaded(threadedListener);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* The scroll id to use to continue scrolling.
|
||||
*/
|
||||
|
|
|
@ -20,10 +20,10 @@
|
|||
package org.elasticsearch.action.support;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ListenableActionFuture;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -33,20 +33,16 @@ import java.util.List;
|
|||
*/
|
||||
public abstract class AbstractListenableActionFuture<T, L> extends AdapterActionFuture<T, L> implements ListenableActionFuture<T> {
|
||||
|
||||
final boolean listenerThreaded;
|
||||
private final static ESLogger logger = Loggers.getLogger(AbstractListenableActionFuture.class);
|
||||
|
||||
final ThreadPool threadPool;
|
||||
volatile Object listeners;
|
||||
boolean executedListeners = false;
|
||||
|
||||
protected AbstractListenableActionFuture(boolean listenerThreaded, ThreadPool threadPool) {
|
||||
this.listenerThreaded = listenerThreaded;
|
||||
protected AbstractListenableActionFuture(ThreadPool threadPool) {
|
||||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
public boolean listenerThreaded() {
|
||||
return false; // we control execution of the listener
|
||||
}
|
||||
|
||||
public ThreadPool threadPool() {
|
||||
return threadPool;
|
||||
}
|
||||
|
@ -57,6 +53,7 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
|
|||
}
|
||||
|
||||
public void internalAddListener(ActionListener<T> listener) {
|
||||
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
|
||||
boolean executeImmediate = false;
|
||||
synchronized (this) {
|
||||
if (executedListeners) {
|
||||
|
@ -101,27 +98,10 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
|
|||
}
|
||||
|
||||
private void executeListener(final ActionListener<T> listener) {
|
||||
if (listenerThreaded) {
|
||||
try {
|
||||
threadPool.executor(ThreadPool.Names.LISTENER).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
listener.onResponse(actionGet());
|
||||
} catch (ElasticsearchException e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
listener.onResponse(actionGet());
|
||||
} catch (Throwable e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
try {
|
||||
listener.onResponse(actionGet());
|
||||
} catch (Throwable e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -41,8 +41,6 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
|
|||
|
||||
@Override
|
||||
public final void messageReceived(final Request request, final TransportChannel channel) throws Exception {
|
||||
// no need to use threaded listener, since we just send a response
|
||||
request.listenerThreaded(false);
|
||||
execute(request, new ActionListener<Response>() {
|
||||
@Override
|
||||
public void onResponse(Response response) {
|
||||
|
|
|
@ -26,8 +26,8 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
*/
|
||||
public class PlainListenableActionFuture<T> extends AbstractListenableActionFuture<T, T> {
|
||||
|
||||
public PlainListenableActionFuture(boolean listenerThreaded, ThreadPool threadPool) {
|
||||
super(listenerThreaded, threadPool);
|
||||
public PlainListenableActionFuture(ThreadPool threadPool) {
|
||||
super(threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* An action listener that wraps another action listener and threading its execution.
|
||||
*/
|
||||
public final class ThreadedActionListener<Response> implements ActionListener<Response> {
|
||||
|
||||
/**
|
||||
* Wrapper that can be used to automatically wrap a listener in a threaded listener if needed.
|
||||
*/
|
||||
public static class Wrapper {
|
||||
|
||||
private final ESLogger logger;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final boolean threadedListener;
|
||||
|
||||
public Wrapper(ESLogger logger, Settings settings, ThreadPool threadPool) {
|
||||
this.logger = logger;
|
||||
this.threadPool = threadPool;
|
||||
// Should the action listener be threaded or not by default. Action listeners are automatically threaded for client
|
||||
// nodes and transport client in order to make sure client side code is not executed on IO threads.
|
||||
this.threadedListener = DiscoveryNode.clientNode(settings) || TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING));
|
||||
}
|
||||
|
||||
public <Response> ActionListener<Response> wrap(ActionListener<Response> listener) {
|
||||
if (threadedListener == false) {
|
||||
return listener;
|
||||
}
|
||||
// if its a future, the callback is very lightweight (flipping a bit) so no need to wrap it
|
||||
if (listener instanceof Future) {
|
||||
return listener;
|
||||
}
|
||||
return new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
|
||||
}
|
||||
}
|
||||
|
||||
private final ESLogger logger;
|
||||
private final ThreadPool threadPool;
|
||||
private final String executor;
|
||||
private final ActionListener<Response> listener;
|
||||
|
||||
public ThreadedActionListener(ESLogger logger, ThreadPool threadPool, String executor, ActionListener<Response> listener) {
|
||||
this.logger = logger;
|
||||
this.threadPool = threadPool;
|
||||
this.executor = executor;
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(final Response response) {
|
||||
threadPool.executor(executor).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
listener.onFailure(t);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Throwable e) {
|
||||
threadPool.executor(executor).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
logger.warn("failed to execute failure callback on [{}], failure [{}]", t, listener, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -19,12 +19,10 @@
|
|||
|
||||
package org.elasticsearch.action.support;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.*;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -49,21 +47,11 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
|
|||
|
||||
public final ActionFuture<Response> execute(Request request) {
|
||||
PlainActionFuture<Response> future = newFuture();
|
||||
// since we don't have a listener, and we release a possible lock with the future
|
||||
// there is no need to execute it under a listener thread
|
||||
request.listenerThreaded(false);
|
||||
execute(request, future);
|
||||
return future;
|
||||
}
|
||||
|
||||
public final void execute(Request request, ActionListener<Response> listener) {
|
||||
if (forceThreadedListener()) {
|
||||
request.listenerThreaded(true);
|
||||
}
|
||||
if (request.listenerThreaded()) {
|
||||
listener = new ThreadedActionListener<>(threadPool, listener, logger);
|
||||
}
|
||||
|
||||
ActionRequestValidationException validationException = request.validate();
|
||||
if (validationException != null) {
|
||||
listener.onFailure(validationException);
|
||||
|
@ -83,69 +71,8 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
|
|||
}
|
||||
}
|
||||
|
||||
protected boolean forceThreadedListener() {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected abstract void doExecute(Request request, ActionListener<Response> listener);
|
||||
|
||||
static final class ThreadedActionListener<Response> implements ActionListener<Response> {
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final ActionListener<Response> listener;
|
||||
|
||||
private final ESLogger logger;
|
||||
|
||||
ThreadedActionListener(ThreadPool threadPool, ActionListener<Response> listener, ESLogger logger) {
|
||||
this.threadPool = threadPool;
|
||||
this.listener = listener;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(final Response response) {
|
||||
try {
|
||||
threadPool.executor(ThreadPool.Names.LISTENER).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
listener.onResponse(response);
|
||||
} catch (Throwable e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
logger.debug("Can not run threaded action, execution rejected [{}] running on current thread", listener);
|
||||
/* we don't care if that takes long since we are shutting down. But if we not respond somebody could wait
|
||||
* for the response on the listener side which could be a remote machine so make sure we push it out there.*/
|
||||
try {
|
||||
listener.onResponse(response);
|
||||
} catch (Throwable e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Throwable e) {
|
||||
try {
|
||||
threadPool.executor(ThreadPool.Names.LISTENER).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
logger.debug("Can not run threaded action, execution rejected for listener [{}] running on current thread", listener);
|
||||
/* we don't care if that takes long since we are shutting down (or queue capacity). But if we not respond somebody could wait
|
||||
* for the response on the listener side which could be a remote machine so make sure we push it out there.*/
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class RequestFilterChain<Request extends ActionRequest, Response extends ActionResponse> implements ActionFilterChain {
|
||||
|
||||
private final TransportAction<Request, Response> action;
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionResponse;
|
|||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.action.support.ThreadedActionListener;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
|
@ -75,14 +76,11 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean forceThreadedListener() {
|
||||
// since the callback is async, we typically can get called from within an event in the cluster service
|
||||
// or something similar, so make sure we are threaded so we won't block it.
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(final Request request, final ActionListener<Response> listener) {
|
||||
protected void doExecute(final Request request, ActionListener<Response> listener) {
|
||||
// TODO do we really need to wrap it in a listener? the handlers should be cheap
|
||||
if ((listener instanceof ThreadedActionListener) == false) {
|
||||
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
|
||||
}
|
||||
innerExecute(request, listener, new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger), false);
|
||||
}
|
||||
|
||||
|
|
|
@ -186,8 +186,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
class OperationTransportHandler implements TransportRequestHandler<Request> {
|
||||
@Override
|
||||
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
|
||||
// no need to have a threaded listener since we just send back a response
|
||||
request.listenerThreaded(false);
|
||||
// if we have a local operation, execute it on a thread since we don't spawn
|
||||
request.operationThreaded(true);
|
||||
execute(request, new ActionListener<Response>() {
|
||||
|
|
|
@ -232,8 +232,6 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
|
|||
|
||||
@Override
|
||||
public void messageReceived(Request request, final TransportChannel channel) throws Exception {
|
||||
// no need to have a threaded listener since we just send back a response
|
||||
request.listenerThreaded(false);
|
||||
// if we have a local operation, execute it on a thread since we don't spawn
|
||||
request.operationThreaded(true);
|
||||
execute(request, new ActionListener<Response>() {
|
||||
|
|
|
@ -216,15 +216,6 @@ public class Bootstrap {
|
|||
// fail if using broken version
|
||||
JVMCheck.check();
|
||||
|
||||
bootstrap.setup(true, settings, environment);
|
||||
|
||||
stage = "Startup";
|
||||
bootstrap.start();
|
||||
|
||||
if (!foreground) {
|
||||
closeSysError();
|
||||
}
|
||||
|
||||
keepAliveLatch = new CountDownLatch(1);
|
||||
// keep this thread alive (non daemon thread) until we shutdown
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
|
@ -234,6 +225,15 @@ public class Bootstrap {
|
|||
}
|
||||
});
|
||||
|
||||
bootstrap.setup(true, settings, environment);
|
||||
|
||||
stage = "Startup";
|
||||
bootstrap.start();
|
||||
|
||||
if (!foreground) {
|
||||
closeSysError();
|
||||
}
|
||||
|
||||
keepAliveThread = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
|
|
@ -19,16 +19,19 @@
|
|||
|
||||
package org.elasticsearch.bootstrap;
|
||||
|
||||
import com.google.common.io.ByteStreams;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.StringHelper;
|
||||
import org.elasticsearch.env.Environment;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.nio.file.Path;
|
||||
import java.security.Permission;
|
||||
import java.security.PermissionCollection;
|
||||
import java.security.Permissions;
|
||||
import java.security.Policy;
|
||||
import java.security.ProtectionDomain;
|
||||
import java.security.URIParameter;
|
||||
|
||||
/**
|
||||
* Initializes securitymanager with necessary permissions.
|
||||
|
@ -45,70 +48,74 @@ class Security {
|
|||
* Initializes securitymanager for the environment
|
||||
* Can only happen once!
|
||||
*/
|
||||
static void configure(Environment environment) throws IOException {
|
||||
// init lucene random seed. it will use /dev/urandom where available.
|
||||
static void configure(Environment environment) throws Exception {
|
||||
// init lucene random seed. it will use /dev/urandom where available:
|
||||
StringHelper.randomId();
|
||||
InputStream config = Security.class.getResourceAsStream(POLICY_RESOURCE);
|
||||
if (config == null) {
|
||||
throw new NoSuchFileException(POLICY_RESOURCE);
|
||||
}
|
||||
Path newConfig = processTemplate(config, environment);
|
||||
System.setProperty("java.security.policy", newConfig.toString());
|
||||
|
||||
// enable security policy: union of template and environment-based paths.
|
||||
URI template = Security.class.getResource(POLICY_RESOURCE).toURI();
|
||||
Policy.setPolicy(new ESPolicy(template, createPermissions(environment)));
|
||||
|
||||
// enable security manager
|
||||
System.setSecurityManager(new SecurityManager());
|
||||
IOUtils.deleteFilesIgnoringExceptions(newConfig); // TODO: maybe log something if it fails?
|
||||
|
||||
// do some basic tests
|
||||
selfTest();
|
||||
}
|
||||
|
||||
// package-private for testing
|
||||
static Path processTemplate(InputStream template, Environment environment) throws IOException {
|
||||
Path processed = Files.createTempFile(null, null);
|
||||
try (OutputStream output = new BufferedOutputStream(Files.newOutputStream(processed))) {
|
||||
// copy the template as-is.
|
||||
try (InputStream in = new BufferedInputStream(template)) {
|
||||
ByteStreams.copy(in, output);
|
||||
}
|
||||
|
||||
// all policy files are UTF-8:
|
||||
// https://docs.oracle.com/javase/7/docs/technotes/guides/security/PolicyFiles.html
|
||||
try (Writer writer = new OutputStreamWriter(output, StandardCharsets.UTF_8)) {
|
||||
writer.write(System.lineSeparator());
|
||||
writer.write("grant {");
|
||||
writer.write(System.lineSeparator());
|
||||
|
||||
// add permissions for all configured paths.
|
||||
// TODO: improve test infra so we can reduce permissions where read/write
|
||||
// is not really needed...
|
||||
addPath(writer, environment.homeFile(), "read,readlink,write,delete");
|
||||
addPath(writer, environment.configFile(), "read,readlink,write,delete");
|
||||
addPath(writer, environment.logsFile(), "read,readlink,write,delete");
|
||||
addPath(writer, environment.pluginsFile(), "read,readlink,write,delete");
|
||||
for (Path path : environment.dataFiles()) {
|
||||
addPath(writer, path, "read,readlink,write,delete");
|
||||
}
|
||||
for (Path path : environment.dataWithClusterFiles()) {
|
||||
addPath(writer, path, "read,readlink,write,delete");
|
||||
}
|
||||
|
||||
writer.write("};");
|
||||
writer.write(System.lineSeparator());
|
||||
}
|
||||
/** returns dynamic Permissions to configured paths */
|
||||
static Permissions createPermissions(Environment environment) throws IOException {
|
||||
// TODO: improve test infra so we can reduce permissions where read/write
|
||||
// is not really needed...
|
||||
Permissions policy = new Permissions();
|
||||
addPath(policy, environment.homeFile(), "read,readlink,write,delete");
|
||||
addPath(policy, environment.configFile(), "read,readlink,write,delete");
|
||||
addPath(policy, environment.logsFile(), "read,readlink,write,delete");
|
||||
addPath(policy, environment.pluginsFile(), "read,readlink,write,delete");
|
||||
for (Path path : environment.dataFiles()) {
|
||||
addPath(policy, path, "read,readlink,write,delete");
|
||||
}
|
||||
return processed;
|
||||
for (Path path : environment.dataWithClusterFiles()) {
|
||||
addPath(policy, path, "read,readlink,write,delete");
|
||||
}
|
||||
|
||||
return policy;
|
||||
}
|
||||
|
||||
static void addPath(Writer writer, Path path, String permissions) throws IOException {
|
||||
/** Add access to path (and all files underneath it */
|
||||
static void addPath(Permissions policy, Path path, String permissions) throws IOException {
|
||||
// paths may not exist yet
|
||||
Files.createDirectories(path);
|
||||
// add each path twice: once for itself, again for files underneath it
|
||||
writer.write("permission java.io.FilePermission \"" + encode(path) + "\", \"" + permissions + "\";");
|
||||
writer.write(System.lineSeparator());
|
||||
writer.write("permission java.io.FilePermission \"" + encode(path) + "${/}-\", \"" + permissions + "\";");
|
||||
writer.write(System.lineSeparator());
|
||||
policy.add(new FilePermission(path.toString(), permissions));
|
||||
policy.add(new FilePermission(path.toString() + path.getFileSystem().getSeparator() + "-", permissions));
|
||||
}
|
||||
|
||||
// Any backslashes in paths must be escaped, because it is the escape character when parsing.
|
||||
// See "Note Regarding File Path Specifications on Windows Systems".
|
||||
// https://docs.oracle.com/javase/7/docs/technotes/guides/security/PolicyFiles.html
|
||||
static String encode(Path path) {
|
||||
return path.toString().replace("\\", "\\\\");
|
||||
|
||||
/** Simple checks that everything is ok */
|
||||
static void selfTest() {
|
||||
// check we can manipulate temporary files
|
||||
try {
|
||||
Files.delete(Files.createTempFile(null, null));
|
||||
} catch (IOException ignored) {
|
||||
// potentially virus scanner
|
||||
} catch (SecurityException problem) {
|
||||
throw new SecurityException("Security misconfiguration: cannot access java.io.tmpdir", problem);
|
||||
}
|
||||
}
|
||||
|
||||
/** custom policy for union of static and dynamic permissions */
|
||||
static class ESPolicy extends Policy {
|
||||
final Policy template;
|
||||
final PermissionCollection dynamic;
|
||||
|
||||
ESPolicy(URI template, PermissionCollection dynamic) throws Exception {
|
||||
this.template = Policy.getInstance("JavaPolicy", new URIParameter(template));
|
||||
this.dynamic = dynamic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean implies(ProtectionDomain domain, Permission permission) {
|
||||
return template.implies(domain, permission) || dynamic.implies(permission);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.elasticsearch.client.node;
|
|||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.action.*;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.ThreadedActionListener;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.client.AdminClient;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -28,6 +30,8 @@ import org.elasticsearch.client.support.AbstractClient;
|
|||
import org.elasticsearch.client.support.Headers;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
|
@ -38,6 +42,7 @@ import java.util.Map;
|
|||
*/
|
||||
public class NodeClient extends AbstractClient {
|
||||
|
||||
private final ESLogger logger;
|
||||
private final Settings settings;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
|
@ -46,9 +51,11 @@ public class NodeClient extends AbstractClient {
|
|||
private final ImmutableMap<ClientAction, TransportAction> actions;
|
||||
|
||||
private final Headers headers;
|
||||
private final ThreadedActionListener.Wrapper threadedWrapper;
|
||||
|
||||
@Inject
|
||||
public NodeClient(Settings settings, ThreadPool threadPool, NodeAdminClient admin, Map<GenericAction, TransportAction> actions, Headers headers) {
|
||||
this.logger = Loggers.getLogger(getClass(), settings);
|
||||
this.settings = settings;
|
||||
this.threadPool = threadPool;
|
||||
this.admin = admin;
|
||||
|
@ -60,6 +67,7 @@ public class NodeClient extends AbstractClient {
|
|||
}
|
||||
}
|
||||
this.actions = actionsBuilder.immutableMap();
|
||||
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -84,16 +92,17 @@ public class NodeClient extends AbstractClient {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, Client> action, Request request) {
|
||||
headers.applyTo(request);
|
||||
TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);
|
||||
return transportAction.execute(request);
|
||||
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(final Action<Request, Response, RequestBuilder, Client> action, final Request request) {
|
||||
PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
|
||||
execute(action, request, actionFuture);
|
||||
return actionFuture;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
|
||||
headers.applyTo(request);
|
||||
listener = threadedWrapper.wrap(listener);
|
||||
TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);
|
||||
transportAction.execute(request, listener);
|
||||
}
|
||||
|
|
|
@ -22,12 +22,17 @@ package org.elasticsearch.client.node;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.action.*;
|
||||
import org.elasticsearch.action.admin.cluster.ClusterAction;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.ThreadedActionListener;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.client.ClusterAdminClient;
|
||||
import org.elasticsearch.client.support.AbstractClusterAdminClient;
|
||||
import org.elasticsearch.client.support.Headers;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Map;
|
||||
|
@ -37,14 +42,15 @@ import java.util.Map;
|
|||
*/
|
||||
public class NodeClusterAdminClient extends AbstractClusterAdminClient implements ClusterAdminClient {
|
||||
|
||||
private final ESLogger logger;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final ImmutableMap<ClusterAction, TransportAction> actions;
|
||||
|
||||
private final Headers headers;
|
||||
private final ThreadedActionListener.Wrapper threadedWrapper;
|
||||
|
||||
@Inject
|
||||
public NodeClusterAdminClient(ThreadPool threadPool, Map<GenericAction, TransportAction> actions, Headers headers) {
|
||||
public NodeClusterAdminClient(Settings settings, ThreadPool threadPool, Map<GenericAction, TransportAction> actions, Headers headers) {
|
||||
this.logger = Loggers.getLogger(getClass(), settings);
|
||||
this.threadPool = threadPool;
|
||||
this.headers = headers;
|
||||
MapBuilder<ClusterAction, TransportAction> actionsBuilder = new MapBuilder<>();
|
||||
|
@ -54,6 +60,7 @@ public class NodeClusterAdminClient extends AbstractClusterAdminClient implement
|
|||
}
|
||||
}
|
||||
this.actions = actionsBuilder.immutableMap();
|
||||
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -63,16 +70,17 @@ public class NodeClusterAdminClient extends AbstractClusterAdminClient implement
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request) {
|
||||
headers.applyTo(request);
|
||||
TransportAction<Request, Response> transportAction = actions.get((ClusterAction)action);
|
||||
return transportAction.execute(request);
|
||||
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> ActionFuture<Response> execute(final Action<Request, Response, RequestBuilder, ClusterAdminClient> action, final Request request) {
|
||||
PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
|
||||
execute(action, request, actionFuture);
|
||||
return actionFuture;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request, ActionListener<Response> listener) {
|
||||
headers.applyTo(request);
|
||||
listener = threadedWrapper.wrap(listener);
|
||||
TransportAction<Request, Response> transportAction = actions.get((ClusterAction)action);
|
||||
transportAction.execute(request, listener);
|
||||
}
|
||||
|
|
|
@ -22,12 +22,17 @@ package org.elasticsearch.client.node;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.action.*;
|
||||
import org.elasticsearch.action.admin.indices.IndicesAction;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.ThreadedActionListener;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.client.support.AbstractIndicesAdminClient;
|
||||
import org.elasticsearch.client.support.Headers;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Map;
|
||||
|
@ -37,14 +42,15 @@ import java.util.Map;
|
|||
*/
|
||||
public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implements IndicesAdminClient {
|
||||
|
||||
private final ESLogger logger;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final ImmutableMap<IndicesAction, TransportAction> actions;
|
||||
|
||||
private final Headers headers;
|
||||
private final ThreadedActionListener.Wrapper threadedWrapper;
|
||||
|
||||
@Inject
|
||||
public NodeIndicesAdminClient(ThreadPool threadPool, Map<GenericAction, TransportAction> actions, Headers headers) {
|
||||
public NodeIndicesAdminClient(Settings settings, ThreadPool threadPool, Map<GenericAction, TransportAction> actions, Headers headers) {
|
||||
this.logger = Loggers.getLogger(getClass(), settings);
|
||||
this.threadPool = threadPool;
|
||||
this.headers = headers;
|
||||
MapBuilder<IndicesAction, TransportAction> actionsBuilder = new MapBuilder<>();
|
||||
|
@ -54,6 +60,7 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
|
|||
}
|
||||
}
|
||||
this.actions = actionsBuilder.immutableMap();
|
||||
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -63,16 +70,17 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request) {
|
||||
headers.applyTo(request);
|
||||
TransportAction<Request, Response> transportAction = actions.get((IndicesAction)action);
|
||||
return transportAction.execute(request);
|
||||
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> ActionFuture<Response> execute(final Action<Request, Response, RequestBuilder, IndicesAdminClient> action, final Request request) {
|
||||
PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
|
||||
execute(action, request, actionFuture);
|
||||
return actionFuture;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request, ActionListener<Response> listener) {
|
||||
headers.applyTo(request);
|
||||
listener = threadedWrapper.wrap(listener);
|
||||
TransportAction<Request, Response> transportAction = actions.get((IndicesAction)action);
|
||||
transportAction.execute(request, listener);
|
||||
}
|
||||
|
|
|
@ -93,7 +93,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde
|
|||
*/
|
||||
public class TransportClient extends AbstractClient {
|
||||
|
||||
private static final String CLIENT_TYPE = "transport";
|
||||
public static final String CLIENT_TYPE = "transport";
|
||||
|
||||
final Injector injector;
|
||||
|
||||
|
@ -103,14 +103,6 @@ public class TransportClient extends AbstractClient {
|
|||
private final TransportClientNodesService nodesService;
|
||||
private final InternalTransportClient internalClient;
|
||||
|
||||
/**
|
||||
* Constructs a new transport client with settings loaded either from the classpath or the file system (the
|
||||
* <tt>elasticsearch.(yml|json)</tt> files optionally prefixed with <tt>config/</tt>).
|
||||
*/
|
||||
public TransportClient() {
|
||||
this(ImmutableSettings.Builder.EMPTY_SETTINGS, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new transport client with explicit settings and settings loaded either from the classpath or the file
|
||||
* system (the <tt>elasticsearch.(yml|json)</tt> files optionally prefixed with <tt>config/</tt>).
|
||||
|
|
|
@ -23,7 +23,6 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
|
|||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
|
@ -38,11 +37,9 @@ import org.elasticsearch.cluster.ClusterName;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -199,7 +196,7 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
ImmutableList<DiscoveryNode> nodes = this.nodes;
|
||||
ensureNodesAreAvailable(nodes);
|
||||
int index = getNodeNumber();
|
||||
RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, threadPool, logger);
|
||||
RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index);
|
||||
DiscoveryNode node = nodes.get((index) % nodes.size());
|
||||
try {
|
||||
callback.doWithNode(node, retryListener);
|
||||
|
@ -213,20 +210,15 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
private final NodeListenerCallback<Response> callback;
|
||||
private final ActionListener<Response> listener;
|
||||
private final ImmutableList<DiscoveryNode> nodes;
|
||||
private final ESLogger logger;
|
||||
private final int index;
|
||||
private ThreadPool threadPool;
|
||||
|
||||
private volatile int i;
|
||||
|
||||
public RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener, ImmutableList<DiscoveryNode> nodes,
|
||||
int index, ThreadPool threadPool, ESLogger logger) {
|
||||
public RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener, ImmutableList<DiscoveryNode> nodes, int index) {
|
||||
this.callback = callback;
|
||||
this.listener = listener;
|
||||
this.nodes = nodes;
|
||||
this.index = index;
|
||||
this.threadPool = threadPool;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -239,38 +231,21 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) {
|
||||
int i = ++this.i;
|
||||
if (i >= nodes.size()) {
|
||||
runFailureInListenerThreadPool(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e));
|
||||
listener.onFailure(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e));
|
||||
} else {
|
||||
try {
|
||||
callback.doWithNode(nodes.get((index + i) % nodes.size()), this);
|
||||
} catch(final Throwable t) {
|
||||
// this exception can't come from the TransportService as it doesn't throw exceptions at all
|
||||
runFailureInListenerThreadPool(t);
|
||||
listener.onFailure(t);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
runFailureInListenerThreadPool(e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
// need to ensure to not block the netty I/O thread, in case of retry due to the node sampling
|
||||
private void runFailureInListenerThreadPool(final Throwable t) {
|
||||
threadPool.executor(ThreadPool.Names.LISTENER).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
listener.onFailure(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Could not execute failure listener: [{}]", t, t.getMessage());
|
||||
} else {
|
||||
logger.error("Could not execute failure listener: [{}]", t.getMessage());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void close() {
|
||||
|
@ -505,7 +480,7 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
public static interface NodeListenerCallback<Response> {
|
||||
public interface NodeListenerCallback<Response> {
|
||||
|
||||
void doWithNode(DiscoveryNode node, ActionListener<Response> listener);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.client.transport.support;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.action.*;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.ThreadedActionListener;
|
||||
import org.elasticsearch.client.AdminClient;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.support.AbstractClient;
|
||||
|
@ -30,6 +31,8 @@ import org.elasticsearch.client.transport.TransportClientNodesService;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -41,21 +44,20 @@ import java.util.Map;
|
|||
*/
|
||||
public class InternalTransportClient extends AbstractClient {
|
||||
|
||||
private final ESLogger logger;
|
||||
private final Settings settings;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final TransportClientNodesService nodesService;
|
||||
|
||||
private final InternalTransportAdminClient adminClient;
|
||||
|
||||
private final ImmutableMap<Action, TransportActionNodeProxy> actions;
|
||||
|
||||
private final Headers headers;
|
||||
private final ThreadedActionListener.Wrapper threadedWrapper;
|
||||
|
||||
@Inject
|
||||
public InternalTransportClient(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
TransportClientNodesService nodesService, InternalTransportAdminClient adminClient,
|
||||
Map<String, GenericAction> actions, Headers headers) {
|
||||
this.logger = Loggers.getLogger(getClass(), settings);
|
||||
this.settings = settings;
|
||||
this.threadPool = threadPool;
|
||||
this.nodesService = nodesService;
|
||||
|
@ -68,6 +70,7 @@ public class InternalTransportClient extends AbstractClient {
|
|||
}
|
||||
}
|
||||
this.actions = actionsBuilder.immutableMap();
|
||||
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -102,6 +105,7 @@ public class InternalTransportClient extends AbstractClient {
|
|||
@Override
|
||||
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(final Action<Request, Response, RequestBuilder, Client> action, final Request request, ActionListener<Response> listener) {
|
||||
headers.applyTo(request);
|
||||
listener = threadedWrapper.wrap(listener);
|
||||
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
|
||||
@Override
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
|
|||
import org.elasticsearch.action.*;
|
||||
import org.elasticsearch.action.admin.cluster.ClusterAction;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.ThreadedActionListener;
|
||||
import org.elasticsearch.client.ClusterAdminClient;
|
||||
import org.elasticsearch.client.support.AbstractClusterAdminClient;
|
||||
import org.elasticsearch.client.support.Headers;
|
||||
|
@ -30,6 +31,8 @@ import org.elasticsearch.client.transport.TransportClientNodesService;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -42,17 +45,17 @@ import java.util.Map;
|
|||
@SuppressWarnings("unchecked")
|
||||
public class InternalTransportClusterAdminClient extends AbstractClusterAdminClient implements ClusterAdminClient {
|
||||
|
||||
private final ESLogger logger;
|
||||
private final TransportClientNodesService nodesService;
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final ImmutableMap<ClusterAction, TransportActionNodeProxy> actions;
|
||||
|
||||
private final Headers headers;
|
||||
private final ThreadedActionListener.Wrapper threadedWrapper;
|
||||
|
||||
@Inject
|
||||
public InternalTransportClusterAdminClient(Settings settings, TransportClientNodesService nodesService, ThreadPool threadPool, TransportService transportService,
|
||||
Map<String, GenericAction> actions, Headers headers) {
|
||||
this.logger = Loggers.getLogger(getClass(), settings);
|
||||
this.nodesService = nodesService;
|
||||
this.threadPool = threadPool;
|
||||
this.headers = headers;
|
||||
|
@ -63,6 +66,7 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
|
|||
}
|
||||
}
|
||||
this.actions = actionsBuilder.immutableMap();
|
||||
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -80,8 +84,9 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(final Action<Request, Response, RequestBuilder, ClusterAdminClient> action, final Request request, final ActionListener<Response> listener) {
|
||||
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(final Action<Request, Response, RequestBuilder, ClusterAdminClient> action, final Request request, ActionListener<Response> listener) {
|
||||
headers.applyTo(request);
|
||||
listener = threadedWrapper.wrap(listener);
|
||||
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
|
||||
@Override
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
|
|||
import org.elasticsearch.action.*;
|
||||
import org.elasticsearch.action.admin.indices.IndicesAction;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.ThreadedActionListener;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.client.support.AbstractIndicesAdminClient;
|
||||
import org.elasticsearch.client.support.Headers;
|
||||
|
@ -30,6 +31,8 @@ import org.elasticsearch.client.transport.TransportClientNodesService;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -42,17 +45,17 @@ import java.util.Map;
|
|||
@SuppressWarnings("unchecked")
|
||||
public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminClient implements IndicesAdminClient {
|
||||
|
||||
private final ESLogger logger;
|
||||
private final TransportClientNodesService nodesService;
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final ImmutableMap<Action, TransportActionNodeProxy> actions;
|
||||
|
||||
private final Headers headers;
|
||||
private final ThreadedActionListener.Wrapper threadedWrapper;
|
||||
|
||||
@Inject
|
||||
public InternalTransportIndicesAdminClient(Settings settings, TransportClientNodesService nodesService, TransportService transportService, ThreadPool threadPool,
|
||||
Map<String, GenericAction> actions, Headers headers) {
|
||||
this.logger = Loggers.getLogger(getClass(), settings);
|
||||
this.nodesService = nodesService;
|
||||
this.threadPool = threadPool;
|
||||
this.headers = headers;
|
||||
|
@ -63,6 +66,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
}
|
||||
this.actions = actionsBuilder.immutableMap();
|
||||
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -82,6 +86,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
@Override
|
||||
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(final Action<Request, Response, RequestBuilder, IndicesAdminClient> action, final Request request, ActionListener<Response> listener) {
|
||||
headers.applyTo(request);
|
||||
listener = threadedWrapper.wrap(listener);
|
||||
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
|
||||
@Override
|
||||
|
|
|
@ -32,7 +32,6 @@ import java.nio.file.Path;
|
|||
import java.util.ArrayList;
|
||||
|
||||
import static org.elasticsearch.common.Strings.cleanPath;
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
|
||||
|
||||
/**
|
||||
* The environment of where things exists.
|
||||
|
@ -69,16 +68,12 @@ public class Environment {
|
|||
fileStores = allStores.toArray(new ESFileStore[allStores.size()]);
|
||||
}
|
||||
|
||||
public Environment() {
|
||||
this(EMPTY_SETTINGS);
|
||||
}
|
||||
|
||||
public Environment(Settings settings) {
|
||||
this.settings = settings;
|
||||
if (settings.get("path.home") != null) {
|
||||
homeFile = PathUtils.get(cleanPath(settings.get("path.home")));
|
||||
} else {
|
||||
homeFile = PathUtils.get(System.getProperty("user.dir"));
|
||||
throw new IllegalStateException("path.home is not configured");
|
||||
}
|
||||
|
||||
if (settings.get("path.conf") != null) {
|
||||
|
@ -175,26 +170,13 @@ public class Environment {
|
|||
}
|
||||
|
||||
public URL resolveConfig(String path) throws FailedToResolveConfigException {
|
||||
String origPath = path;
|
||||
// first, try it as a path on the file system
|
||||
Path f1 = PathUtils.get(path);
|
||||
if (Files.exists(f1)) {
|
||||
// first, try it as a path in the config directory
|
||||
Path f = configFile.resolve(path);
|
||||
if (Files.exists(f)) {
|
||||
try {
|
||||
return f1.toUri().toURL();
|
||||
return f.toUri().toURL();
|
||||
} catch (MalformedURLException e) {
|
||||
throw new FailedToResolveConfigException("Failed to resolve path [" + f1 + "]", e);
|
||||
}
|
||||
}
|
||||
if (path.startsWith("/")) {
|
||||
path = path.substring(1);
|
||||
}
|
||||
// next, try it relative to the config location
|
||||
Path f2 = configFile.resolve(path);
|
||||
if (Files.exists(f2)) {
|
||||
try {
|
||||
return f2.toUri().toURL();
|
||||
} catch (MalformedURLException e) {
|
||||
throw new FailedToResolveConfigException("Failed to resolve path [" + f1 + "]", e);
|
||||
throw new FailedToResolveConfigException("Failed to resolve path [" + f + "]", e);
|
||||
}
|
||||
}
|
||||
// try and load it from the classpath directly
|
||||
|
@ -209,6 +191,6 @@ public class Environment {
|
|||
return resource;
|
||||
}
|
||||
}
|
||||
throw new FailedToResolveConfigException("Failed to resolve config path [" + origPath + "], tried file path [" + f1 + "], path file [" + f2 + "], and classpath");
|
||||
throw new FailedToResolveConfigException("Failed to resolve config path [" + path + "], tried config path [" + f + "] and classpath");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,6 +96,8 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde
|
|||
*/
|
||||
public class IndicesService extends AbstractLifecycleComponent<IndicesService> implements Iterable<IndexService> {
|
||||
|
||||
public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout";
|
||||
|
||||
private final InternalIndicesLifecycle indicesLifecycle;
|
||||
|
||||
private final IndicesAnalysisService indicesAnalysisService;
|
||||
|
@ -104,6 +106,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
|
||||
private final PluginsService pluginsService;
|
||||
private final NodeEnvironment nodeEnv;
|
||||
private final TimeValue shardsClosedTimeout;
|
||||
|
||||
private volatile Map<String, Tuple<IndexService, Injector>> indices = ImmutableMap.of();
|
||||
private final Map<Index, List<PendingDelete>> pendingDeletes = new HashMap<>();
|
||||
|
@ -119,6 +122,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
this.pluginsService = injector.getInstance(PluginsService.class);
|
||||
this.indicesLifecycle.addListener(oldShardsStats);
|
||||
this.nodeEnv = nodeEnv;
|
||||
this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -147,8 +151,8 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
});
|
||||
}
|
||||
try {
|
||||
if (latch.await(30, TimeUnit.SECONDS) == false) {
|
||||
logger.warn("Not all shards are closed yet, waited 30sec - stopping service");
|
||||
if (latch.await(shardsClosedTimeout.seconds(), TimeUnit.SECONDS) == false) {
|
||||
logger.warn("Not all shards are closed yet, waited {}sec - stopping service", shardsClosedTimeout.seconds());
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
|
|
|
@ -74,7 +74,7 @@ public class HunspellService extends AbstractComponent {
|
|||
|
||||
public final static String HUNSPELL_LAZY_LOAD = "indices.analysis.hunspell.dictionary.lazy";
|
||||
public final static String HUNSPELL_IGNORE_CASE = "indices.analysis.hunspell.dictionary.ignore_case";
|
||||
public final static String HUNSPELL_LOCATION = "indices.analysis.hunspell.dictionary.location";
|
||||
private final static String OLD_HUNSPELL_LOCATION = "indices.analysis.hunspell.dictionary.location";
|
||||
private final LoadingCache<String, Dictionary> dictionaries;
|
||||
private final Map<String, Dictionary> knownDictionaries;
|
||||
|
||||
|
@ -116,9 +116,9 @@ public class HunspellService extends AbstractComponent {
|
|||
}
|
||||
|
||||
private Path resolveHunspellDirectory(Settings settings, Environment env) {
|
||||
String location = settings.get(HUNSPELL_LOCATION, null);
|
||||
String location = settings.get(OLD_HUNSPELL_LOCATION, null);
|
||||
if (location != null) {
|
||||
return PathUtils.get(location);
|
||||
throw new IllegalArgumentException("please, put your hunspell dictionaries under config/hunspell !");
|
||||
}
|
||||
return env.configFile().resolve("hunspell");
|
||||
}
|
||||
|
|
|
@ -50,7 +50,6 @@ public class RestClusterHealthAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
ClusterHealthRequest clusterHealthRequest = clusterHealthRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
clusterHealthRequest.local(request.paramAsBoolean("local", clusterHealthRequest.local()));
|
||||
clusterHealthRequest.listenerThreaded(false);
|
||||
clusterHealthRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterHealthRequest.masterNodeTimeout()));
|
||||
clusterHealthRequest.timeout(request.paramAsTime("timeout", clusterHealthRequest.timeout()));
|
||||
String waitForStatus = request.param("wait_for_status");
|
||||
|
|
|
@ -81,8 +81,6 @@ public class RestNodesInfoAction extends BaseRestHandler {
|
|||
}
|
||||
|
||||
final NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(nodeIds);
|
||||
nodesInfoRequest.listenerThreaded(false);
|
||||
|
||||
// shortcut, dont do checks if only all is specified
|
||||
if (metrics.size() == 1 && metrics.contains("_all")) {
|
||||
nodesInfoRequest.all();
|
||||
|
|
|
@ -60,7 +60,6 @@ public class RestNodesStatsAction extends BaseRestHandler {
|
|||
Set<String> metrics = Strings.splitStringByCommaToSet(request.param("metric", "_all"));
|
||||
|
||||
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(nodesIds);
|
||||
nodesStatsRequest.listenerThreaded(false);
|
||||
|
||||
if (metrics.size() == 1 && metrics.contains("_all")) {
|
||||
nodesStatsRequest.all();
|
||||
|
|
|
@ -45,7 +45,6 @@ public class RestDeleteRepositoryAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
DeleteRepositoryRequest deleteRepositoryRequest = deleteRepositoryRequest(request.param("repository"));
|
||||
deleteRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteRepositoryRequest.masterNodeTimeout()));
|
||||
deleteRepositoryRequest.listenerThreaded(false);
|
||||
deleteRepositoryRequest.timeout(request.paramAsTime("timeout", deleteRepositoryRequest.timeout()));
|
||||
deleteRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteRepositoryRequest.masterNodeTimeout()));
|
||||
client.admin().cluster().deleteRepository(deleteRepositoryRequest, new AcknowledgedRestListener<DeleteRepositoryResponse>(channel));
|
||||
|
|
|
@ -47,7 +47,6 @@ public class RestPutRepositoryAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
PutRepositoryRequest putRepositoryRequest = putRepositoryRequest(request.param("repository"));
|
||||
putRepositoryRequest.listenerThreaded(false);
|
||||
putRepositoryRequest.source(request.content().toUtf8());
|
||||
putRepositoryRequest.verify(request.paramAsBoolean("verify", true));
|
||||
putRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putRepositoryRequest.masterNodeTimeout()));
|
||||
|
|
|
@ -50,7 +50,6 @@ public class RestVerifyRepositoryAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
VerifyRepositoryRequest verifyRepositoryRequest = verifyRepositoryRequest(request.param("repository"));
|
||||
verifyRepositoryRequest.listenerThreaded(false);
|
||||
verifyRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", verifyRepositoryRequest.masterNodeTimeout()));
|
||||
verifyRepositoryRequest.timeout(request.paramAsTime("timeout", verifyRepositoryRequest.timeout()));
|
||||
client.admin().cluster().verifyRepository(verifyRepositoryRequest, new RestToXContentListener<VerifyRepositoryResponse>(channel));
|
||||
|
|
|
@ -54,7 +54,6 @@ public class RestClusterRerouteAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
|
||||
final ClusterRerouteRequest clusterRerouteRequest = Requests.clusterRerouteRequest();
|
||||
clusterRerouteRequest.listenerThreaded(false);
|
||||
clusterRerouteRequest.dryRun(request.paramAsBoolean("dry_run", clusterRerouteRequest.dryRun()));
|
||||
clusterRerouteRequest.explain(request.paramAsBoolean("explain", clusterRerouteRequest.explain()));
|
||||
clusterRerouteRequest.timeout(request.paramAsTime("timeout", clusterRerouteRequest.timeout()));
|
||||
|
|
|
@ -42,7 +42,6 @@ public class RestClusterGetSettingsAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest()
|
||||
.listenerThreaded(false)
|
||||
.routingTable(false)
|
||||
.nodes(false);
|
||||
clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
|
||||
|
|
|
@ -46,7 +46,6 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
|
||||
final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest();
|
||||
clusterUpdateSettingsRequest.listenerThreaded(false);
|
||||
clusterUpdateSettingsRequest.timeout(request.paramAsTime("timeout", clusterUpdateSettingsRequest.timeout()));
|
||||
clusterUpdateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterUpdateSettingsRequest.masterNodeTimeout()));
|
||||
Map<String, Object> source = XContentFactory.xContent(request.content()).createParser(request.content()).mapAndClose();
|
||||
|
|
|
@ -53,7 +53,6 @@ public class RestClusterSearchShardsAction extends BaseRestHandler {
|
|||
String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
|
||||
final ClusterSearchShardsRequest clusterSearchShardsRequest = Requests.clusterSearchShardsRequest(indices);
|
||||
clusterSearchShardsRequest.local(request.paramAsBoolean("local", clusterSearchShardsRequest.local()));
|
||||
clusterSearchShardsRequest.listenerThreaded(false);
|
||||
|
||||
clusterSearchShardsRequest.types(Strings.splitStringByCommaToArray(request.param("type")));
|
||||
clusterSearchShardsRequest.routing(request.param("routing"));
|
||||
|
|
|
@ -46,7 +46,6 @@ public class RestCreateSnapshotAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
CreateSnapshotRequest createSnapshotRequest = createSnapshotRequest(request.param("repository"), request.param("snapshot"));
|
||||
createSnapshotRequest.listenerThreaded(false);
|
||||
createSnapshotRequest.source(request.content().toUtf8());
|
||||
createSnapshotRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createSnapshotRequest.masterNodeTimeout()));
|
||||
createSnapshotRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false));
|
||||
|
|
|
@ -57,7 +57,6 @@ public class RestClusterStateAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
final ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest();
|
||||
clusterStateRequest.listenerThreaded(false);
|
||||
clusterStateRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterStateRequest.indicesOptions()));
|
||||
clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
|
||||
clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout()));
|
||||
|
|
|
@ -43,7 +43,6 @@ public class RestClusterStatsAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
|
||||
clusterStatsRequest.listenerThreaded(false);
|
||||
client.admin().cluster().clusterStats(clusterStatsRequest, new RestToXContentListener<ClusterStatsResponse>(channel));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,7 +49,6 @@ public class RestIndicesAliasesAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
|
||||
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
|
||||
indicesAliasesRequest.listenerThreaded(false);
|
||||
indicesAliasesRequest.masterNodeTimeout(request.paramAsTime("master_timeout", indicesAliasesRequest.masterNodeTimeout()));
|
||||
try (XContentParser parser = XContentFactory.xContent(request.content()).createParser(request.content())) {
|
||||
// {
|
||||
|
|
|
@ -61,7 +61,6 @@ public class RestGetIndicesAliasesAction extends BaseRestHandler {
|
|||
.nodes(false)
|
||||
.indices(indices);
|
||||
clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
|
||||
clusterStateRequest.listenerThreaded(false);
|
||||
|
||||
client.admin().cluster().state(clusterStateRequest, new RestBuilderListener<ClusterStateResponse>(channel) {
|
||||
@Override
|
||||
|
|
|
@ -59,7 +59,6 @@ public class RestAnalyzeAction extends BaseRestHandler {
|
|||
|
||||
AnalyzeRequest analyzeRequest = new AnalyzeRequest(request.param("index"));
|
||||
analyzeRequest.text(text);
|
||||
analyzeRequest.listenerThreaded(false);
|
||||
analyzeRequest.preferLocal(request.paramAsBoolean("prefer_local", analyzeRequest.preferLocalShard()));
|
||||
analyzeRequest.analyzer(request.param("analyzer"));
|
||||
analyzeRequest.field(request.param("field"));
|
||||
|
|
|
@ -56,7 +56,6 @@ public class RestClearIndicesCacheAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
clearIndicesCacheRequest.listenerThreaded(false);
|
||||
clearIndicesCacheRequest.indicesOptions(IndicesOptions.fromRequest(request, clearIndicesCacheRequest.indicesOptions()));
|
||||
fromRequest(request, clearIndicesCacheRequest);
|
||||
client.admin().indices().clearCache(clearIndicesCacheRequest, new RestBuilderListener<ClearIndicesCacheResponse>(channel) {
|
||||
|
|
|
@ -44,7 +44,6 @@ public class RestCloseIndexAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
closeIndexRequest.listenerThreaded(false);
|
||||
closeIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", closeIndexRequest.masterNodeTimeout()));
|
||||
closeIndexRequest.timeout(request.paramAsTime("timeout", closeIndexRequest.timeout()));
|
||||
closeIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, closeIndexRequest.indicesOptions()));
|
||||
|
|
|
@ -43,7 +43,6 @@ public class RestCreateIndexAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(request.param("index"));
|
||||
createIndexRequest.listenerThreaded(false);
|
||||
if (request.hasContent()) {
|
||||
createIndexRequest.source(request.content());
|
||||
}
|
||||
|
|
|
@ -44,7 +44,6 @@ public class RestDeleteIndexAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
deleteIndexRequest.listenerThreaded(false);
|
||||
deleteIndexRequest.timeout(request.paramAsTime("timeout", deleteIndexRequest.timeout()));
|
||||
deleteIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexRequest.masterNodeTimeout()));
|
||||
deleteIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteIndexRequest.indicesOptions()));
|
||||
|
|
|
@ -49,7 +49,6 @@ public class RestIndicesExistsAction extends BaseRestHandler {
|
|||
IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
indicesExistsRequest.indicesOptions(IndicesOptions.fromRequest(request, indicesExistsRequest.indicesOptions()));
|
||||
indicesExistsRequest.local(request.paramAsBoolean("local", indicesExistsRequest.local()));
|
||||
indicesExistsRequest.listenerThreaded(false);
|
||||
client.admin().indices().exists(indicesExistsRequest, new RestResponseListener<IndicesExistsResponse>(channel) {
|
||||
@Override
|
||||
public RestResponse buildResponse(IndicesExistsResponse response) {
|
||||
|
|
|
@ -48,7 +48,6 @@ public class RestTypesExistsAction extends BaseRestHandler {
|
|||
TypesExistsRequest typesExistsRequest = new TypesExistsRequest(
|
||||
Strings.splitStringByCommaToArray(request.param("index")), Strings.splitStringByCommaToArray(request.param("type"))
|
||||
);
|
||||
typesExistsRequest.listenerThreaded(false);
|
||||
typesExistsRequest.local(request.paramAsBoolean("local", typesExistsRequest.local()));
|
||||
typesExistsRequest.indicesOptions(IndicesOptions.fromRequest(request, typesExistsRequest.indicesOptions()));
|
||||
client.admin().indices().typesExists(typesExistsRequest, new RestResponseListener<TypesExistsResponse>(channel) {
|
||||
|
|
|
@ -53,7 +53,6 @@ public class RestFlushAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
FlushRequest flushRequest = new FlushRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
flushRequest.listenerThreaded(false);
|
||||
flushRequest.indicesOptions(IndicesOptions.fromRequest(request, flushRequest.indicesOptions()));
|
||||
flushRequest.force(request.paramAsBoolean("force", flushRequest.force()));
|
||||
flushRequest.waitIfOngoing(request.paramAsBoolean("wait_if_ongoing", flushRequest.waitIfOngoing()));
|
||||
|
|
|
@ -67,7 +67,6 @@ public class RestPutMappingAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
PutMappingRequest putMappingRequest = putMappingRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
putMappingRequest.listenerThreaded(false);
|
||||
putMappingRequest.type(request.param("type"));
|
||||
putMappingRequest.source(request.content().toUtf8());
|
||||
putMappingRequest.timeout(request.paramAsTime("timeout", putMappingRequest.timeout()));
|
||||
|
|
|
@ -44,7 +44,6 @@ public class RestOpenIndexAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
OpenIndexRequest openIndexRequest = new OpenIndexRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
openIndexRequest.listenerThreaded(false);
|
||||
openIndexRequest.timeout(request.paramAsTime("timeout", openIndexRequest.timeout()));
|
||||
openIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", openIndexRequest.masterNodeTimeout()));
|
||||
openIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, openIndexRequest.indicesOptions()));
|
||||
|
|
|
@ -53,7 +53,6 @@ public class RestOptimizeAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
OptimizeRequest optimizeRequest = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
optimizeRequest.listenerThreaded(false);
|
||||
optimizeRequest.indicesOptions(IndicesOptions.fromRequest(request, optimizeRequest.indicesOptions()));
|
||||
optimizeRequest.maxNumSegments(request.paramAsInt("max_num_segments", optimizeRequest.maxNumSegments()));
|
||||
optimizeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", optimizeRequest.onlyExpungeDeletes()));
|
||||
|
|
|
@ -51,7 +51,6 @@ public class RestRecoveryAction extends BaseRestHandler {
|
|||
final RecoveryRequest recoveryRequest = new RecoveryRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
recoveryRequest.detailed(request.paramAsBoolean("detailed", false));
|
||||
recoveryRequest.activeOnly(request.paramAsBoolean("active_only", false));
|
||||
recoveryRequest.listenerThreaded(false);
|
||||
recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions()));
|
||||
|
||||
client.admin().indices().recoveries(recoveryRequest, new RestBuilderListener<RecoveryResponse>(channel) {
|
||||
|
|
|
@ -53,7 +53,6 @@ public class RestRefreshAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
RefreshRequest refreshRequest = new RefreshRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
refreshRequest.listenerThreaded(false);
|
||||
refreshRequest.indicesOptions(IndicesOptions.fromRequest(request, refreshRequest.indicesOptions()));
|
||||
client.admin().indices().refresh(refreshRequest, new RestBuilderListener<RefreshResponse>(channel) {
|
||||
@Override
|
||||
|
|
|
@ -49,7 +49,6 @@ public class RestIndicesSegmentsAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
IndicesSegmentsRequest indicesSegmentsRequest = new IndicesSegmentsRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
indicesSegmentsRequest.verbose(request.paramAsBoolean("verbose", false));
|
||||
indicesSegmentsRequest.listenerThreaded(false);
|
||||
indicesSegmentsRequest.indicesOptions(IndicesOptions.fromRequest(request, indicesSegmentsRequest.indicesOptions()));
|
||||
client.admin().indices().segments(indicesSegmentsRequest, new RestBuilderListener<IndicesSegmentResponse>(channel) {
|
||||
@Override
|
||||
|
|
|
@ -55,7 +55,6 @@ public class RestUpdateSettingsAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
UpdateSettingsRequest updateSettingsRequest = updateSettingsRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
updateSettingsRequest.listenerThreaded(false);
|
||||
updateSettingsRequest.timeout(request.paramAsTime("timeout", updateSettingsRequest.timeout()));
|
||||
updateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", updateSettingsRequest.masterNodeTimeout()));
|
||||
updateSettingsRequest.indicesOptions(IndicesOptions.fromRequest(request, updateSettingsRequest.indicesOptions()));
|
||||
|
|
|
@ -53,7 +53,6 @@ public class RestIndicesStatsAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
|
||||
indicesStatsRequest.listenerThreaded(false);
|
||||
indicesStatsRequest.indicesOptions(IndicesOptions.fromRequest(request, indicesStatsRequest.indicesOptions()));
|
||||
indicesStatsRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
indicesStatsRequest.types(Strings.splitStringByCommaToArray(request.param("types")));
|
||||
|
|
|
@ -40,7 +40,6 @@ public class RestDeleteIndexTemplateAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(request.param("name"));
|
||||
deleteIndexTemplateRequest.listenerThreaded(false);
|
||||
deleteIndexTemplateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexTemplateRequest.masterNodeTimeout()));
|
||||
client.admin().indices().deleteTemplate(deleteIndexTemplateRequest, new AcknowledgedRestListener<DeleteIndexTemplateResponse>(channel));
|
||||
}
|
||||
|
|
|
@ -58,8 +58,6 @@ public class RestGetIndexTemplateAction extends BaseRestHandler {
|
|||
getIndexTemplatesRequest.local(request.paramAsBoolean("local", getIndexTemplatesRequest.local()));
|
||||
getIndexTemplatesRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getIndexTemplatesRequest.masterNodeTimeout()));
|
||||
|
||||
getIndexTemplatesRequest.listenerThreaded(false);
|
||||
|
||||
final boolean implicitAll = getIndexTemplatesRequest.names().length == 0;
|
||||
|
||||
client.admin().indices().getTemplates(getIndexTemplatesRequest, new RestBuilderListener<GetIndexTemplatesResponse>(channel) {
|
||||
|
|
|
@ -42,7 +42,6 @@ public class RestPutIndexTemplateAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
PutIndexTemplateRequest putRequest = new PutIndexTemplateRequest(request.param("name"));
|
||||
putRequest.listenerThreaded(false);
|
||||
putRequest.template(request.param("template", putRequest.template()));
|
||||
putRequest.order(request.paramAsInt("order", putRequest.order()));
|
||||
putRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putRequest.masterNodeTimeout()));
|
||||
|
|
|
@ -57,7 +57,6 @@ public class RestValidateQueryAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
ValidateQueryRequest validateQueryRequest = new ValidateQueryRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
validateQueryRequest.listenerThreaded(false);
|
||||
validateQueryRequest.indicesOptions(IndicesOptions.fromRequest(request, validateQueryRequest.indicesOptions()));
|
||||
if (RestActions.hasBodyContent(request)) {
|
||||
validateQueryRequest.source(RestActions.getRestContent(request));
|
||||
|
|
|
@ -47,7 +47,6 @@ public class RestDeleteWarmerAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
DeleteWarmerRequest deleteWarmerRequest = new DeleteWarmerRequest(Strings.splitStringByCommaToArray(request.param("name")))
|
||||
.indices(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
deleteWarmerRequest.listenerThreaded(false);
|
||||
deleteWarmerRequest.timeout(request.paramAsTime("timeout", deleteWarmerRequest.timeout()));
|
||||
deleteWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWarmerRequest.masterNodeTimeout()));
|
||||
deleteWarmerRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteWarmerRequest.indicesOptions()));
|
||||
|
|
|
@ -59,7 +59,6 @@ public class RestPutWarmerAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
PutWarmerRequest putWarmerRequest = new PutWarmerRequest(request.param("name"));
|
||||
putWarmerRequest.listenerThreaded(false);
|
||||
SearchRequest searchRequest = new SearchRequest(Strings.splitStringByCommaToArray(request.param("index")))
|
||||
.types(Strings.splitStringByCommaToArray(request.param("type")))
|
||||
.queryCache(request.paramAsBoolean("query_cache", null))
|
||||
|
|
|
@ -71,7 +71,6 @@ public class RestBulkAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
|
||||
BulkRequest bulkRequest = Requests.bulkRequest();
|
||||
bulkRequest.listenerThreaded(false);
|
||||
String defaultIndex = request.param("index");
|
||||
String defaultType = request.param("type");
|
||||
String defaultRouting = request.param("routing");
|
||||
|
|
|
@ -68,7 +68,6 @@ public class RestRecoveryAction extends AbstractCatAction {
|
|||
final RecoveryRequest recoveryRequest = new RecoveryRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
recoveryRequest.detailed(request.paramAsBoolean("detailed", false));
|
||||
recoveryRequest.activeOnly(request.paramAsBoolean("active_only", false));
|
||||
recoveryRequest.listenerThreaded(false);
|
||||
recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions()));
|
||||
|
||||
client.admin().indices().recoveries(recoveryRequest, new RestResponseListener<RecoveryResponse>(channel) {
|
||||
|
|
|
@ -58,7 +58,6 @@ public class RestCountAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
CountRequest countRequest = new CountRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
countRequest.indicesOptions(IndicesOptions.fromRequest(request, countRequest.indicesOptions()));
|
||||
countRequest.listenerThreaded(false);
|
||||
if (RestActions.hasBodyContent(request)) {
|
||||
countRequest.source(RestActions.getRestContent(request));
|
||||
} else {
|
||||
|
|
|
@ -51,7 +51,6 @@ public class RestDeleteAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id"));
|
||||
|
||||
deleteRequest.listenerThreaded(false);
|
||||
deleteRequest.operationThreaded(true);
|
||||
|
||||
deleteRequest.routing(request.param("routing"));
|
||||
|
|
|
@ -48,7 +48,6 @@ public class RestExistsAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
final ExistsRequest existsRequest = new ExistsRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
existsRequest.indicesOptions(IndicesOptions.fromRequest(request, existsRequest.indicesOptions()));
|
||||
existsRequest.listenerThreaded(false);
|
||||
if (RestActions.hasBodyContent(request)) {
|
||||
existsRequest.source(RestActions.getRestContent(request));
|
||||
} else {
|
||||
|
|
|
@ -57,7 +57,6 @@ public class RestFieldStatsAction extends BaseRestHandler {
|
|||
fieldStatsRequest.indicesOptions(IndicesOptions.fromRequest(request, fieldStatsRequest.indicesOptions()));
|
||||
fieldStatsRequest.fields(Strings.splitStringByCommaToArray(request.param("fields")));
|
||||
fieldStatsRequest.level(request.param("level", FieldStatsRequest.DEFAULT_LEVEL));
|
||||
fieldStatsRequest.listenerThreaded(false);
|
||||
|
||||
client.fieldStats(fieldStatsRequest, new RestBuilderListener<FieldStatsResponse>(channel) {
|
||||
@Override
|
||||
|
|
|
@ -50,7 +50,6 @@ public class RestGetAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
final GetRequest getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
|
||||
getRequest.listenerThreaded(false);
|
||||
getRequest.operationThreaded(true);
|
||||
getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
|
||||
getRequest.routing(request.param("routing")); // order is important, set it after routing, so it will set the routing
|
||||
|
|
|
@ -51,7 +51,6 @@ public class RestGetSourceAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
final GetRequest getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
|
||||
getRequest.listenerThreaded(false);
|
||||
getRequest.operationThreaded(true);
|
||||
getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
|
||||
getRequest.routing(request.param("routing")); // order is important, set it after routing, so it will set the routing
|
||||
|
|
|
@ -47,7 +47,6 @@ public class RestHeadAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
final GetRequest getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
|
||||
getRequest.listenerThreaded(false);
|
||||
getRequest.operationThreaded(true);
|
||||
getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
|
||||
getRequest.routing(request.param("routing")); // order is important, set it after routing, so it will set the routing
|
||||
|
|
|
@ -53,7 +53,6 @@ public class RestMultiGetAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
|
||||
MultiGetRequest multiGetRequest = new MultiGetRequest();
|
||||
multiGetRequest.listenerThreaded(false);
|
||||
multiGetRequest.refresh(request.paramAsBoolean("refresh", multiGetRequest.refresh()));
|
||||
multiGetRequest.preference(request.param("preference"));
|
||||
multiGetRequest.realtime(request.paramAsBoolean("realtime", null));
|
||||
|
|
|
@ -70,7 +70,6 @@ public class RestIndexAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
|
||||
indexRequest.listenerThreaded(false);
|
||||
indexRequest.operationThreaded(true);
|
||||
indexRequest.routing(request.param("routing"));
|
||||
indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
|
||||
|
|
|
@ -50,8 +50,6 @@ public class RestMoreLikeThisAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
MoreLikeThisRequest mltRequest = moreLikeThisRequest(request.param("index")).type(request.param("type")).id(request.param("id"));
|
||||
mltRequest.routing(request.param("routing"));
|
||||
|
||||
mltRequest.listenerThreaded(false);
|
||||
//TODO the ParseField class that encapsulates the supported names used for an attribute
|
||||
//needs some work if it is to be used in a REST context like this too
|
||||
// See the MoreLikeThisQueryParser constants that hold the valid syntax
|
||||
|
|
|
@ -94,8 +94,6 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
}
|
||||
|
||||
void executePercolate(final PercolateRequest percolateRequest, final RestChannel restChannel, final Client client) {
|
||||
// we just send a response, no need to fork
|
||||
percolateRequest.listenerThreaded(false);
|
||||
client.percolate(percolateRequest, new RestToXContentListener<PercolateResponse>(restChannel));
|
||||
}
|
||||
|
||||
|
|
|
@ -75,7 +75,7 @@ public class RestPutIndexedScriptAction extends BaseRestHandler {
|
|||
|
||||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, Client client) {
|
||||
PutIndexedScriptRequest putRequest = new PutIndexedScriptRequest(getScriptLang(request), request.param("id")).listenerThreaded(false);
|
||||
PutIndexedScriptRequest putRequest = new PutIndexedScriptRequest(getScriptLang(request), request.param("id"));
|
||||
putRequest.version(request.paramAsLong("version", putRequest.version()));
|
||||
putRequest.versionType(VersionType.fromString(request.param("version_type"), putRequest.versionType()));
|
||||
putRequest.source(request.content());
|
||||
|
|
|
@ -56,7 +56,6 @@ public class RestMultiSearchAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
|
||||
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
|
||||
multiSearchRequest.listenerThreaded(false);
|
||||
|
||||
String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
|
||||
String[] types = Strings.splitStringByCommaToArray(request.param("type"));
|
||||
|
|
|
@ -77,7 +77,6 @@ public class RestSearchAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
SearchRequest searchRequest;
|
||||
searchRequest = RestSearchAction.parseSearchRequest(request);
|
||||
searchRequest.listenerThreaded(false);
|
||||
client.search(searchRequest, new RestStatusToXContentListener<SearchResponse>(channel));
|
||||
}
|
||||
|
||||
|
|
|
@ -60,7 +60,6 @@ public class RestSearchScrollAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
String scrollId = request.param("scroll_id");
|
||||
SearchScrollRequest searchScrollRequest = new SearchScrollRequest();
|
||||
searchScrollRequest.listenerThreaded(false);
|
||||
searchScrollRequest.scrollId(scrollId);
|
||||
String scroll = request.param("scroll");
|
||||
if (scroll != null) {
|
||||
|
|
|
@ -59,7 +59,6 @@ public class RestSuggestAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
|
||||
SuggestRequest suggestRequest = new SuggestRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
suggestRequest.indicesOptions(IndicesOptions.fromRequest(request, suggestRequest.indicesOptions()));
|
||||
suggestRequest.listenerThreaded(false);
|
||||
if (RestActions.hasBodyContent(request)) {
|
||||
suggestRequest.suggest(RestActions.getRestContent(request));
|
||||
} else {
|
||||
|
|
|
@ -49,7 +49,6 @@ public class RestMultiTermVectorsAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
|
||||
MultiTermVectorsRequest multiTermVectorsRequest = new MultiTermVectorsRequest();
|
||||
multiTermVectorsRequest.listenerThreaded(false);
|
||||
TermVectorsRequest template = new TermVectorsRequest();
|
||||
template.index(request.param("index"));
|
||||
template.type(request.param("type"));
|
||||
|
|
|
@ -55,7 +55,6 @@ public class RestUpdateAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
|
||||
UpdateRequest updateRequest = new UpdateRequest(request.param("index"), request.param("type"), request.param("id"));
|
||||
updateRequest.listenerThreaded(false);
|
||||
updateRequest.routing(request.param("routing"));
|
||||
updateRequest.parent(request.param("parent"));
|
||||
updateRequest.timeout(request.paramAsTime("timeout", updateRequest.timeout()));
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.get.GetRequestBuilder;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.support.ThreadedActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -231,7 +232,7 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
|
|||
logger.trace("river {} is already allocated", routing.riverName().getName());
|
||||
continue;
|
||||
}
|
||||
prepareGetMetaDocument(routing.riverName().name()).execute(new ActionListener<GetResponse>() {
|
||||
prepareGetMetaDocument(routing.riverName().name()).execute(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, new ActionListener<GetResponse>() {
|
||||
@Override
|
||||
public void onResponse(GetResponse getResponse) {
|
||||
if (!rivers.containsKey(routing.riverName())) {
|
||||
|
@ -255,7 +256,7 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
|
|||
logger.debug("failed to get _meta from [{}]/[{}], retrying...", e, routing.riverName().type(), routing.riverName().name());
|
||||
final ActionListener<GetResponse> listener = this;
|
||||
try {
|
||||
threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.SAME, new Runnable() {
|
||||
threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.LISTENER, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
prepareGetMetaDocument(routing.riverName().name()).execute(listener);
|
||||
|
@ -268,12 +269,12 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
|
|||
logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name());
|
||||
}
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
private GetRequestBuilder prepareGetMetaDocument(String riverName) {
|
||||
return client.prepareGet(riverIndexName, riverName, "_meta").setPreference("_primary").setListenerThreaded(true);
|
||||
return client.prepareGet(riverIndexName, riverName, "_meta").setPreference("_primary");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -127,6 +127,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
for (Map.Entry<String, Settings> entry : nodesSettings.entrySet()) {
|
||||
ImmutableSettings.Builder sb = ImmutableSettings.builder().put(entry.getValue());
|
||||
sb.put("node.name", settings.get("name") + "/" + entry.getKey());
|
||||
sb.put("path.home", settings.get("path.home")); // pass through ES home dir
|
||||
sb.put(TRIBE_NAME, entry.getKey());
|
||||
sb.put("config.ignore_system_properties", true);
|
||||
if (sb.get("http.enabled") == null) {
|
||||
|
|
|
@ -31,10 +31,11 @@ grant {
|
|||
permission java.io.FilePermission "${java.io.tmpdir}${/}-", "read,write,delete";
|
||||
|
||||
// paths used for running tests
|
||||
// project base directory
|
||||
permission java.io.FilePermission "${project.basedir}${/}target${/}-", "read";
|
||||
// compiled classes
|
||||
permission java.io.FilePermission "${project.basedir}${/}target${/}classes${/}-", "read";
|
||||
permission java.io.FilePermission "${project.basedir}${/}target${/}test-classes${/}-", "read";
|
||||
// read permission for lib sigar
|
||||
permission java.io.FilePermission "${project.basedir}${/}lib/sigar{/}-", "read";
|
||||
permission java.io.FilePermission "${project.basedir}${/}lib${/}sigar${/}-", "read";
|
||||
// mvn custom ./m2/repository for dependency jars
|
||||
permission java.io.FilePermission "${m2.repository}${/}-", "read";
|
||||
|
||||
|
@ -65,9 +66,6 @@ grant {
|
|||
// needed by BootStrap, etc
|
||||
permission java.lang.RuntimePermission "exitVM.*";
|
||||
|
||||
// needed by RandomizedTest.globalTempDir()
|
||||
permission java.lang.RuntimePermission "shutdownHooks";
|
||||
|
||||
// needed by PluginManager
|
||||
permission java.lang.RuntimePermission "setFactory";
|
||||
|
||||
|
@ -84,10 +82,6 @@ grant {
|
|||
// needed for natives calls
|
||||
permission java.lang.RuntimePermission "loadLibrary.*";
|
||||
|
||||
// needed for testing access rules etc
|
||||
permission java.lang.RuntimePermission "createSecurityManager";
|
||||
permission java.security.SecurityPermission "createPolicy.JavaPolicy";
|
||||
|
||||
// reflection hacks:
|
||||
// needed for Striped64 (what is this doing), also enables unmap hack
|
||||
permission java.lang.RuntimePermission "accessClassInPackage.sun.misc";
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action;
|
||||
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ListenerActionTests extends ElasticsearchIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void verifyThreadedListeners() throws Throwable {
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicReference<Throwable> failure = new AtomicReference<>();
|
||||
final AtomicReference<String> threadName = new AtomicReference<>();
|
||||
Client client = client();
|
||||
|
||||
IndexRequest request = new IndexRequest("test", "type", "1");
|
||||
if (randomBoolean()) {
|
||||
// set the source, without it, we will have a verification failure
|
||||
request.source("field1", "value1");
|
||||
}
|
||||
|
||||
client.index(request, new ActionListener<IndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(IndexResponse indexResponse) {
|
||||
threadName.set(Thread.currentThread().getName());
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
threadName.set(Thread.currentThread().getName());
|
||||
failure.set(e);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
latch.await();
|
||||
|
||||
boolean shouldBeThreaded = DiscoveryNode.clientNode(client.settings()) || TransportClient.CLIENT_TYPE.equals(client.settings().get(Client.CLIENT_TYPE_SETTING));
|
||||
if (shouldBeThreaded) {
|
||||
assertTrue(threadName.get().contains("listener"));
|
||||
} else {
|
||||
assertFalse(threadName.get().contains("listener"));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -157,7 +158,10 @@ public class BulkProcessorTests extends ElasticsearchIntegrationTest {
|
|||
//https://github.com/elasticsearch/elasticsearch/issues/5038
|
||||
public void testBulkProcessorConcurrentRequestsNoNodeAvailableException() throws Exception {
|
||||
//we create a transport client with no nodes to make sure it throws NoNodeAvailableException
|
||||
Client transportClient = new TransportClient();
|
||||
Settings settings = ImmutableSettings.builder()
|
||||
.put("path.home", createTempDir().toString())
|
||||
.build();
|
||||
Client transportClient = new TransportClient(settings);
|
||||
|
||||
int bulkActions = randomIntBetween(10, 100);
|
||||
int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
|
||||
|
|
|
@ -23,6 +23,8 @@ import org.elasticsearch.action.support.QuerySourceBuilder;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
|
@ -46,7 +48,10 @@ public class CountRequestBuilderTests extends ElasticsearchTestCase {
|
|||
public static void initClient() {
|
||||
//this client will not be hit by any request, but it needs to be a non null proper client
|
||||
//that is why we create it but we don't add any transport address to it
|
||||
client = new TransportClient();
|
||||
Settings settings = ImmutableSettings.builder()
|
||||
.put("path.home", createTempDir().toString())
|
||||
.build();
|
||||
client = new TransportClient(settings);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.elasticsearch.action.search;
|
|||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
|
@ -45,7 +47,10 @@ public class SearchRequestBuilderTests extends ElasticsearchTestCase {
|
|||
public static void initClient() {
|
||||
//this client will not be hit by any request, but it needs to be a non null proper client
|
||||
//that is why we create it but we don't add any transport address to it
|
||||
client = new TransportClient();
|
||||
Settings settings = ImmutableSettings.builder()
|
||||
.put("path.home", createTempDir().toString())
|
||||
.build();
|
||||
client = new TransportClient(settings);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
@ -92,7 +92,7 @@ public class TransportActionFilterChainTests extends ElasticsearchTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
PlainListenableActionFuture<TestResponse> future = new PlainListenableActionFuture<>(false, null);
|
||||
PlainListenableActionFuture<TestResponse> future = new PlainListenableActionFuture<>(null);
|
||||
transportAction.execute(new TestRequest(), future);
|
||||
try {
|
||||
assertThat(future.get(), notNullValue());
|
||||
|
@ -174,7 +174,7 @@ public class TransportActionFilterChainTests extends ElasticsearchTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
PlainListenableActionFuture<TestResponse> future = new PlainListenableActionFuture<>(false, null);
|
||||
PlainListenableActionFuture<TestResponse> future = new PlainListenableActionFuture<>(null);
|
||||
transportAction.execute(new TestRequest(), future);
|
||||
try {
|
||||
assertThat(future.get(), notNullValue());
|
||||
|
|
|
@ -24,12 +24,9 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.FilePermission;
|
||||
import java.nio.file.Path;
|
||||
import java.security.Policy;
|
||||
import java.security.ProtectionDomain;
|
||||
import java.security.URIParameter;
|
||||
import java.security.Permissions;
|
||||
|
||||
public class SecurityTests extends ElasticsearchTestCase {
|
||||
|
||||
|
@ -42,17 +39,15 @@ public class SecurityTests extends ElasticsearchTestCase {
|
|||
settingsBuilder.put("path.home", esHome.toString());
|
||||
Settings settings = settingsBuilder.build();
|
||||
|
||||
Environment environment = new Environment(settings);
|
||||
Path policyFile = Security.processTemplate(new ByteArrayInputStream(new byte[0]), environment);
|
||||
Environment environment = new Environment(settings);
|
||||
Permissions permissions = Security.createPermissions(environment);
|
||||
|
||||
ProtectionDomain domain = getClass().getProtectionDomain();
|
||||
Policy policy = Policy.getInstance("JavaPolicy", new URIParameter(policyFile.toUri()));
|
||||
// the fake es home
|
||||
assertTrue(policy.implies(domain, new FilePermission(esHome.toString(), "read")));
|
||||
assertTrue(permissions.implies(new FilePermission(esHome.toString(), "read")));
|
||||
// its parent
|
||||
assertFalse(policy.implies(domain, new FilePermission(path.toString(), "read")));
|
||||
assertFalse(permissions.implies(new FilePermission(path.toString(), "read")));
|
||||
// some other sibling
|
||||
assertFalse(policy.implies(domain, new FilePermission(path.resolve("other").toString(), "read")));
|
||||
assertFalse(permissions.implies(new FilePermission(path.resolve("other").toString(), "read")));
|
||||
}
|
||||
|
||||
/** test generated permissions for all configured paths */
|
||||
|
@ -67,29 +62,26 @@ public class SecurityTests extends ElasticsearchTestCase {
|
|||
settingsBuilder.put("path.logs", path.resolve("logs").toString());
|
||||
Settings settings = settingsBuilder.build();
|
||||
|
||||
Environment environment = new Environment(settings);
|
||||
Path policyFile = Security.processTemplate(new ByteArrayInputStream(new byte[0]), environment);
|
||||
|
||||
ProtectionDomain domain = getClass().getProtectionDomain();
|
||||
Policy policy = Policy.getInstance("JavaPolicy", new URIParameter(policyFile.toUri()));
|
||||
Environment environment = new Environment(settings);
|
||||
Permissions permissions = Security.createPermissions(environment);
|
||||
|
||||
// check that all directories got permissions:
|
||||
// homefile: this is needed unless we break out rules for "lib" dir.
|
||||
// TODO: make read-only
|
||||
assertTrue(policy.implies(domain, new FilePermission(environment.homeFile().toString(), "read,readlink,write,delete")));
|
||||
assertTrue(permissions.implies(new FilePermission(environment.homeFile().toString(), "read,readlink,write,delete")));
|
||||
// config file
|
||||
// TODO: make read-only
|
||||
assertTrue(policy.implies(domain, new FilePermission(environment.configFile().toString(), "read,readlink,write,delete")));
|
||||
assertTrue(permissions.implies(new FilePermission(environment.configFile().toString(), "read,readlink,write,delete")));
|
||||
// plugins: r/w, TODO: can this be minimized?
|
||||
assertTrue(policy.implies(domain, new FilePermission(environment.pluginsFile().toString(), "read,readlink,write,delete")));
|
||||
assertTrue(permissions.implies(new FilePermission(environment.pluginsFile().toString(), "read,readlink,write,delete")));
|
||||
// data paths: r/w
|
||||
for (Path dataPath : environment.dataFiles()) {
|
||||
assertTrue(policy.implies(domain, new FilePermission(dataPath.toString(), "read,readlink,write,delete")));
|
||||
assertTrue(permissions.implies(new FilePermission(dataPath.toString(), "read,readlink,write,delete")));
|
||||
}
|
||||
for (Path dataPath : environment.dataWithClusterFiles()) {
|
||||
assertTrue(policy.implies(domain, new FilePermission(dataPath.toString(), "read,readlink,write,delete")));
|
||||
assertTrue(permissions.implies(new FilePermission(dataPath.toString(), "read,readlink,write,delete")));
|
||||
}
|
||||
// logs: r/w
|
||||
assertTrue(policy.implies(domain, new FilePermission(environment.logsFile().toString(), "read,readlink,write,delete")));
|
||||
assertTrue(permissions.implies(new FilePermission(environment.logsFile().toString(), "read,readlink,write,delete")));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.elasticsearch.client.support.Headers;
|
|||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportMessage;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -84,16 +85,23 @@ public abstract class AbstractClientHeadersTests extends ElasticsearchTestCase {
|
|||
CreateIndexAction.INSTANCE, IndicesStatsAction.INSTANCE, ClearIndicesCacheAction.INSTANCE, FlushAction.INSTANCE
|
||||
};
|
||||
|
||||
protected ThreadPool threadPool;
|
||||
private Client client;
|
||||
|
||||
@Before
|
||||
public void initClient() {
|
||||
client = buildClient(HEADER_SETTINGS, ACTIONS);
|
||||
Settings settings = ImmutableSettings.builder()
|
||||
.put(HEADER_SETTINGS)
|
||||
.put("path.home", createTempDir().toString())
|
||||
.build();
|
||||
threadPool = new ThreadPool("test-" + getTestName());
|
||||
client = buildClient(settings, ACTIONS);
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanupClient() {
|
||||
public void cleanupClient() throws Exception {
|
||||
client.close();
|
||||
terminate(threadPool);
|
||||
}
|
||||
|
||||
protected abstract Client buildClient(Settings headersSettings, GenericAction[] testedActions);
|
||||
|
|
|
@ -42,18 +42,6 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTests {
|
|||
|
||||
private static final ActionFilters EMPTY_FILTERS = new ActionFilters(ImmutableSet.of());
|
||||
|
||||
private ThreadPool threadPool;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
threadPool = new ThreadPool("test");
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() throws InterruptedException {
|
||||
terminate(threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Client buildClient(Settings headersSettings, GenericAction[] testedActions) {
|
||||
Settings settings = HEADER_SETTINGS;
|
||||
|
@ -61,8 +49,8 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTests {
|
|||
Headers headers = new Headers(settings);
|
||||
Actions actions = new Actions(settings, threadPool, testedActions);
|
||||
|
||||
NodeClusterAdminClient clusterClient = new NodeClusterAdminClient(threadPool, actions, headers);
|
||||
NodeIndicesAdminClient indicesClient = new NodeIndicesAdminClient(threadPool, actions, headers);
|
||||
NodeClusterAdminClient clusterClient = new NodeClusterAdminClient(settings, threadPool, actions, headers);
|
||||
NodeIndicesAdminClient indicesClient = new NodeIndicesAdminClient(settings, threadPool, actions, headers);
|
||||
NodeAdminClient adminClient = new NodeAdminClient(settings, clusterClient, indicesClient);
|
||||
return new NodeClient(settings, threadPool, adminClient, actions, headers);
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTests {
|
|||
.put("client.transport.sniff", false)
|
||||
.put("node.name", "transport_client_" + this.getTestName())
|
||||
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, InternalTransportService.class.getName())
|
||||
.put(HEADER_SETTINGS)
|
||||
.put(headersSettings)
|
||||
.build());
|
||||
|
||||
client.addTransportAddress(address);
|
||||
|
@ -75,6 +75,7 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTests {
|
|||
.put("client.transport.nodes_sampler_interval", "1s")
|
||||
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, InternalTransportService.class.getName())
|
||||
.put(HEADER_SETTINGS)
|
||||
.put("path.home", createTempDir().toString())
|
||||
.build());
|
||||
try {
|
||||
client.addTransportAddress(address);
|
||||
|
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue