Merge branch 'master' into feature/client_aggs_parsing

This commit is contained in:
javanna 2017-04-20 23:03:58 +02:00 committed by Luca Cavanna
commit d9916f20a6
36 changed files with 1293 additions and 1312 deletions

View File

@ -61,19 +61,16 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
@ -85,7 +82,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.UnaryOperator; import java.util.function.UnaryOperator;
@ -114,6 +110,7 @@ public class ClusterService extends AbstractLifecycleComponent {
private TimeValue slowTaskLoggingThreshold; private TimeValue slowTaskLoggingThreshold;
private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor; private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;
private volatile ClusterServiceTaskBatcher taskBatcher;
/** /**
* Those 3 state listeners are changing infrequently - CopyOnWriteArrayList is just fine * Those 3 state listeners are changing infrequently - CopyOnWriteArrayList is just fine
@ -121,7 +118,6 @@ public class ClusterService extends AbstractLifecycleComponent {
private final Collection<ClusterStateApplier> highPriorityStateAppliers = new CopyOnWriteArrayList<>(); private final Collection<ClusterStateApplier> highPriorityStateAppliers = new CopyOnWriteArrayList<>();
private final Collection<ClusterStateApplier> normalPriorityStateAppliers = new CopyOnWriteArrayList<>(); private final Collection<ClusterStateApplier> normalPriorityStateAppliers = new CopyOnWriteArrayList<>();
private final Collection<ClusterStateApplier> lowPriorityStateAppliers = new CopyOnWriteArrayList<>(); private final Collection<ClusterStateApplier> lowPriorityStateAppliers = new CopyOnWriteArrayList<>();
final Map<ClusterStateTaskExecutor, LinkedHashSet<UpdateTask>> updateTasksPerExecutor = new HashMap<>();
private final Iterable<ClusterStateApplier> clusterStateAppliers = Iterables.concat(highPriorityStateAppliers, private final Iterable<ClusterStateApplier> clusterStateAppliers = Iterables.concat(highPriorityStateAppliers,
normalPriorityStateAppliers, lowPriorityStateAppliers); normalPriorityStateAppliers, lowPriorityStateAppliers);
@ -219,8 +215,9 @@ public class ClusterService extends AbstractLifecycleComponent {
DiscoveryNodes nodes = DiscoveryNodes.builder(state.nodes()).add(localNode).localNodeId(localNode.getId()).build(); DiscoveryNodes nodes = DiscoveryNodes.builder(state.nodes()).add(localNode).localNodeId(localNode.getId()).build();
return ClusterState.builder(state).nodes(nodes).blocks(initialBlocks).build(); return ClusterState.builder(state).nodes(nodes).blocks(initialBlocks).build();
}); });
this.threadPoolExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME), this.threadPoolExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME,
threadPool.getThreadContext()); daemonThreadFactory(settings, UPDATE_THREAD_NAME), threadPool.getThreadContext(), threadPool.scheduler());
this.taskBatcher = new ClusterServiceTaskBatcher(logger, threadPoolExecutor);
} }
@Override @Override
@ -244,6 +241,44 @@ public class ClusterService extends AbstractLifecycleComponent {
protected synchronized void doClose() { protected synchronized void doClose() {
} }
class ClusterServiceTaskBatcher extends TaskBatcher {
ClusterServiceTaskBatcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) {
super(logger, threadExecutor);
}
@Override
protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
threadPool.generic().execute(
() -> tasks.forEach(
task -> ((UpdateTask) task).listener.onFailure(task.source,
new ProcessClusterEventTimeoutException(timeout, task.source))));
}
@Override
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
runTasks(new ClusterService.TaskInputs(taskExecutor, updateTasks, tasksSummary));
}
class UpdateTask extends BatchedTask {
final ClusterStateTaskListener listener;
UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
ClusterStateTaskExecutor<?> executor) {
super(priority, source, executor, task);
this.listener = listener;
}
@Override
public String describeTasks(List<? extends BatchedTask> tasks) {
return ((ClusterStateTaskExecutor<Object>) batchingKey).describeTasks(
tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));
}
}
}
/** /**
* The local node. * The local node.
*/ */
@ -350,6 +385,7 @@ public class ClusterService extends AbstractLifecycleComponent {
listener.onClose(); listener.onClose();
return; return;
} }
// call the post added notification on the same event thread // call the post added notification on the same event thread
try { try {
threadPoolExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") { threadPoolExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") {
@ -432,38 +468,11 @@ public class ClusterService extends AbstractLifecycleComponent {
if (!lifecycle.started()) { if (!lifecycle.started()) {
return; return;
} }
if (tasks.isEmpty()) {
return;
}
try { try {
@SuppressWarnings("unchecked") List<ClusterServiceTaskBatcher.UpdateTask> safeTasks = tasks.entrySet().stream()
ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) executor; .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), logger), executor))
// convert to an identity map to check for dups based on update tasks semantics of using identity instead of equal .collect(Collectors.toList());
final IdentityHashMap<Object, ClusterStateTaskListener> tasksIdentity = new IdentityHashMap<>(tasks); taskBatcher.submitTasks(safeTasks, config.timeout());
final List<UpdateTask> updateTasks = tasksIdentity.entrySet().stream().map(
entry -> new UpdateTask(source, entry.getKey(), config.priority(), taskExecutor, safe(entry.getValue(), logger))
).collect(Collectors.toList());
synchronized (updateTasksPerExecutor) {
LinkedHashSet<UpdateTask> existingTasks = updateTasksPerExecutor.computeIfAbsent(executor,
k -> new LinkedHashSet<>(updateTasks.size()));
for (UpdateTask existing : existingTasks) {
if (tasksIdentity.containsKey(existing.task)) {
throw new IllegalStateException("task [" + taskExecutor.describeTasks(Collections.singletonList(existing.task)) +
"] with source [" + source + "] is already queued");
}
}
existingTasks.addAll(updateTasks);
}
final UpdateTask firstTask = updateTasks.get(0);
final TimeValue timeout = config.timeout();
if (timeout != null) {
threadPoolExecutor.execute(firstTask, threadPool.scheduler(), timeout, () -> onTimeout(updateTasks, source, timeout));
} else {
threadPoolExecutor.execute(firstTask);
}
} catch (EsRejectedExecutionException e) { } catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting // ignore cases where we are shutting down..., there is really nothing interesting
// to be done here... // to be done here...
@ -473,60 +482,17 @@ public class ClusterService extends AbstractLifecycleComponent {
} }
} }
private void onTimeout(List<UpdateTask> updateTasks, String source, TimeValue timeout) {
threadPool.generic().execute(() -> {
final ArrayList<UpdateTask> toRemove = new ArrayList<>();
for (UpdateTask task : updateTasks) {
if (task.processed.getAndSet(true) == false) {
logger.debug("cluster state update task [{}] timed out after [{}]", source, timeout);
toRemove.add(task);
}
}
if (toRemove.isEmpty() == false) {
ClusterStateTaskExecutor<Object> clusterStateTaskExecutor = toRemove.get(0).executor;
synchronized (updateTasksPerExecutor) {
LinkedHashSet<UpdateTask> existingTasks = updateTasksPerExecutor.get(clusterStateTaskExecutor);
if (existingTasks != null) {
existingTasks.removeAll(toRemove);
if (existingTasks.isEmpty()) {
updateTasksPerExecutor.remove(clusterStateTaskExecutor);
}
}
}
for (UpdateTask task : toRemove) {
task.listener.onFailure(source, new ProcessClusterEventTimeoutException(timeout, source));
}
}
});
}
/** /**
* Returns the tasks that are pending. * Returns the tasks that are pending.
*/ */
public List<PendingClusterTask> pendingTasks() { public List<PendingClusterTask> pendingTasks() {
PrioritizedEsThreadPoolExecutor.Pending[] pendings = threadPoolExecutor.getPending(); return Arrays.stream(threadPoolExecutor.getPending()).map(pending -> {
List<PendingClusterTask> pendingClusterTasks = new ArrayList<>(pendings.length); assert pending.task instanceof SourcePrioritizedRunnable :
for (PrioritizedEsThreadPoolExecutor.Pending pending : pendings) { "thread pool executor should only use SourcePrioritizedRunnable instances but found: " + pending.task.getClass().getName();
final String source; SourcePrioritizedRunnable task = (SourcePrioritizedRunnable) pending.task;
final long timeInQueue; return new PendingClusterTask(pending.insertionOrder, pending.priority, new Text(task.source()),
// we have to capture the task as it will be nulled after execution and we don't want to change while we check things here. task.getAgeInMillis(), pending.executing);
final Object task = pending.task; }).collect(Collectors.toList());
if (task == null) {
continue;
} else if (task instanceof SourcePrioritizedRunnable) {
SourcePrioritizedRunnable runnable = (SourcePrioritizedRunnable) task;
source = runnable.source();
timeInQueue = runnable.getAgeInMillis();
} else {
assert false : "expected SourcePrioritizedRunnable got " + task.getClass();
source = "unknown [" + task.getClass() + "]";
timeInQueue = 0;
}
pendingClusterTasks.add(
new PendingClusterTask(pending.insertionOrder, pending.priority, new Text(source), timeInQueue, pending.executing));
}
return pendingClusterTasks;
} }
/** /**
@ -585,19 +551,6 @@ public class ClusterService extends AbstractLifecycleComponent {
this.discoverySettings = discoverySettings; this.discoverySettings = discoverySettings;
} }
abstract static class SourcePrioritizedRunnable extends PrioritizedRunnable {
protected final String source;
SourcePrioritizedRunnable(Priority priority, String source) {
super(priority);
this.source = source;
}
public String source() {
return source;
}
}
void runTasks(TaskInputs taskInputs) { void runTasks(TaskInputs taskInputs) {
if (!lifecycle.started()) { if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster service not started", taskInputs.summary); logger.debug("processing [{}]: ignoring, cluster service not started", taskInputs.summary);
@ -657,8 +610,8 @@ public class ClusterService extends AbstractLifecycleComponent {
public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, long startTimeNS) { public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, long startTimeNS) {
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, startTimeNS, previousClusterState); ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, startTimeNS, previousClusterState);
// extract those that are waiting for results // extract those that are waiting for results
List<UpdateTask> nonFailedTasks = new ArrayList<>(); List<ClusterServiceTaskBatcher.UpdateTask> nonFailedTasks = new ArrayList<>();
for (UpdateTask updateTask : taskInputs.updateTasks) { for (ClusterServiceTaskBatcher.UpdateTask updateTask : taskInputs.updateTasks) {
assert clusterTasksResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask; assert clusterTasksResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask;
final ClusterStateTaskExecutor.TaskResult taskResult = final ClusterStateTaskExecutor.TaskResult taskResult =
clusterTasksResult.executionResults.get(updateTask.task); clusterTasksResult.executionResults.get(updateTask.task);
@ -675,7 +628,8 @@ public class ClusterService extends AbstractLifecycleComponent {
private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, long startTimeNS, ClusterState previousClusterState) { private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, long startTimeNS, ClusterState previousClusterState) {
ClusterTasksResult<Object> clusterTasksResult; ClusterTasksResult<Object> clusterTasksResult;
try { try {
List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList()); List<Object> inputs = taskInputs.updateTasks.stream()
.map(ClusterServiceTaskBatcher.UpdateTask::getTask).collect(Collectors.toList());
clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs); clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);
} catch (Exception e) { } catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
@ -693,7 +647,7 @@ public class ClusterService extends AbstractLifecycleComponent {
} }
warnAboutSlowTaskIfNeeded(executionTime, taskInputs.summary); warnAboutSlowTaskIfNeeded(executionTime, taskInputs.summary);
clusterTasksResult = ClusterTasksResult.builder() clusterTasksResult = ClusterTasksResult.builder()
.failures(taskInputs.updateTasks.stream().map(updateTask -> updateTask.task)::iterator, e) .failures(taskInputs.updateTasks.stream().map(ClusterServiceTaskBatcher.UpdateTask::getTask)::iterator, e)
.build(previousClusterState); .build(previousClusterState);
} }
@ -704,7 +658,7 @@ public class ClusterService extends AbstractLifecycleComponent {
boolean assertsEnabled = false; boolean assertsEnabled = false;
assert (assertsEnabled = true); assert (assertsEnabled = true);
if (assertsEnabled) { if (assertsEnabled) {
for (UpdateTask updateTask : taskInputs.updateTasks) { for (ClusterServiceTaskBatcher.UpdateTask updateTask : taskInputs.updateTasks) {
assert clusterTasksResult.executionResults.containsKey(updateTask.task) : assert clusterTasksResult.executionResults.containsKey(updateTask.task) :
"missing task result for " + updateTask; "missing task result for " + updateTask;
} }
@ -870,10 +824,10 @@ public class ClusterService extends AbstractLifecycleComponent {
*/ */
class TaskInputs { class TaskInputs {
public final String summary; public final String summary;
public final ArrayList<UpdateTask> updateTasks; public final List<ClusterServiceTaskBatcher.UpdateTask> updateTasks;
public final ClusterStateTaskExecutor<Object> executor; public final ClusterStateTaskExecutor<Object> executor;
TaskInputs(ClusterStateTaskExecutor<Object> executor, ArrayList<UpdateTask> updateTasks, String summary) { TaskInputs(ClusterStateTaskExecutor<Object> executor, List<ClusterServiceTaskBatcher.UpdateTask> updateTasks, String summary) {
this.summary = summary; this.summary = summary;
this.executor = executor; this.executor = executor;
this.updateTasks = updateTasks; this.updateTasks = updateTasks;
@ -895,11 +849,11 @@ public class ClusterService extends AbstractLifecycleComponent {
public final TaskInputs taskInputs; public final TaskInputs taskInputs;
public final ClusterState previousClusterState; public final ClusterState previousClusterState;
public final ClusterState newClusterState; public final ClusterState newClusterState;
public final List<UpdateTask> nonFailedTasks; public final List<ClusterServiceTaskBatcher.UpdateTask> nonFailedTasks;
public final Map<Object, ClusterStateTaskExecutor.TaskResult> executionResults; public final Map<Object, ClusterStateTaskExecutor.TaskResult> executionResults;
TaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, TaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState,
ClusterState newClusterState, List<UpdateTask> nonFailedTasks, ClusterState newClusterState, List<ClusterServiceTaskBatcher.UpdateTask> nonFailedTasks,
Map<Object, ClusterStateTaskExecutor.TaskResult> executionResults) { Map<Object, ClusterStateTaskExecutor.TaskResult> executionResults) {
this.taskInputs = taskInputs; this.taskInputs = taskInputs;
this.previousClusterState = previousClusterState; this.previousClusterState = previousClusterState;
@ -951,7 +905,7 @@ public class ClusterService extends AbstractLifecycleComponent {
public void notifyFailedTasks() { public void notifyFailedTasks() {
// fail all tasks that have failed // fail all tasks that have failed
for (UpdateTask updateTask : taskInputs.updateTasks) { for (ClusterServiceTaskBatcher.UpdateTask updateTask : taskInputs.updateTasks) {
assert executionResults.containsKey(updateTask.task) : "missing " + updateTask; assert executionResults.containsKey(updateTask.task) : "missing " + updateTask;
final ClusterStateTaskExecutor.TaskResult taskResult = executionResults.get(updateTask.task); final ClusterStateTaskExecutor.TaskResult taskResult = executionResults.get(updateTask.task);
if (taskResult.isSuccess() == false) { if (taskResult.isSuccess() == false) {
@ -1071,65 +1025,6 @@ public class ClusterService extends AbstractLifecycleComponent {
} }
} }
class UpdateTask extends SourcePrioritizedRunnable {
public final Object task;
public final ClusterStateTaskListener listener;
private final ClusterStateTaskExecutor<Object> executor;
public final AtomicBoolean processed = new AtomicBoolean();
UpdateTask(String source, Object task, Priority priority, ClusterStateTaskExecutor<Object> executor,
ClusterStateTaskListener listener) {
super(priority, source);
this.task = task;
this.executor = executor;
this.listener = listener;
}
@Override
public void run() {
// if this task is already processed, the executor shouldn't execute other tasks (that arrived later),
// to give other executors a chance to execute their tasks.
if (processed.get() == false) {
final ArrayList<UpdateTask> toExecute = new ArrayList<>();
final Map<String, ArrayList<Object>> processTasksBySource = new HashMap<>();
synchronized (updateTasksPerExecutor) {
LinkedHashSet<UpdateTask> pending = updateTasksPerExecutor.remove(executor);
if (pending != null) {
for (UpdateTask task : pending) {
if (task.processed.getAndSet(true) == false) {
logger.trace("will process {}", task);
toExecute.add(task);
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task.task);
} else {
logger.trace("skipping {}, already processed", task);
}
}
}
}
if (toExecute.isEmpty() == false) {
final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
String tasks = executor.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
runTasks(new TaskInputs(executor, toExecute, tasksSummary));
}
}
}
@Override
public String toString() {
String taskDescription = executor.describeTasks(Collections.singletonList(task));
if (taskDescription.isEmpty()) {
return "[" + source + "]";
} else {
return "[" + source + "[" + taskDescription + "]]";
}
}
}
private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) { private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) {
if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) { if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
logger.warn("cluster state update task [{}] took [{}] above the warn threshold of {}", source, executionTime, logger.warn("cluster state update task [{}] took [{}] above the warn threshold of {}", source, executionTime,

View File

@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.service;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
/**
* PrioritizedRunnable that also has a source string
*/
public abstract class SourcePrioritizedRunnable extends PrioritizedRunnable {
protected final String source;
public SourcePrioritizedRunnable(Priority priority, String source) {
super(priority);
this.source = source;
}
public String source() {
return source;
}
@Override
public String toString() {
return "[" + source + "]";
}
}

View File

@ -0,0 +1,207 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.service;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Batching support for {@link PrioritizedEsThreadPoolExecutor}
* Tasks that share the same batching key are batched (see {@link BatchedTask#batchingKey})
*/
public abstract class TaskBatcher {
private final Logger logger;
private final PrioritizedEsThreadPoolExecutor threadExecutor;
// package visible for tests
final Map<Object, LinkedHashSet<BatchedTask>> tasksPerBatchingKey = new HashMap<>();
public TaskBatcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) {
this.logger = logger;
this.threadExecutor = threadExecutor;
}
public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
if (tasks.isEmpty()) {
return;
}
final BatchedTask firstTask = tasks.get(0);
assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) :
"tasks submitted in a batch should share the same batching key: " + tasks;
// convert to an identity map to check for dups based on task identity
final Map<Object, BatchedTask> tasksIdentity = tasks.stream().collect(Collectors.toMap(
BatchedTask::getTask,
Function.identity(),
(a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
IdentityHashMap::new));
synchronized (tasksPerBatchingKey) {
LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey,
k -> new LinkedHashSet<>(tasks.size()));
for (BatchedTask existing : existingTasks) {
// check that there won't be two tasks with the same identity for the same batching key
BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
if (duplicateTask != null) {
throw new IllegalStateException("task [" + duplicateTask.describeTasks(
Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued");
}
}
existingTasks.addAll(tasks);
}
if (timeout != null) {
threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
} else {
threadExecutor.execute(firstTask);
}
}
private void onTimeoutInternal(List<? extends BatchedTask> tasks, TimeValue timeout) {
final ArrayList<BatchedTask> toRemove = new ArrayList<>();
for (BatchedTask task : tasks) {
if (task.processed.getAndSet(true) == false) {
logger.debug("task [{}] timed out after [{}]", task.source, timeout);
toRemove.add(task);
}
}
if (toRemove.isEmpty() == false) {
BatchedTask firstTask = toRemove.get(0);
Object batchingKey = firstTask.batchingKey;
assert tasks.stream().allMatch(t -> t.batchingKey == batchingKey) :
"tasks submitted in a batch should share the same batching key: " + tasks;
synchronized (tasksPerBatchingKey) {
LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.get(batchingKey);
if (existingTasks != null) {
existingTasks.removeAll(toRemove);
if (existingTasks.isEmpty()) {
tasksPerBatchingKey.remove(batchingKey);
}
}
}
onTimeout(toRemove, timeout);
}
}
/**
* Action to be implemented by the specific batching implementation.
* All tasks have the same batching key.
*/
protected abstract void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout);
void runIfNotProcessed(BatchedTask updateTask) {
// if this task is already processed, it shouldn't execute other tasks with same batching key that arrived later,
// to give other tasks with different batching key a chance to execute.
if (updateTask.processed.get() == false) {
final List<BatchedTask> toExecute = new ArrayList<>();
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
synchronized (tasksPerBatchingKey) {
LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
if (pending != null) {
for (BatchedTask task : pending) {
if (task.processed.getAndSet(true) == false) {
logger.trace("will process {}", task);
toExecute.add(task);
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
} else {
logger.trace("skipping {}, already processed", task);
}
}
}
}
if (toExecute.isEmpty() == false) {
final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
run(updateTask.batchingKey, toExecute, tasksSummary);
}
}
}
/**
* Action to be implemented by the specific batching implementation
* All tasks have the given batching key.
*/
protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary);
/**
* Represents a runnable task that supports batching.
* Implementors of TaskBatcher can subclass this to add a payload to the task.
*/
protected abstract class BatchedTask extends SourcePrioritizedRunnable {
/**
* whether the task has been processed already
*/
protected final AtomicBoolean processed = new AtomicBoolean();
/**
* the object that is used as batching key
*/
protected final Object batchingKey;
/**
* the task object that is wrapped
*/
protected final Object task;
protected BatchedTask(Priority priority, String source, Object batchingKey, Object task) {
super(priority, source);
this.batchingKey = batchingKey;
this.task = task;
}
@Override
public void run() {
runIfNotProcessed(this);
}
@Override
public String toString() {
String taskDescription = describeTasks(Collections.singletonList(this));
if (taskDescription.isEmpty()) {
return "[" + source + "]";
} else {
return "[" + source + "[" + taskDescription + "]]";
}
}
public abstract String describeTasks(List<? extends BatchedTask> tasks);
public Object getTask() {
return task;
}
}
}

View File

@ -30,6 +30,7 @@ import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -57,8 +58,8 @@ public class EsExecutors {
return PROCESSORS_SETTING.get(settings); return PROCESSORS_SETTING.get(settings);
} }
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory, ThreadContext contextHolder) { public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder); return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer);
} }
public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) { public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {

View File

@ -44,11 +44,14 @@ import java.util.concurrent.atomic.AtomicLong;
public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private static final TimeValue NO_WAIT_TIME_VALUE = TimeValue.timeValueMillis(0); private static final TimeValue NO_WAIT_TIME_VALUE = TimeValue.timeValueMillis(0);
private AtomicLong insertionOrder = new AtomicLong(); private final AtomicLong insertionOrder = new AtomicLong();
private Queue<Runnable> current = ConcurrentCollections.newQueue(); private final Queue<Runnable> current = ConcurrentCollections.newQueue();
private final ScheduledExecutorService timer;
PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) { PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder); super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder);
this.timer = timer;
} }
public Pending[] getPending() { public Pending[] getPending() {
@ -111,7 +114,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
current.remove(r); current.remove(r);
} }
public void execute(Runnable command, final ScheduledExecutorService timer, final TimeValue timeout, final Runnable timeoutCallback) { public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
command = wrapRunnable(command); command = wrapRunnable(command);
doExecute(command); doExecute(command);
if (timeout.nanos() >= 0) { if (timeout.nanos() >= 0) {

View File

@ -18,31 +18,84 @@
*/ */
package org.elasticsearch.search.aggregations; package org.elasticsearch.search.aggregations;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import static java.util.Collections.unmodifiableMap;
/** /**
* Represents a set of computed addAggregation. * Represents a set of {@link Aggregation}s
*/ */
public interface Aggregations extends Iterable<Aggregation> { public abstract class Aggregations implements Iterable<Aggregation> {
protected List<? extends Aggregation> aggregations = Collections.emptyList();
protected Map<String, Aggregation> aggregationsAsMap;
protected Aggregations() {
}
protected Aggregations(List<? extends Aggregation> aggregations) {
this.aggregations = aggregations;
}
/**
* Iterates over the {@link Aggregation}s.
*/
@Override
public final Iterator<Aggregation> iterator() {
return aggregations.stream().map((p) -> (Aggregation) p).iterator();
}
/** /**
* The list of {@link Aggregation}s. * The list of {@link Aggregation}s.
*/ */
List<Aggregation> asList(); public final List<Aggregation> asList() {
return Collections.unmodifiableList(aggregations);
}
/** /**
* Returns the {@link Aggregation}s keyed by aggregation name. * Returns the {@link Aggregation}s keyed by aggregation name.
*/ */
Map<String, Aggregation> asMap(); public final Map<String, Aggregation> asMap() {
return getAsMap();
}
/** /**
* Returns the {@link Aggregation}s keyed by aggregation name. * Returns the {@link Aggregation}s keyed by aggregation name.
*/ */
Map<String, Aggregation> getAsMap(); public final Map<String, Aggregation> getAsMap() {
if (aggregationsAsMap == null) {
Map<String, Aggregation> newAggregationsAsMap = new HashMap<>(aggregations.size());
for (Aggregation aggregation : aggregations) {
newAggregationsAsMap.put(aggregation.getName(), aggregation);
}
this.aggregationsAsMap = unmodifiableMap(newAggregationsAsMap);
}
return aggregationsAsMap;
}
/** /**
* Returns the aggregation that is associated with the specified name. * Returns the aggregation that is associated with the specified name.
*/ */
<A extends Aggregation> A get(String name); @SuppressWarnings("unchecked")
public final <A extends Aggregation> A get(String name) {
return (A) asMap().get(name);
}
@Override
public final boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
return aggregations.equals(((Aggregations) obj).aggregations);
}
@Override
public final int hashCode() {
return Objects.hash(getClass(), aggregations);
}
} }

View File

@ -27,27 +27,18 @@ import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
/** /**
* An internal implementation of {@link Aggregations}. * An internal implementation of {@link Aggregations}.
*/ */
public class InternalAggregations implements Aggregations, ToXContent, Streamable { public final class InternalAggregations extends Aggregations implements ToXContent, Streamable {
public static final InternalAggregations EMPTY = new InternalAggregations(); public static final InternalAggregations EMPTY = new InternalAggregations();
private List<InternalAggregation> aggregations = Collections.emptyList();
private Map<String, Aggregation> aggregationsAsMap;
private InternalAggregations() { private InternalAggregations() {
} }
@ -55,55 +46,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
* Constructs a new addAggregation. * Constructs a new addAggregation.
*/ */
public InternalAggregations(List<InternalAggregation> aggregations) { public InternalAggregations(List<InternalAggregation> aggregations) {
this.aggregations = aggregations; super(aggregations);
}
/**
* Iterates over the {@link Aggregation}s.
*/
@Override
public Iterator<Aggregation> iterator() {
return aggregations.stream().map((p) -> (Aggregation) p).iterator();
}
/**
* The list of {@link Aggregation}s.
*/
@Override
public List<Aggregation> asList() {
return aggregations.stream().map((p) -> (Aggregation) p).collect(Collectors.toList());
}
/**
* Returns the {@link Aggregation}s keyed by map.
*/
@Override
public Map<String, Aggregation> asMap() {
return getAsMap();
}
/**
* Returns the {@link Aggregation}s keyed by map.
*/
@Override
public Map<String, Aggregation> getAsMap() {
if (aggregationsAsMap == null) {
Map<String, InternalAggregation> newAggregationsAsMap = new HashMap<>();
for (InternalAggregation aggregation : aggregations) {
newAggregationsAsMap.put(aggregation.getName(), aggregation);
}
this.aggregationsAsMap = unmodifiableMap(newAggregationsAsMap);
}
return aggregationsAsMap;
}
/**
* @return the aggregation of the specified name.
*/
@SuppressWarnings("unchecked")
@Override
public <A extends Aggregation> A get(String name) {
return (A) asMap().get(name);
} }
/** /**
@ -118,21 +61,16 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
} }
// first we collect all aggregations of the same type and list them together // first we collect all aggregations of the same type and list them together
Map<String, List<InternalAggregation>> aggByName = new HashMap<>(); Map<String, List<InternalAggregation>> aggByName = new HashMap<>();
for (InternalAggregations aggregations : aggregationsList) { for (InternalAggregations aggregations : aggregationsList) {
for (InternalAggregation aggregation : aggregations.aggregations) { for (Aggregation aggregation : aggregations.aggregations) {
List<InternalAggregation> aggs = aggByName.get(aggregation.getName()); List<InternalAggregation> aggs = aggByName.computeIfAbsent(
if (aggs == null) { aggregation.getName(), k -> new ArrayList<>(aggregationsList.size()));
aggs = new ArrayList<>(aggregationsList.size()); aggs.add((InternalAggregation)aggregation);
aggByName.put(aggregation.getName(), aggs);
}
aggs.add(aggregation);
} }
} }
// now we can use the first aggregation of each list to handle the reduce of its list // now we can use the first aggregation of each list to handle the reduce of its list
List<InternalAggregation> reducedAggregations = new ArrayList<>(); List<InternalAggregation> reducedAggregations = new ArrayList<>();
for (Map.Entry<String, List<InternalAggregation>> entry : aggByName.entrySet()) { for (Map.Entry<String, List<InternalAggregation>> entry : aggByName.entrySet()) {
List<InternalAggregation> aggregations = entry.getValue(); List<InternalAggregation> aggregations = entry.getValue();
@ -142,41 +80,33 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
return new InternalAggregations(reducedAggregations); return new InternalAggregations(reducedAggregations);
} }
/** The fields required to write this addAggregation to xcontent */
static class Fields {
public static final String AGGREGATIONS = "aggregations";
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (aggregations.isEmpty()) { if (aggregations.isEmpty()) {
return builder; return builder;
} }
builder.startObject(Fields.AGGREGATIONS); builder.startObject("aggregations");
toXContentInternal(builder, params); toXContentInternal(builder, params);
return builder.endObject(); return builder.endObject();
} }
/** /**
* Directly write all the addAggregation without their bounding object. Used by sub-addAggregation (non top level addAggregation) * Directly write all the aggregations without their bounding object. Used by sub-aggregations (non top level aggs)
*/ */
public XContentBuilder toXContentInternal(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContentInternal(XContentBuilder builder, Params params) throws IOException {
for (Aggregation aggregation : aggregations) { for (Aggregation aggregation : aggregations) {
((InternalAggregation) aggregation).toXContent(builder, params); ((InternalAggregation)aggregation).toXContent(builder, params);
} }
return builder; return builder;
} }
public static InternalAggregations readAggregations(StreamInput in) throws IOException { public static InternalAggregations readAggregations(StreamInput in) throws IOException {
InternalAggregations result = new InternalAggregations(); InternalAggregations result = new InternalAggregations();
result.readFrom(in); result.readFrom(in);
return result; return result;
} }
public static InternalAggregations readOptionalAggregations(StreamInput in) throws IOException {
return in.readOptionalStreamable(InternalAggregations::new);
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
aggregations = in.readList(stream -> in.readNamedWriteable(InternalAggregation.class)); aggregations = in.readList(stream -> in.readNamedWriteable(InternalAggregation.class));
@ -186,20 +116,8 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
} }
@Override @Override
@SuppressWarnings("unchecked")
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteableList(aggregations); out.writeNamedWriteableList((List<InternalAggregation>)aggregations);
}
@Override
public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
return aggregations.equals(((InternalAggregations) obj).aggregations);
}
@Override
public int hashCode() {
return Objects.hash(getClass(), aggregations);
} }
} }

View File

@ -20,7 +20,6 @@ package org.elasticsearch.cluster.service;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
@ -39,7 +38,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -59,8 +57,6 @@ import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -69,7 +65,6 @@ import java.util.Set;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
@ -77,17 +72,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
public class ClusterServiceTests extends ESTestCase { public class ClusterServiceTests extends ESTestCase {
@ -151,118 +143,6 @@ public class ClusterServiceTests extends ESTestCase {
return timedClusterService; return timedClusterService;
} }
public void testTimedOutUpdateTaskCleanedUp() throws Exception {
final CountDownLatch block = new CountDownLatch(1);
final CountDownLatch blockCompleted = new CountDownLatch(1);
clusterService.submitStateUpdateTask("block-task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
block.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
blockCompleted.countDown();
return currentState;
}
@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e);
}
});
final CountDownLatch block2 = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
block2.countDown();
return currentState;
}
@Override
public TimeValue timeout() {
return TimeValue.ZERO;
}
@Override
public void onFailure(String source, Exception e) {
block2.countDown();
}
});
block.countDown();
block2.await();
blockCompleted.await();
synchronized (clusterService.updateTasksPerExecutor) {
assertTrue("expected empty map but was " + clusterService.updateTasksPerExecutor,
clusterService.updateTasksPerExecutor.isEmpty());
}
}
public void testTimeoutUpdateTask() throws Exception {
final CountDownLatch block = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
block.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return currentState;
}
@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e);
}
});
final CountDownLatch timedOut = new CountDownLatch(1);
final AtomicBoolean executeCalled = new AtomicBoolean();
clusterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return TimeValue.timeValueMillis(2);
}
@Override
public void onFailure(String source, Exception e) {
timedOut.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) {
executeCalled.set(true);
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
timedOut.await();
block.countDown();
final CountDownLatch allProcessed = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e);
}
@Override
public ClusterState execute(ClusterState currentState) {
allProcessed.countDown();
return currentState;
}
});
allProcessed.await(); // executed another task to double check that execute on the timed out update task is not called...
assertThat(executeCalled.get(), equalTo(false));
}
public void testMasterAwareExecution() throws Exception { public void testMasterAwareExecution() throws Exception {
ClusterService nonMaster = createTimedClusterService(false); ClusterService nonMaster = createTimedClusterService(false);
@ -394,164 +274,6 @@ public class ClusterServiceTests extends ESTestCase {
assertThat(assertionRef.get().getMessage(), containsString("not be the cluster state update thread. Reason: [Blocking operation]")); assertThat(assertionRef.get().getMessage(), containsString("not be the cluster state update thread. Reason: [Blocking operation]"));
} }
public void testOneExecutorDontStarveAnother() throws InterruptedException {
final List<String> executionOrder = Collections.synchronizedList(new ArrayList<>());
final Semaphore allowProcessing = new Semaphore(0);
final Semaphore startedProcessing = new Semaphore(0);
class TaskExecutor implements ClusterStateTaskExecutor<String> {
@Override
public ClusterTasksResult<String> execute(ClusterState currentState, List<String> tasks) throws Exception {
executionOrder.addAll(tasks); // do this first, so startedProcessing can be used as a notification that this is done.
startedProcessing.release(tasks.size());
allowProcessing.acquire(tasks.size());
return ClusterTasksResult.<String>builder().successes(tasks).build(ClusterState.builder(currentState).build());
}
}
TaskExecutor executorA = new TaskExecutor();
TaskExecutor executorB = new TaskExecutor();
final ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.NORMAL);
final ClusterStateTaskListener noopListener = (source, e) -> { throw new AssertionError(source, e); };
// this blocks the cluster state queue, so we can set it up right
clusterService.submitStateUpdateTask("0", "A0", config, executorA, noopListener);
// wait to be processed
startedProcessing.acquire(1);
assertThat(executionOrder, equalTo(Arrays.asList("A0")));
// these will be the first batch
clusterService.submitStateUpdateTask("1", "A1", config, executorA, noopListener);
clusterService.submitStateUpdateTask("2", "A2", config, executorA, noopListener);
// release the first 0 task, but not the second
allowProcessing.release(1);
startedProcessing.acquire(2);
assertThat(executionOrder, equalTo(Arrays.asList("A0", "A1", "A2")));
// setup the queue with pending tasks for another executor same priority
clusterService.submitStateUpdateTask("3", "B3", config, executorB, noopListener);
clusterService.submitStateUpdateTask("4", "B4", config, executorB, noopListener);
clusterService.submitStateUpdateTask("5", "A5", config, executorA, noopListener);
clusterService.submitStateUpdateTask("6", "A6", config, executorA, noopListener);
// now release the processing
allowProcessing.release(6);
// wait for last task to be processed
startedProcessing.acquire(4);
assertThat(executionOrder, equalTo(Arrays.asList("A0", "A1", "A2", "B3", "B4", "A5", "A6")));
}
// test that for a single thread, tasks are executed in the order
// that they are submitted
public void testClusterStateUpdateTasksAreExecutedInOrder() throws BrokenBarrierException, InterruptedException {
class TaskExecutor implements ClusterStateTaskExecutor<Integer> {
List<Integer> tasks = new ArrayList<>();
@Override
public ClusterTasksResult<Integer> execute(ClusterState currentState, List<Integer> tasks) throws Exception {
this.tasks.addAll(tasks);
return ClusterTasksResult.<Integer>builder().successes(tasks).build(ClusterState.builder(currentState).build());
}
}
int numberOfThreads = randomIntBetween(2, 8);
TaskExecutor[] executors = new TaskExecutor[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++) {
executors[i] = new TaskExecutor();
}
int tasksSubmittedPerThread = randomIntBetween(2, 1024);
CopyOnWriteArrayList<Tuple<String, Throwable>> failures = new CopyOnWriteArrayList<>();
CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread);
ClusterStateTaskListener listener = new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected failure: [{}]", source), e);
failures.add(new Tuple<>(source, e));
updateLatch.countDown();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
updateLatch.countDown();
}
};
CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
final int index = i;
Thread thread = new Thread(() -> {
try {
barrier.await();
for (int j = 0; j < tasksSubmittedPerThread; j++) {
clusterService.submitStateUpdateTask("[" + index + "][" + j + "]", j,
ClusterStateTaskConfig.build(randomFrom(Priority.values())), executors[index], listener);
}
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new AssertionError(e);
}
});
thread.start();
}
// wait for all threads to be ready
barrier.await();
// wait for all threads to finish
barrier.await();
updateLatch.await();
assertThat(failures, empty());
for (int i = 0; i < numberOfThreads; i++) {
assertEquals(tasksSubmittedPerThread, executors[i].tasks.size());
for (int j = 0; j < tasksSubmittedPerThread; j++) {
assertNotNull(executors[i].tasks.get(j));
assertEquals("cluster state update task executed out of order", j, (int) executors[i].tasks.get(j));
}
}
}
public void testSingleBatchSubmission() throws InterruptedException {
Map<Integer, ClusterStateTaskListener> tasks = new HashMap<>();
final int numOfTasks = randomInt(10);
final CountDownLatch latch = new CountDownLatch(numOfTasks);
for (int i = 0; i < numOfTasks; i++) {
while (null != tasks.put(randomInt(1024), new ClusterStateTaskListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Exception e) {
fail(ExceptionsHelper.detailedMessage(e));
}
})) ;
}
clusterService.submitStateUpdateTasks("test", tasks, ClusterStateTaskConfig.build(Priority.LANGUID),
(currentState, taskList) -> {
assertThat(taskList.size(), equalTo(tasks.size()));
assertThat(taskList.stream().collect(Collectors.toSet()), equalTo(tasks.keySet()));
return ClusterStateTaskExecutor.ClusterTasksResult.<Integer>builder().successes(taskList).build(currentState);
});
latch.await();
}
public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException { public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException {
AtomicInteger counter = new AtomicInteger(); AtomicInteger counter = new AtomicInteger();
class Task { class Task {
@ -745,76 +467,6 @@ public class ClusterServiceTests extends ESTestCase {
} }
} }
/**
* Note, this test can only work as long as we have a single thread executor executing the state update tasks!
*/
public void testPrioritizedTasks() throws Exception {
BlockingTask block = new BlockingTask(Priority.IMMEDIATE);
clusterService.submitStateUpdateTask("test", block);
int taskCount = randomIntBetween(5, 20);
// will hold all the tasks in the order in which they were executed
List<PrioritizedTask> tasks = new ArrayList<>(taskCount);
CountDownLatch latch = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {
Priority priority = randomFrom(Priority.values());
clusterService.submitStateUpdateTask("test", new PrioritizedTask(priority, latch, tasks));
}
block.close();
latch.await();
Priority prevPriority = null;
for (PrioritizedTask task : tasks) {
if (prevPriority == null) {
prevPriority = task.priority();
} else {
assertThat(task.priority().sameOrAfter(prevPriority), is(true));
}
}
}
public void testDuplicateSubmission() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(2);
try (BlockingTask blockingTask = new BlockingTask(Priority.IMMEDIATE)) {
clusterService.submitStateUpdateTask("blocking", blockingTask);
ClusterStateTaskExecutor<SimpleTask> executor = (currentState, tasks) ->
ClusterStateTaskExecutor.ClusterTasksResult.<SimpleTask>builder().successes(tasks).build(currentState);
SimpleTask task = new SimpleTask(1);
ClusterStateTaskListener listener = new ClusterStateTaskListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Exception e) {
fail(ExceptionsHelper.detailedMessage(e));
}
};
clusterService.submitStateUpdateTask("first time", task, ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener);
final IllegalStateException e =
expectThrows(
IllegalStateException.class,
() -> clusterService.submitStateUpdateTask(
"second time",
task,
ClusterStateTaskConfig.build(Priority.NORMAL),
executor, listener));
assertThat(e, hasToString(containsString("task [1] with source [second time] is already queued")));
clusterService.submitStateUpdateTask("third time a charm", new SimpleTask(1),
ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener);
assertThat(latch.getCount(), equalTo(2L));
}
latch.await();
}
@TestLogging("org.elasticsearch.cluster.service:TRACE") // To ensure that we log cluster state events on TRACE level @TestLogging("org.elasticsearch.cluster.service:TRACE") // To ensure that we log cluster state events on TRACE level
public void testClusterStateUpdateLogging() throws Exception { public void testClusterStateUpdateLogging() throws Exception {
MockLogAppender mockAppender = new MockLogAppender(); MockLogAppender mockAppender = new MockLogAppender();
@ -1250,77 +902,6 @@ public class ClusterServiceTests extends ESTestCase {
assertTrue(applierCalled.get()); assertTrue(applierCalled.get());
} }
private static class SimpleTask {
private final int id;
private SimpleTask(int id) {
this.id = id;
}
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
@Override
public String toString() {
return Integer.toString(id);
}
}
private static class BlockingTask extends ClusterStateUpdateTask implements Releasable {
private final CountDownLatch latch = new CountDownLatch(1);
BlockingTask(Priority priority) {
super(priority);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
latch.await();
return currentState;
}
@Override
public void onFailure(String source, Exception e) {
}
public void close() {
latch.countDown();
}
}
private static class PrioritizedTask extends ClusterStateUpdateTask {
private final CountDownLatch latch;
private final List<PrioritizedTask> tasks;
private PrioritizedTask(Priority priority, CountDownLatch latch, List<PrioritizedTask> tasks) {
super(priority);
this.latch = latch;
this.tasks = tasks;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
tasks.add(this);
latch.countDown();
return currentState;
}
@Override
public void onFailure(String source, Exception e) {
latch.countDown();
}
}
static class TimedClusterService extends ClusterService { static class TimedClusterService extends ClusterService {
public volatile Long currentTimeOverride = null; public volatile Long currentTimeOverride = null;

View File

@ -0,0 +1,350 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.service;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
public class TaskBatcherTests extends TaskExecutorTests {
protected TestTaskBatcher taskBatcher;
@Before
public void setUpBatchingTaskExecutor() throws Exception {
taskBatcher = new TestTaskBatcher(logger, threadExecutor);
}
class TestTaskBatcher extends TaskBatcher {
TestTaskBatcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) {
super(logger, threadExecutor);
}
@Override
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
List<UpdateTask> updateTasks = (List) tasks;
((TestExecutor) batchingKey).execute(updateTasks.stream().map(t -> t.task).collect(Collectors.toList()));
updateTasks.forEach(updateTask -> updateTask.listener.processed(updateTask.source));
}
@Override
protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
threadPool.generic().execute(
() -> tasks.forEach(
task -> ((UpdateTask) task).listener.onFailure(task.source,
new ProcessClusterEventTimeoutException(timeout, task.source))));
}
class UpdateTask extends BatchedTask {
final TestListener listener;
UpdateTask(Priority priority, String source, Object task, TestListener listener, TestExecutor<?> executor) {
super(priority, source, executor, task);
this.listener = listener;
}
@Override
public String describeTasks(List<? extends BatchedTask> tasks) {
return ((TestExecutor<Object>) batchingKey).describeTasks(
tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));
}
}
}
@Override
protected void submitTask(String source, TestTask testTask) {
submitTask(source, testTask, testTask, testTask, testTask);
}
private <T> void submitTask(String source, T task, ClusterStateTaskConfig config, TestExecutor<T> executor,
TestListener listener) {
submitTasks(source, Collections.singletonMap(task, listener), config, executor);
}
private <T> void submitTasks(final String source,
final Map<T, TestListener> tasks, final ClusterStateTaskConfig config,
final TestExecutor<T> executor) {
List<TestTaskBatcher.UpdateTask> safeTasks = tasks.entrySet().stream()
.map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), e.getValue(), executor))
.collect(Collectors.toList());
taskBatcher.submitTasks(safeTasks, config.timeout());
}
@Override
public void testTimedOutTaskCleanedUp() throws Exception {
super.testTimedOutTaskCleanedUp();
synchronized (taskBatcher.tasksPerBatchingKey) {
assertTrue("expected empty map but was " + taskBatcher.tasksPerBatchingKey,
taskBatcher.tasksPerBatchingKey.isEmpty());
}
}
public void testOneExecutorDoesntStarveAnother() throws InterruptedException {
final List<String> executionOrder = Collections.synchronizedList(new ArrayList<>());
final Semaphore allowProcessing = new Semaphore(0);
final Semaphore startedProcessing = new Semaphore(0);
class TaskExecutor implements TestExecutor<String> {
@Override
public void execute(List<String> tasks) {
executionOrder.addAll(tasks); // do this first, so startedProcessing can be used as a notification that this is done.
startedProcessing.release(tasks.size());
try {
allowProcessing.acquire(tasks.size());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
TaskExecutor executorA = new TaskExecutor();
TaskExecutor executorB = new TaskExecutor();
final ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.NORMAL);
final TestListener noopListener = (source, e) -> {
throw new AssertionError(e);
};
// this blocks the cluster state queue, so we can set it up right
submitTask("0", "A0", config, executorA, noopListener);
// wait to be processed
startedProcessing.acquire(1);
assertThat(executionOrder, equalTo(Arrays.asList("A0")));
// these will be the first batch
submitTask("1", "A1", config, executorA, noopListener);
submitTask("2", "A2", config, executorA, noopListener);
// release the first 0 task, but not the second
allowProcessing.release(1);
startedProcessing.acquire(2);
assertThat(executionOrder, equalTo(Arrays.asList("A0", "A1", "A2")));
// setup the queue with pending tasks for another executor same priority
submitTask("3", "B3", config, executorB, noopListener);
submitTask("4", "B4", config, executorB, noopListener);
submitTask("5", "A5", config, executorA, noopListener);
submitTask("6", "A6", config, executorA, noopListener);
// now release the processing
allowProcessing.release(6);
// wait for last task to be processed
startedProcessing.acquire(4);
assertThat(executionOrder, equalTo(Arrays.asList("A0", "A1", "A2", "B3", "B4", "A5", "A6")));
}
static class TaskExecutor implements TestExecutor<Integer> {
List<Integer> tasks = new ArrayList<>();
@Override
public void execute(List<Integer> tasks) {
this.tasks.addAll(tasks);
}
}
// test that for a single thread, tasks are executed in the order
// that they are submitted
public void testTasksAreExecutedInOrder() throws BrokenBarrierException, InterruptedException {
int numberOfThreads = randomIntBetween(2, 8);
TaskExecutor[] executors = new TaskExecutor[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++) {
executors[i] = new TaskExecutor();
}
int tasksSubmittedPerThread = randomIntBetween(2, 1024);
CopyOnWriteArrayList<Tuple<String, Throwable>> failures = new CopyOnWriteArrayList<>();
CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread);
final TestListener listener = new TestListener() {
@Override
public void onFailure(String source, Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected failure: [{}]", source), e);
failures.add(new Tuple<>(source, e));
updateLatch.countDown();
}
@Override
public void processed(String source) {
updateLatch.countDown();
}
};
CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
final int index = i;
Thread thread = new Thread(() -> {
try {
barrier.await();
for (int j = 0; j < tasksSubmittedPerThread; j++) {
submitTask("[" + index + "][" + j + "]", j,
ClusterStateTaskConfig.build(randomFrom(Priority.values())), executors[index], listener);
}
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new AssertionError(e);
}
});
thread.start();
}
// wait for all threads to be ready
barrier.await();
// wait for all threads to finish
barrier.await();
updateLatch.await();
assertThat(failures, empty());
for (int i = 0; i < numberOfThreads; i++) {
assertEquals(tasksSubmittedPerThread, executors[i].tasks.size());
for (int j = 0; j < tasksSubmittedPerThread; j++) {
assertNotNull(executors[i].tasks.get(j));
assertEquals("cluster state update task executed out of order", j, (int) executors[i].tasks.get(j));
}
}
}
public void testSingleBatchSubmission() throws InterruptedException {
Map<Integer, TestListener> tasks = new HashMap<>();
final int numOfTasks = randomInt(10);
final CountDownLatch latch = new CountDownLatch(numOfTasks);
for (int i = 0; i < numOfTasks; i++) {
while (null != tasks.put(randomInt(1024), new TestListener() {
@Override
public void processed(String source) {
latch.countDown();
}
@Override
public void onFailure(String source, Exception e) {
fail(ExceptionsHelper.detailedMessage(e));
}
})) ;
}
TestExecutor<Integer> executor = taskList -> {
assertThat(taskList.size(), equalTo(tasks.size()));
assertThat(taskList.stream().collect(Collectors.toSet()), equalTo(tasks.keySet()));
};
submitTasks("test", tasks, ClusterStateTaskConfig.build(Priority.LANGUID), executor);
latch.await();
}
public void testDuplicateSubmission() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(2);
try (BlockingTask blockingTask = new BlockingTask(Priority.IMMEDIATE)) {
submitTask("blocking", blockingTask);
TestExecutor<SimpleTask> executor = tasks -> {};
SimpleTask task = new SimpleTask(1);
TestListener listener = new TestListener() {
@Override
public void processed(String source) {
latch.countDown();
}
@Override
public void onFailure(String source, Exception e) {
fail(ExceptionsHelper.detailedMessage(e));
}
};
submitTask("first time", task, ClusterStateTaskConfig.build(Priority.NORMAL), executor,
listener);
final IllegalStateException e =
expectThrows(
IllegalStateException.class,
() -> submitTask(
"second time",
task,
ClusterStateTaskConfig.build(Priority.NORMAL),
executor, listener));
assertThat(e, hasToString(containsString("task [1] with source [second time] is already queued")));
submitTask("third time a charm", new SimpleTask(1),
ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener);
assertThat(latch.getCount(), equalTo(2L));
}
latch.await();
}
private static class SimpleTask {
private final int id;
private SimpleTask(int id) {
this.id = id;
}
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
@Override
public String toString() {
return Integer.toString(id);
}
}
}

View File

@ -0,0 +1,365 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.service;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.Is.is;
public class TaskExecutorTests extends ESTestCase {
protected static ThreadPool threadPool;
protected PrioritizedEsThreadPoolExecutor threadExecutor;
@BeforeClass
public static void createThreadPool() {
threadPool = new TestThreadPool(getTestClass().getName());
}
@AfterClass
public static void stopThreadPool() {
if (threadPool != null) {
threadPool.shutdownNow();
threadPool = null;
}
}
@Before
public void setUpExecutor() {
threadExecutor = EsExecutors.newSinglePrioritizing("test_thread",
daemonThreadFactory(Settings.EMPTY, "test_thread"), threadPool.getThreadContext(), threadPool.scheduler());
}
@After
public void shutDownThreadExecutor() {
ThreadPool.terminate(threadExecutor, 10, TimeUnit.SECONDS);
}
protected interface TestListener {
void onFailure(String source, Exception e);
default void processed(String source) {
// do nothing by default
}
}
protected interface TestExecutor<T> {
void execute(List<T> tasks);
default String describeTasks(List<T> tasks) {
return tasks.stream().map(T::toString).reduce((s1,s2) -> {
if (s1.isEmpty()) {
return s2;
} else if (s2.isEmpty()) {
return s1;
} else {
return s1 + ", " + s2;
}
}).orElse("");
}
}
/**
* Task class that works for single tasks as well as batching (see {@link TaskBatcherTests})
*/
protected abstract static class TestTask implements TestExecutor<TestTask>, TestListener, ClusterStateTaskConfig {
@Override
public void execute(List<TestTask> tasks) {
tasks.forEach(TestTask::run);
}
@Nullable
@Override
public TimeValue timeout() {
return null;
}
@Override
public Priority priority() {
return Priority.NORMAL;
}
public abstract void run();
}
class UpdateTask extends SourcePrioritizedRunnable {
final TestTask testTask;
UpdateTask(String source, TestTask testTask) {
super(testTask.priority(), source);
this.testTask = testTask;
}
@Override
public void run() {
logger.trace("will process {}", source);
testTask.execute(Collections.singletonList(testTask));
testTask.processed(source);
}
}
// can be overridden by TaskBatcherTests
protected void submitTask(String source, TestTask testTask) {
SourcePrioritizedRunnable task = new UpdateTask(source, testTask);
TimeValue timeout = testTask.timeout();
if (timeout != null) {
threadExecutor.execute(task, timeout, () -> threadPool.generic().execute(() -> {
logger.debug("task [{}] timed out after [{}]", task, timeout);
testTask.onFailure(source, new ProcessClusterEventTimeoutException(timeout, source));
}));
} else {
threadExecutor.execute(task);
}
}
public void testTimedOutTaskCleanedUp() throws Exception {
final CountDownLatch block = new CountDownLatch(1);
final CountDownLatch blockCompleted = new CountDownLatch(1);
TestTask blockTask = new TestTask() {
@Override
public void run() {
try {
block.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
blockCompleted.countDown();
}
@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e);
}
};
submitTask("block-task", blockTask);
final CountDownLatch block2 = new CountDownLatch(1);
TestTask unblockTask = new TestTask() {
@Override
public void run() {
block2.countDown();
}
@Override
public void onFailure(String source, Exception e) {
block2.countDown();
}
@Override
public TimeValue timeout() {
return TimeValue.ZERO;
}
};
submitTask("unblock-task", unblockTask);
block.countDown();
block2.await();
blockCompleted.await();
}
public void testTimeoutTask() throws Exception {
final CountDownLatch block = new CountDownLatch(1);
TestTask test1 = new TestTask() {
@Override
public void run() {
try {
block.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e);
}
};
submitTask("block-task", test1);
final CountDownLatch timedOut = new CountDownLatch(1);
final AtomicBoolean executeCalled = new AtomicBoolean();
TestTask test2 = new TestTask() {
@Override
public TimeValue timeout() {
return TimeValue.timeValueMillis(2);
}
@Override
public void run() {
executeCalled.set(true);
}
@Override
public void onFailure(String source, Exception e) {
timedOut.countDown();
}
};
submitTask("block-task", test2);
timedOut.await();
block.countDown();
final CountDownLatch allProcessed = new CountDownLatch(1);
TestTask test3 = new TestTask() {
@Override
public void run() {
allProcessed.countDown();
}
@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e);
}
};
submitTask("block-task", test3);
allProcessed.await(); // executed another task to double check that execute on the timed out update task is not called...
assertThat(executeCalled.get(), equalTo(false));
}
static class TaskExecutor implements TestExecutor<Integer> {
List<Integer> tasks = new ArrayList<>();
@Override
public void execute(List<Integer> tasks) {
this.tasks.addAll(tasks);
}
}
/**
* Note, this test can only work as long as we have a single thread executor executing the state update tasks!
*/
public void testPrioritizedTasks() throws Exception {
BlockingTask block = new BlockingTask(Priority.IMMEDIATE);
submitTask("test", block);
int taskCount = randomIntBetween(5, 20);
// will hold all the tasks in the order in which they were executed
List<PrioritizedTask> tasks = new ArrayList<>(taskCount);
CountDownLatch latch = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {
Priority priority = randomFrom(Priority.values());
PrioritizedTask task = new PrioritizedTask(priority, latch, tasks);
submitTask("test", task);
}
block.close();
latch.await();
Priority prevPriority = null;
for (PrioritizedTask task : tasks) {
if (prevPriority == null) {
prevPriority = task.priority();
} else {
assertThat(task.priority().sameOrAfter(prevPriority), is(true));
}
}
}
protected static class BlockingTask extends TestTask implements Releasable {
private final CountDownLatch latch = new CountDownLatch(1);
private final Priority priority;
BlockingTask(Priority priority) {
super();
this.priority = priority;
}
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void onFailure(String source, Exception e) {
}
@Override
public Priority priority() {
return priority;
}
public void close() {
latch.countDown();
}
}
protected static class PrioritizedTask extends TestTask {
private final CountDownLatch latch;
private final List<PrioritizedTask> tasks;
private final Priority priority;
private PrioritizedTask(Priority priority, CountDownLatch latch, List<PrioritizedTask> tasks) {
super();
this.latch = latch;
this.tasks = tasks;
this.priority = priority;
}
@Override
public void run() {
tasks.add(this);
latch.countDown();
}
@Override
public Priority priority() {
return priority;
}
@Override
public void onFailure(String source, Exception e) {
latch.countDown();
}
}
}

View File

@ -65,7 +65,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
} }
public void testSubmitPrioritizedExecutorWithRunnables() throws Exception { public void testSubmitPrioritizedExecutorWithRunnables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder); ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null);
List<Integer> results = new ArrayList<>(8); List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8); CountDownLatch finishedLatch = new CountDownLatch(8);
@ -94,7 +94,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
} }
public void testExecutePrioritizedExecutorWithRunnables() throws Exception { public void testExecutePrioritizedExecutorWithRunnables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder); ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null);
List<Integer> results = new ArrayList<>(8); List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8); CountDownLatch finishedLatch = new CountDownLatch(8);
@ -123,7 +123,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
} }
public void testSubmitPrioritizedExecutorWithCallables() throws Exception { public void testSubmitPrioritizedExecutorWithCallables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder); ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null);
List<Integer> results = new ArrayList<>(8); List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8); CountDownLatch finishedLatch = new CountDownLatch(8);
@ -152,7 +152,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
} }
public void testSubmitPrioritizedExecutorWithMixed() throws Exception { public void testSubmitPrioritizedExecutorWithMixed() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder); ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null);
List<Integer> results = new ArrayList<>(8); List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8); CountDownLatch finishedLatch = new CountDownLatch(8);
@ -182,7 +182,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
public void testTimeout() throws Exception { public void testTimeout() throws Exception {
ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(EsExecutors.daemonThreadFactory(getTestName())); ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(EsExecutors.daemonThreadFactory(getTestName()));
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder); PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, timer);
final CountDownLatch invoked = new CountDownLatch(1); final CountDownLatch invoked = new CountDownLatch(1);
final CountDownLatch block = new CountDownLatch(1); final CountDownLatch block = new CountDownLatch(1);
executor.execute(new Runnable() { executor.execute(new Runnable() {
@ -219,7 +219,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
public String toString() { public String toString() {
return "the waiting"; return "the waiting";
} }
}, timer, TimeValue.timeValueMillis(100) /* enough timeout to catch them in the pending list... */, new Runnable() { }, TimeValue.timeValueMillis(100) /* enough timeout to catch them in the pending list... */, new Runnable() {
@Override @Override
public void run() { public void run() {
timedOut.countDown(); timedOut.countDown();
@ -245,14 +245,14 @@ public class PrioritizedExecutorsTests extends ESTestCase {
ThreadPool threadPool = new TestThreadPool("test"); ThreadPool threadPool = new TestThreadPool("test");
final ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler(); final ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler();
final AtomicBoolean timeoutCalled = new AtomicBoolean(); final AtomicBoolean timeoutCalled = new AtomicBoolean();
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder); PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, timer);
final CountDownLatch invoked = new CountDownLatch(1); final CountDownLatch invoked = new CountDownLatch(1);
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
invoked.countDown(); invoked.countDown();
} }
}, timer, TimeValue.timeValueHours(1), new Runnable() { }, TimeValue.timeValueHours(1), new Runnable() {
@Override @Override
public void run() { public void run() {
// We should never get here // We should never get here

View File

@ -2911,27 +2911,36 @@ public class InternalEngineTests extends ESTestCase {
} else { } else {
throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
} }
// test index with document failure
Engine.IndexResult indexResult = engine.index(indexForDoc(doc1)); Engine.IndexResult indexResult = engine.index(indexForDoc(doc1));
assertNotNull(indexResult.getFailure()); assertNotNull(indexResult.getFailure());
// document failures should be recorded in translog assertThat(indexResult.getSeqNo(), equalTo(0L));
assertThat(indexResult.getVersion(), equalTo(Versions.MATCH_ANY));
assertNotNull(indexResult.getTranslogLocation()); assertNotNull(indexResult.getTranslogLocation());
throwingIndexWriter.get().clearFailure(); throwingIndexWriter.get().clearFailure();
indexResult = engine.index(indexForDoc(doc1)); indexResult = engine.index(indexForDoc(doc1));
assertThat(indexResult.getSeqNo(), equalTo(1L));
assertThat(indexResult.getVersion(), equalTo(1L));
assertNull(indexResult.getFailure()); assertNull(indexResult.getFailure());
// document failures should be recorded in translog
assertNotNull(indexResult.getTranslogLocation()); assertNotNull(indexResult.getTranslogLocation());
engine.index(indexForDoc(doc2)); engine.index(indexForDoc(doc2));
// test failure while deleting // test failure while deleting
// all these simulated exceptions are not fatal to the IW so we treat them as document failures // all these simulated exceptions are not fatal to the IW so we treat them as document failures
final Engine.DeleteResult deleteResult;
if (randomBoolean()) { if (randomBoolean()) {
throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated")); throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated"));
assertThat(engine.delete(new Engine.Delete("test", "1", newUid(doc1))).getFailure(), instanceOf(IOException.class)); deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1)));
assertThat(deleteResult.getFailure(), instanceOf(IOException.class));
} else { } else {
throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
assertThat(engine.delete(new Engine.Delete("test", "1", newUid(doc1))).getFailure(), deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1)));
assertThat(deleteResult.getFailure(),
instanceOf(IllegalArgumentException.class)); instanceOf(IllegalArgumentException.class));
} }
assertThat(deleteResult.getVersion(), equalTo(2L));
assertThat(deleteResult.getSeqNo(), equalTo(3L));
// test non document level failure is thrown // test non document level failure is thrown
if (randomBoolean()) { if (randomBoolean()) {

View File

@ -23,36 +23,8 @@ characters.
================================================ ================================================
:plugin_name: analysis-icu
[[analysis-icu-install]] include::install_remove.asciidoc[]
[float]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install analysis-icu
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/analysis-icu/analysis-icu-{version}.zip.
[[analysis-icu-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove analysis-icu
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[analysis-icu-normalization-charfilter]] [[analysis-icu-normalization-charfilter]]
==== ICU Normalization Character Filter ==== ICU Normalization Character Filter

View File

@ -4,35 +4,8 @@
The Japanese (kuromoji) Analysis plugin integrates Lucene kuromoji analysis The Japanese (kuromoji) Analysis plugin integrates Lucene kuromoji analysis
module into elasticsearch. module into elasticsearch.
[[analysis-kuromoji-install]] :plugin_name: analysis-kuromoji
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install analysis-kuromoji
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/analysis-kuromoji/analysis-kuromoji-{version}.zip.
[[analysis-kuromoji-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove analysis-kuromoji
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[analysis-kuromoji-analyzer]] [[analysis-kuromoji-analyzer]]
==== `kuromoji` analyzer ==== `kuromoji` analyzer

View File

@ -5,35 +5,9 @@ The Phonetic Analysis plugin provides token filters which convert tokens to
their phonetic representation using Soundex, Metaphone, and a variety of other their phonetic representation using Soundex, Metaphone, and a variety of other
algorithms. algorithms.
[[analysis-phonetic-install]] :plugin_name: analysis-phonetic
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install analysis-phonetic
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/analysis-phonetic/analysis-phonetic-{version}.zip.
[[analysis-phonetic-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove analysis-phonetic
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[analysis-phonetic-token-filter]] [[analysis-phonetic-token-filter]]
==== `phonetic` token filter ==== `phonetic` token filter

View File

@ -9,36 +9,9 @@ analyzer uses probabilistic knowledge to find the optimal word segmentation
for Simplified Chinese text. The text is first broken into sentences, then for Simplified Chinese text. The text is first broken into sentences, then
each sentence is segmented into words. each sentence is segmented into words.
:plugin_name: analysis-smartcn
include::install_remove.asciidoc[]
[[analysis-smartcn-install]]
[float]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install analysis-smartcn
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/analysis-smartcn/analysis-smartcn-{version}.zip.
[[analysis-smartcn-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove analysis-smartcn
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[analysis-smartcn-tokenizer]] [[analysis-smartcn-tokenizer]]
[float] [float]

View File

@ -7,35 +7,8 @@ module for Polish into elasticsearch.
It provides high quality stemming for Polish, based on the It provides high quality stemming for Polish, based on the
http://www.egothor.org/[Egothor project]. http://www.egothor.org/[Egothor project].
[[analysis-stempel-install]] :plugin_name: analysis-stempel
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install analysis-stempel
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/analysis-stempel/analysis-stempel-{version}.zip.
[[analysis-stempel-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove analysis-stempel
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[analysis-stempel-tokenizer]] [[analysis-stempel-tokenizer]]
[float] [float]

View File

@ -5,35 +5,8 @@ The Ukrainian Analysis plugin integrates Lucene's UkrainianMorfologikAnalyzer in
It provides stemming for Ukrainian using the http://github.com/morfologik/morfologik-stemming[Morfologik project]. It provides stemming for Ukrainian using the http://github.com/morfologik/morfologik-stemming[Morfologik project].
[[analysis-ukrainian-install]] :plugin_name: analysis-ukrainian
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install analysis-ukrainian
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/analysis-ukrainian/analysis-ukrainian-{version}.zip.
[[analysis-ukrainian-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove analysis-ukrainian
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[analysis-ukrainian-analyzer]] [[analysis-ukrainian-analyzer]]
[float] [float]

View File

@ -7,35 +7,9 @@ The Azure Classic Discovery plugin uses the Azure Classic API for unicast discov
// See issue https://github.com/elastic/elasticsearch/issues/19146 // See issue https://github.com/elastic/elasticsearch/issues/19146
deprecated[5.0.0, Use coming Azure ARM Discovery plugin instead] deprecated[5.0.0, Use coming Azure ARM Discovery plugin instead]
[[discovery-azure-classic-install]] :plugin_name: discovery-azure-classic
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install discovery-azure-classic
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/discovery-azure-classic/discovery-azure-classic-{version}.zip.
[[discovery-azure-classic-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove discovery-azure-classic
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[discovery-azure-classic-usage]] [[discovery-azure-classic-usage]]
==== Azure Virtual Machine Discovery ==== Azure Virtual Machine Discovery

View File

@ -5,35 +5,8 @@ The EC2 discovery plugin uses the https://github.com/aws/aws-sdk-java[AWS API] f
*If you are looking for a hosted solution of Elasticsearch on AWS, please visit http://www.elastic.co/cloud.* *If you are looking for a hosted solution of Elasticsearch on AWS, please visit http://www.elastic.co/cloud.*
[[discovery-ec2-install]] :plugin_name: discovery-ec2
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install discovery-ec2
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/discovery-ec2/discovery-ec2-{version}.zip.
[[discovery-ec2-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove discovery-ec2
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[discovery-ec2-usage]] [[discovery-ec2-usage]]
==== Getting started with AWS ==== Getting started with AWS

View File

@ -4,38 +4,8 @@
The file-based discovery plugin uses a list of hosts/ports in a `unicast_hosts.txt` file The file-based discovery plugin uses a list of hosts/ports in a `unicast_hosts.txt` file
in the `config/discovery-file` directory for unicast discovery. in the `config/discovery-file` directory for unicast discovery.
[[discovery-file-install]] :plugin_name: discovery-file
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install discovery-file
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation. Note that installing the plugin will add a
`discovery-file` directory to the `config` folder, and a default `unicast_hosts.txt`
file that must be edited with the correct unicast hosts list before starting the node.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/discovery-file/discovery-file-{version}.zip.
[[discovery-file-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove discovery-file
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[discovery-file-usage]] [[discovery-file-usage]]
[float] [float]

View File

@ -3,35 +3,8 @@
The Google Compute Engine Discovery plugin uses the GCE API for unicast discovery. The Google Compute Engine Discovery plugin uses the GCE API for unicast discovery.
[[discovery-gce-install]] :plugin_name: discovery-gce
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install discovery-gce
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/discovery-gce/discovery-gce-{version}.zip.
[[discovery-gce-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove discovery-gce
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[discovery-gce-usage]] [[discovery-gce-usage]]
==== GCE Virtual Machine Discovery ==== GCE Virtual Machine Discovery

View File

@ -11,35 +11,8 @@ the overhead of converting back and forth between base64, you can use the CBOR
format instead of JSON and specify the field as a bytes array instead of a string format instead of JSON and specify the field as a bytes array instead of a string
representation. The processor will skip the base64 decoding then. representation. The processor will skip the base64 decoding then.
[[ingest-attachment-install]] :plugin_name: ingest-attachment
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install ingest-attachment
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/ingest-attachment/ingest-attachment-{version}.zip.
[[ingest-attachment-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove ingest-attachment
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[using-ingest-attachment]] [[using-ingest-attachment]]
==== Using the Attachment Processor in a Pipeline ==== Using the Attachment Processor in a Pipeline

View File

@ -12,35 +12,8 @@ The GeoIP processor can run with other geoip2 databases from Maxmind. The files
and the `database_file` option should be used to specify the filename of the custom database. Custom database files must be compressed and the `database_file` option should be used to specify the filename of the custom database. Custom database files must be compressed
with gzip. The geoip config directory is located at `$ES_HOME/config/ingest/geoip` and holds the shipped databases too. with gzip. The geoip config directory is located at `$ES_HOME/config/ingest/geoip` and holds the shipped databases too.
[[ingest-geoip-install]] :plugin_name: ingest-geoip
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install ingest-geoip
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/ingest-geoip/ingest-geoip-{version}.zip.
[[ingest-geoip-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove ingest-geoip
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[using-ingest-geoip]] [[using-ingest-geoip]]
==== Using the Geoip Processor in a Pipeline ==== Using the Geoip Processor in a Pipeline

View File

@ -6,35 +6,8 @@ This processor adds this information by default under the `user_agent` field.
The ingest-user-agent plugin ships by default with the regexes.yaml made available by uap-java with an Apache 2.0 license. For more details see https://github.com/ua-parser/uap-core. The ingest-user-agent plugin ships by default with the regexes.yaml made available by uap-java with an Apache 2.0 license. For more details see https://github.com/ua-parser/uap-core.
[[ingest-user-agent-install]] :plugin_name: ingest-user-agent
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install ingest-user-agent
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/ingest-user-agent/ingest-user-agent-{version}.zip.
[[ingest-user-agent-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove ingest-user-agent
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[using-ingest-user-agent]] [[using-ingest-user-agent]]
==== Using the user_agent Processor in a Pipeline ==== Using the user_agent Processor in a Pipeline

View File

@ -0,0 +1,40 @@
[float]
[id="{plugin_name}-install"]
==== Installation
ifeval::["{release-state}"=="unreleased"]
Version {version} of the Elastic Stack has not yet been released.
endif::[]
ifeval::["{release-state}"!="unreleased"]
This plugin can be installed using the plugin manager:
["source","sh",subs="attributes,callouts"]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install {plugin_name}
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/{plugin_name}/{plugin_name}-{version}.zip.
endif::[]
[float]
[id="{plugin_name}-remove"]
==== Removal
The plugin can be removed with the following command:
["source","sh",subs="attributes,callouts"]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove {plugin_name}
----------------------------------------------------------------
The node must be stopped before removing the plugin.

View File

@ -5,35 +5,8 @@ The mapper-murmur3 plugin provides the ability to compute hash of field values
at index-time and store them in the index. This can sometimes be helpful when at index-time and store them in the index. This can sometimes be helpful when
running cardinality aggregations on high-cardinality and large string fields. running cardinality aggregations on high-cardinality and large string fields.
[[mapper-murmur3-install]] :plugin_name: mapper-murmur3
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install mapper-murmur3
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/mapper-murmur3/mapper-murmur3-{version}.zip.
[[mapper-murmur3-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove mapper-murmur3
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[mapper-murmur3-usage]] [[mapper-murmur3-usage]]
==== Using the `murmur3` field ==== Using the `murmur3` field

View File

@ -5,35 +5,8 @@ The mapper-size plugin provides the `_size` meta field which, when enabled,
indexes the size in bytes of the original indexes the size in bytes of the original
{ref}/mapping-source-field.html[`_source`] field. {ref}/mapping-source-field.html[`_source`] field.
[[mapper-size-install]] :plugin_name: mapper-size
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install mapper-size
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/mapper-size/mapper-size-{version}.zip.
[[mapper-size-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove mapper-size
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[mapper-size-usage]] [[mapper-size-usage]]
==== Using the `_size` field ==== Using the `_size` field

View File

@ -4,35 +4,8 @@
The Azure Repository plugin adds support for using Azure as a repository for The Azure Repository plugin adds support for using Azure as a repository for
{ref}/modules-snapshots.html[Snapshot/Restore]. {ref}/modules-snapshots.html[Snapshot/Restore].
[[repository-azure-install]] :plugin_name: repository-azure
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install repository-azure
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/repository-azure/repository-azure-{version}.zip.
[[repository-azure-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove repository-azure
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[repository-azure-usage]] [[repository-azure-usage]]
==== Azure Repository ==== Azure Repository

View File

@ -4,37 +4,8 @@
The GCS repository plugin adds support for using the https://cloud.google.com/storage/[Google Cloud Storage] The GCS repository plugin adds support for using the https://cloud.google.com/storage/[Google Cloud Storage]
service as a repository for {ref}/modules-snapshots.html[Snapshot/Restore]. service as a repository for {ref}/modules-snapshots.html[Snapshot/Restore].
[[repository-gcs-install]] :plugin_name: repository-gcs
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install repository-gcs
----------------------------------------------------------------
NOTE: The plugin requires new permission to be installed in order to work
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/repository-gcs/repository-gcs-{version}.zip.
[[repository-gcs-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove repository-gcs
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[repository-gcs-usage]] [[repository-gcs-usage]]
==== Getting started ==== Getting started

View File

@ -4,35 +4,8 @@
The HDFS repository plugin adds support for using HDFS File System as a repository for The HDFS repository plugin adds support for using HDFS File System as a repository for
{ref}/modules-snapshots.html[Snapshot/Restore]. {ref}/modules-snapshots.html[Snapshot/Restore].
[[repository-hdfs-install]] :plugin_name: repository-hdfs
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed through the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install repository-hdfs
----------------------------------------------------------------
The plugin must be installed on _every_ node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/repository-hdfs/repository-hdfs-{version}.zip.
[[repository-hdfs-remove]]
[float]
==== Removal
The plugin can be removed by specifying the _installed_ package:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove repository-hdfs
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[repository-hdfs-usage]] [[repository-hdfs-usage]]
==== Getting started with HDFS ==== Getting started with HDFS

View File

@ -6,35 +6,8 @@ The S3 repository plugin adds support for using S3 as a repository for
*If you are looking for a hosted solution of Elasticsearch on AWS, please visit http://www.elastic.co/cloud.* *If you are looking for a hosted solution of Elasticsearch on AWS, please visit http://www.elastic.co/cloud.*
[[repository-s3-install]] :plugin_name: repository-s3
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install repository-s3
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/repository-s3/repository-s3-{version}.zip.
[[repository-s3-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove repository-s3
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[repository-s3-usage]] [[repository-s3-usage]]
==== Getting started with AWS ==== Getting started with AWS

View File

@ -3,35 +3,8 @@
The Store SMB plugin works around for a bug in Windows SMB and Java on windows. The Store SMB plugin works around for a bug in Windows SMB and Java on windows.
[[store-smb-install]] :plugin_name: store-smb
[float] include::install_remove.asciidoc[]
==== Installation
This plugin can be installed using the plugin manager:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin install store-smb
----------------------------------------------------------------
The plugin must be installed on every node in the cluster, and each node must
be restarted after installation.
This plugin can be downloaded for <<plugin-management-custom-url,offline install>> from
{plugin_url}/store-smb/store-smb-{version}.zip.
[[store-smb-remove]]
[float]
==== Removal
The plugin can be removed with the following command:
[source,sh]
----------------------------------------------------------------
sudo bin/elasticsearch-plugin remove store-smb
----------------------------------------------------------------
The node must be stopped before removing the plugin.
[[store-smb-usage]] [[store-smb-usage]]
==== Working around a bug in Windows SMB and Java on windows ==== Working around a bug in Windows SMB and Java on windows

View File

@ -1,24 +1,31 @@
[[modules-cross-cluster-search]] [[modules-cross-cluster-search]]
== Cross cluster search == Cross Cluster Search
experimental[] beta[]
The _cross cluster search_ feature allows any node to act as a federated client across The _cross cluster search_ feature allows any node to act as a federated client across
multiple clusters. In contrast to the _tribe_ feature, a _cross cluster search_ node won't multiple clusters. In contrast to the <<modules-tribe,tribe node>> feature, a cross cluster search node won't
join the remote cluster, instead it connects to a remote cluster in a light fashion in order to execute join the remote cluster, instead it connects to a remote cluster in a light fashion in order to execute
federated search requests. federated search requests.
The _cross cluster search_ feature works by configuring a remote cluster in the cluster state and connects only to a Cross cluster search works by configuring a remote cluster in the cluster state and connecting only to a
limited number of nodes in the remote cluster. Each remote cluster is referenced by a name and a list of seed nodes. limited number of nodes in the remote cluster. Each remote cluster is referenced by a name and a list of seed nodes.
Those seed nodes are used to discover other nodes eligible as so-called _gateway nodes_. Each node in a cluster that Those seed nodes are used to discover nodes in the remote cluster which are eligible as _gateway nodes_.
has remote clusters configured connects to one or more _gateway nodes_ and uses them to federate search requests to Each node in a cluster that has remote clusters configured connects to one or more _gateway nodes_ and uses
the remote cluster. them to federate search requests to the remote cluster.
Remote clusters can either be configured as part of the `elasticsearch.yml` file or be dynamically updated via [float]
the <<cluster-update-settings,cluster settings API>>. If a remote cluster is configured via `elasticsearch.yml` only === Configuring Cross Cluster Search
the nodes with the configuration set will be connecting to the remote cluster in which case federated search requests
will have to be sent specifically to those nodes. Remote clusters set via the Remote clusters can be specified globally using <<cluster-update-settings,cluster settings>>
<<cluster-update-settings,cluster settings API>> will be available on every node in the cluster. (which can be updated dynamically), or local to individual nodes using the
`elasticsearch.yml` file.
If a remote cluster is configured via `elasticsearch.yml` only the nodes with
that configuration will be able to connect to the remote cluster. In other
words, federated search requests will have to be sent specifically to those
nodes. Remote clusters set via the <<cluster-update-settings,cluster settings API>>
will be available on every node in the cluster.
The `elasticsearch.yml` config file for a _cross cluster search_ node just needs to list the The `elasticsearch.yml` config file for a _cross cluster search_ node just needs to list the
remote clusters that should be connected to, for instance: remote clusters that should be connected to, for instance:
@ -36,6 +43,55 @@ search:
<1> `cluster_one` and `cluster_two` are arbitrary cluster aliases representing the connection to each cluster. <1> `cluster_one` and `cluster_two` are arbitrary cluster aliases representing the connection to each cluster.
These names are subsequently used to distinguish between local and remote indices. These names are subsequently used to distinguish between local and remote indices.
The equivalent example using the <<cluster-update-settings,cluster settings API>>
to add remote clusters to all nodes in the cluster would look like the
following:
[source,js]
--------------------------------
PUT _cluster/settings
{
"persistent": {
"search": {
"remote": {
"cluster_one": {
"seeds": [
"127.0.0.1:9300"
]
},
"cluster_two": {
"seeds": [
"127.0.0.1:9301"
]
}
}
}
}
}
--------------------------------
// CONSOLE
A remote cluster can be deleted from the cluster settings by setting its seeds to `null`:
[source,js]
--------------------------------
PUT _cluster/settings
{
"persistent": {
"search": {
"remote": {
"cluster_one": {
"seeds": null <1>
}
}
}
}
}
--------------------------------
// CONSOLE
<1> `cluster_one` would be removed from the cluster settings, leaving `cluster_two` intact.
[float] [float]
=== Using cross cluster search === Using cross cluster search
@ -126,9 +182,11 @@ will be prefixed with their remote cluster name:
connected to if `search.remote.node.attr` is set to `gateway`. connected to if `search.remote.node.attr` is set to `gateway`.
`search.remote.connect`:: `search.remote.connect`::
By default, any node in the cluster can act as a cross-cluster client and connect to remote clusters.
The `search.remote.connect` setting can be set to `false` (defaults to `true`) to prevent certain nodes from By default, any node in the cluster can act as a cross-cluster client and
connecting to remote clusters. Cross-cluster search requests must be sent to a node that is allowed to act as a connect to remote clusters. The `search.remote.connect` setting can be set
cross-cluster client. to `false` (defaults to `true`) to prevent certain nodes from connecting to
remote clusters. Cross-cluster search requests must be sent to a node that
is allowed to act as a cross-cluster client.

View File

@ -134,8 +134,8 @@ appender.rolling.policies.time.modulate = true <6>
hours) hours)
NOTE: Log4j's configuration parsing gets confused by any extraneous whitespace; NOTE: Log4j's configuration parsing gets confused by any extraneous whitespace;
if you copy and paste the above settings, or enter any Log4j configuration in if you copy and paste any Log4j settings on this page, or enter any Log4j
general, be sure to trim any leading and trailing whitespace. configuration in general, be sure to trim any leading and trailing whitespace.
If you append `.gz` or `.zip` to `appender.rolling.filePattern`, then the logs If you append `.gz` or `.zip` to `appender.rolling.filePattern`, then the logs
will be compressed as they are rolled. will be compressed as they are rolled.

View File

@ -7,3 +7,4 @@
5.2.1 5.2.1
5.2.2 5.2.2
5.3.0 5.3.0
5.3.1