mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-25 01:19:02 +00:00
Dedicated threadpool for system index writes (#62792)
This commit adds a dedicated threadpool for system index write operations. The dedicated resources for system index writes serves as a means to ensure that user activity does not block important system operations from occurring such as the management of users and roles. Backport of #61655
This commit is contained in:
parent
54d97ecc60
commit
cb1dc5260f
@ -88,6 +88,11 @@ There are several thread pools, but the important ones include:
|
||||
Thread pool type is `fixed` and a default maximum size of
|
||||
`min(5, (`<<node.processors, `# of allocated processors`>>`) / 2)`.
|
||||
|
||||
`system_write`::
|
||||
For write operations on system indices.
|
||||
Thread pool type is `fixed` and a default maximum size of
|
||||
`min(5, (`<<node.processors, `# of allocated processors`>>`) / 2)`.
|
||||
|
||||
Changing a specific thread pool can be done by setting its type-specific
|
||||
parameters; for example, changing the number of threads in the `write` thread
|
||||
pool:
|
||||
|
@ -44,6 +44,7 @@ import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
@ -491,4 +492,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
|
||||
public long ramBytesUsed() {
|
||||
return SHALLOW_SIZE + requests.stream().mapToLong(Accountable::ramBytesUsed).sum();
|
||||
}
|
||||
|
||||
public Set<String> getIndices() {
|
||||
return Collections.unmodifiableSet(indices);
|
||||
}
|
||||
}
|
||||
|
@ -62,15 +62,17 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndexClosedException;
|
||||
import org.elasticsearch.indices.SystemIndices;
|
||||
import org.elasticsearch.ingest.IngestService;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -82,6 +84,7 @@ import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicIntegerArray;
|
||||
@ -112,22 +115,24 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
|
||||
private final IndexingPressure indexingPressure;
|
||||
private final SystemIndices systemIndices;
|
||||
|
||||
@Inject
|
||||
public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, IngestService ingestService,
|
||||
TransportShardBulkAction shardBulkAction, NodeClient client,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure) {
|
||||
TransportShardBulkAction shardBulkAction, NodeClient client, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, SystemIndices systemIndices) {
|
||||
this(threadPool, transportService, clusterService, ingestService, shardBulkAction, client, actionFilters,
|
||||
indexNameExpressionResolver, autoCreateIndex, indexingPressure, System::nanoTime);
|
||||
indexNameExpressionResolver, autoCreateIndex, indexingPressure, systemIndices, System::nanoTime);
|
||||
}
|
||||
|
||||
public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, IngestService ingestService,
|
||||
TransportShardBulkAction shardBulkAction, NodeClient client,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, LongSupplier relativeTimeProvider) {
|
||||
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, SystemIndices systemIndices,
|
||||
LongSupplier relativeTimeProvider) {
|
||||
super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.SAME);
|
||||
Objects.requireNonNull(relativeTimeProvider);
|
||||
this.threadPool = threadPool;
|
||||
@ -140,6 +145,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
this.client = client;
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
this.indexingPressure = indexingPressure;
|
||||
this.systemIndices = systemIndices;
|
||||
clusterService.addStateApplier(this.ingestForwarder);
|
||||
}
|
||||
|
||||
@ -163,17 +169,19 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
|
||||
long indexingBytes = bulkRequest.ramBytesUsed();
|
||||
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes);
|
||||
final long indexingBytes = bulkRequest.ramBytesUsed();
|
||||
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
|
||||
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
|
||||
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
|
||||
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
|
||||
try {
|
||||
doInternalExecute(task, bulkRequest, releasingListener);
|
||||
doInternalExecute(task, bulkRequest, executorName, releasingListener);
|
||||
} catch (Exception e) {
|
||||
releasingListener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
|
||||
protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
|
||||
final long startTime = relativeTime();
|
||||
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
|
||||
|
||||
@ -211,7 +219,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
assert arePipelinesResolved : bulkRequest;
|
||||
}
|
||||
if (clusterService.localNode().isIngestNode()) {
|
||||
processBulkIndexIngestRequest(task, bulkRequest, listener);
|
||||
processBulkIndexIngestRequest(task, bulkRequest, executorName, listener);
|
||||
} else {
|
||||
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
|
||||
}
|
||||
@ -261,7 +269,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
@Override
|
||||
public void onResponse(CreateIndexResponse result) {
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
threadPool.executor(ThreadPool.Names.WRITE).execute(
|
||||
threadPool.executor(executorName).execute(
|
||||
() -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
|
||||
}
|
||||
}
|
||||
@ -278,10 +286,11 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
}
|
||||
}
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
|
||||
threadPool.executor(executorName).execute(() -> executeBulk(task, bulkRequest, startTime,
|
||||
ActionListener.wrap(listener::onResponse, inner -> {
|
||||
inner.addSuppressed(e);
|
||||
listener.onFailure(inner);
|
||||
}), responses, indicesThatCannotBeCreated);
|
||||
}), responses, indicesThatCannotBeCreated));
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -342,6 +351,18 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
}
|
||||
}
|
||||
|
||||
boolean isOnlySystem(BulkRequest request, SortedMap<String, IndexAbstraction> indicesLookup, SystemIndices systemIndices) {
|
||||
final boolean onlySystem = request.getIndices().stream().allMatch(indexName -> {
|
||||
final IndexAbstraction abstraction = indicesLookup.get(indexName);
|
||||
if (abstraction != null) {
|
||||
return abstraction.isSystem();
|
||||
} else {
|
||||
return systemIndices.isSystemIndex(indexName);
|
||||
}
|
||||
});
|
||||
return onlySystem;
|
||||
}
|
||||
|
||||
boolean needToCheck() {
|
||||
return autoCreateIndex.needToCheck();
|
||||
}
|
||||
@ -662,7 +683,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
return relativeTimeProvider.getAsLong();
|
||||
}
|
||||
|
||||
private void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
|
||||
private void processBulkIndexIngestRequest(Task task, BulkRequest original, String executorName,
|
||||
ActionListener<BulkResponse> listener) {
|
||||
final long ingestStartTimeInNanos = System.nanoTime();
|
||||
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
|
||||
ingestService.executeBulkRequest(
|
||||
@ -687,10 +709,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
// If a processor went async and returned a response on a different thread then
|
||||
// before we continue the bulk request we should fork back on a write thread:
|
||||
if (originalThread == Thread.currentThread()) {
|
||||
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE);
|
||||
doInternalExecute(task, bulkRequest, actionListener);
|
||||
assert Thread.currentThread().getName().contains(executorName);
|
||||
doInternalExecute(task, bulkRequest, executorName, actionListener);
|
||||
} else {
|
||||
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
|
||||
threadPool.executor(executorName).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
@ -698,7 +720,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
doInternalExecute(task, bulkRequest, actionListener);
|
||||
doInternalExecute(task, bulkRequest, executorName, actionListener);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -714,7 +736,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
}
|
||||
}
|
||||
},
|
||||
bulkRequestModifier::markItemAsDropped
|
||||
bulkRequestModifier::markItemAsDropped,
|
||||
executorName
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -67,8 +67,10 @@ import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.SystemIndices;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
@ -76,6 +78,7 @@ import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
/** Performs shard-level bulk (index, delete or update) operations */
|
||||
@ -84,6 +87,13 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||
public static final String ACTION_NAME = BulkAction.NAME + "[s]";
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(TransportShardBulkAction.class);
|
||||
private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION = shard -> {
|
||||
if (shard.indexSettings().getIndexMetadata().isSystem()) {
|
||||
return Names.SYSTEM_WRITE;
|
||||
} else {
|
||||
return Names.WRITE;
|
||||
}
|
||||
};
|
||||
|
||||
private final UpdateHelper updateHelper;
|
||||
private final MappingUpdatedAction mappingUpdatedAction;
|
||||
@ -92,9 +102,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
|
||||
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
|
||||
IndexingPressure indexingPressure) {
|
||||
IndexingPressure indexingPressure, SystemIndices systemIndices) {
|
||||
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false, indexingPressure);
|
||||
BulkShardRequest::new, BulkShardRequest::new, EXECUTOR_NAME_FUNCTION, false, indexingPressure, systemIndices);
|
||||
this.updateHelper = updateHelper;
|
||||
this.mappingUpdatedAction = mappingUpdatedAction;
|
||||
}
|
||||
@ -134,7 +144,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update"));
|
||||
}
|
||||
}), listener, threadPool
|
||||
}), listener, threadPool, executor(primary)
|
||||
);
|
||||
}
|
||||
|
||||
@ -151,10 +161,11 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||
MappingUpdatePerformer mappingUpdater,
|
||||
Consumer<ActionListener<Void>> waitForMappingUpdate,
|
||||
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener,
|
||||
ThreadPool threadPool) {
|
||||
ThreadPool threadPool,
|
||||
String executorName) {
|
||||
new ActionRunnable<PrimaryResult<BulkShardRequest, BulkShardResponse>>(listener) {
|
||||
|
||||
private final Executor executor = threadPool.executor(ThreadPool.Names.WRITE);
|
||||
private final Executor executor = threadPool.executor(executorName);
|
||||
|
||||
private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);
|
||||
|
||||
|
@ -20,7 +20,6 @@ package org.elasticsearch.action.resync;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.ReplicationOperation;
|
||||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
@ -33,35 +32,46 @@ import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.SystemIndices;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
|
||||
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
|
||||
|
||||
private static String ACTION_NAME = "internal:index/seq_no/resync";
|
||||
private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION = shard -> {
|
||||
if (shard.indexSettings().getIndexMetadata().isSystem()) {
|
||||
return Names.SYSTEM_WRITE;
|
||||
} else {
|
||||
return Names.WRITE;
|
||||
}
|
||||
};
|
||||
|
||||
@Inject
|
||||
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ShardStateAction shardStateAction, ActionFilters actionFilters,
|
||||
IndexingPressure indexingPressure) {
|
||||
IndexingPressure indexingPressure, SystemIndices systemIndices) {
|
||||
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
|
||||
ResyncReplicationRequest::new, ResyncReplicationRequest::new, EXECUTOR_NAME_FUNCTION,
|
||||
true, /* we should never reject resync because of thread pool capacity on primary */
|
||||
indexingPressure);
|
||||
indexingPressure, systemIndices);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -23,19 +23,20 @@ import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.TransportActions;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.action.support.WriteResponse;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexAbstraction;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.mapper.MapperParsingException;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
@ -43,12 +44,14 @@ import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.index.translog.Translog.Location;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.SystemIndices;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Base class for transport actions that modify data in some shard like index, delete, and shardBulk.
|
||||
@ -60,25 +63,41 @@ public abstract class TransportWriteAction<
|
||||
Response extends ReplicationResponse & WriteResponse
|
||||
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {
|
||||
|
||||
private final IndexingPressure indexingPressure;
|
||||
private final String executor;
|
||||
protected final IndexingPressure indexingPressure;
|
||||
protected final SystemIndices systemIndices;
|
||||
|
||||
private final Function<IndexShard, String> executorFunction;
|
||||
|
||||
protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader<Request> request,
|
||||
Writeable.Reader<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary,
|
||||
IndexingPressure indexingPressure) {
|
||||
Writeable.Reader<ReplicaRequest> replicaRequest, Function<IndexShard, String> executorFunction,
|
||||
boolean forceExecutionOnPrimary, IndexingPressure indexingPressure, SystemIndices systemIndices) {
|
||||
// We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the
|
||||
// ThreadPool.Names.WRITE thread pool in this class.
|
||||
// ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class.
|
||||
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary);
|
||||
this.executor = executor;
|
||||
this.executorFunction = executorFunction;
|
||||
this.indexingPressure = indexingPressure;
|
||||
this.systemIndices = systemIndices;
|
||||
}
|
||||
|
||||
protected String executor(IndexShard shard) {
|
||||
return executorFunction.apply(shard);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Releasable checkOperationLimits(Request request) {
|
||||
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary);
|
||||
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request));
|
||||
}
|
||||
|
||||
protected boolean force(ReplicatedWriteRequest<?> request) {
|
||||
return forceExecutionOnPrimary || isSystemShard(request.shardId);
|
||||
}
|
||||
|
||||
protected boolean isSystemShard(ShardId shardId) {
|
||||
final IndexAbstraction abstraction = clusterService.state().metadata().getIndicesLookup().get(shardId.getIndexName());
|
||||
return abstraction != null ? abstraction.isSystem() : systemIndices.isSystemIndex(shardId.getIndexName());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -95,7 +114,7 @@ public abstract class TransportWriteAction<
|
||||
// If this primary request was received directly from the network, we must mark a new primary
|
||||
// operation. This happens if the write action skips the reroute step (ex: rsync) or during
|
||||
// primary delegation, after the primary relocation hand-off.
|
||||
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary);
|
||||
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request));
|
||||
}
|
||||
}
|
||||
|
||||
@ -105,7 +124,7 @@ public abstract class TransportWriteAction<
|
||||
|
||||
@Override
|
||||
protected Releasable checkReplicaLimits(ReplicaRequest request) {
|
||||
return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecutionOnPrimary);
|
||||
return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), force(request));
|
||||
}
|
||||
|
||||
protected long replicaOperationSize(ReplicaRequest request) {
|
||||
@ -153,6 +172,7 @@ public abstract class TransportWriteAction<
|
||||
@Override
|
||||
protected void shardOperationOnPrimary(
|
||||
Request request, IndexShard primary, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener) {
|
||||
final String executor = executorFunction.apply(primary);
|
||||
threadPool.executor(executor).execute(new ActionRunnable<PrimaryResult<ReplicaRequest, Response>>(listener) {
|
||||
@Override
|
||||
protected void doRun() {
|
||||
@ -161,7 +181,7 @@ public abstract class TransportWriteAction<
|
||||
|
||||
@Override
|
||||
public boolean isForceExecution() {
|
||||
return forceExecutionOnPrimary;
|
||||
return force(request);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -178,7 +198,7 @@ public abstract class TransportWriteAction<
|
||||
*/
|
||||
@Override
|
||||
protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
|
||||
threadPool.executor(executor).execute(new ActionRunnable<ReplicaResult>(listener) {
|
||||
threadPool.executor(executorFunction.apply(replica)).execute(new ActionRunnable<ReplicaResult>(listener) {
|
||||
@Override
|
||||
protected void doRun() {
|
||||
dispatchedShardOperationOnReplica(request, replica, listener);
|
||||
|
@ -37,10 +37,13 @@ import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
@ -63,7 +66,6 @@ public abstract class TransportInstanceSingleOperationAction<
|
||||
protected final TransportService transportService;
|
||||
protected final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
|
||||
final String executor;
|
||||
final String shardActionName;
|
||||
|
||||
protected TransportInstanceSingleOperationAction(String actionName, ThreadPool threadPool,
|
||||
@ -75,9 +77,8 @@ public abstract class TransportInstanceSingleOperationAction<
|
||||
this.clusterService = clusterService;
|
||||
this.transportService = transportService;
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
this.executor = executor();
|
||||
this.shardActionName = actionName + "[s]";
|
||||
transportService.registerRequestHandler(shardActionName, executor, request, new ShardTransportHandler());
|
||||
transportService.registerRequestHandler(shardActionName, Names.SAME, request, new ShardTransportHandler());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -85,7 +86,7 @@ public abstract class TransportInstanceSingleOperationAction<
|
||||
new AsyncSingleAction(request, listener).start();
|
||||
}
|
||||
|
||||
protected abstract String executor();
|
||||
protected abstract String executor(ShardId shardId);
|
||||
|
||||
protected abstract void shardOperation(Request request, ActionListener<Response> listener);
|
||||
|
||||
@ -263,16 +264,22 @@ public abstract class TransportInstanceSingleOperationAction<
|
||||
|
||||
@Override
|
||||
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
|
||||
shardOperation(request,
|
||||
ActionListener.wrap(channel::sendResponse, e -> {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (Exception inner) {
|
||||
inner.addSuppressed(e);
|
||||
logger.warn("failed to send response for get", inner);
|
||||
}
|
||||
threadPool.executor(executor(request.shardId)).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (Exception inner) {
|
||||
inner.addSuppressed(e);
|
||||
logger.warn("failed to send response for " + shardActionName, inner);
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() {
|
||||
shardOperation(request, ActionListener.wrap(channel::sendResponse, this::onFailure));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -57,6 +57,7 @@ import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -90,8 +91,9 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.WRITE;
|
||||
protected String executor(ShardId shardId) {
|
||||
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
return indexService.getIndexSettings().getIndexMetadata().isSystem() ? Names.SYSTEM_WRITE : Names.WRITE;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -267,7 +269,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
||||
if (retryCount < request.retryOnConflict()) {
|
||||
logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]",
|
||||
retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id());
|
||||
threadPool.executor(executor()).execute(ActionRunnable.wrap(listener, l -> shardOperation(request, l, retryCount + 1)));
|
||||
threadPool.executor(executor(request.getShardId()))
|
||||
.execute(ActionRunnable.wrap(listener, l -> shardOperation(request, l, retryCount + 1)));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -80,6 +80,11 @@ public interface IndexAbstraction {
|
||||
*/
|
||||
boolean isHidden();
|
||||
|
||||
/**
|
||||
* @return whether this index abstraction is hidden or not
|
||||
*/
|
||||
boolean isSystem();
|
||||
|
||||
/**
|
||||
* An index abstraction type.
|
||||
*/
|
||||
@ -162,6 +167,11 @@ public interface IndexAbstraction {
|
||||
public boolean isHidden() {
|
||||
return INDEX_HIDDEN_SETTING.get(concreteIndex.getSettings());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSystem() {
|
||||
return concreteIndex.isSystem();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -212,6 +222,11 @@ public interface IndexAbstraction {
|
||||
return isHidden;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSystem() {
|
||||
return referenceIndexMetadatas.stream().allMatch(IndexMetadata::isSystem);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the unique alias metadata per concrete index.
|
||||
* <p>
|
||||
@ -327,6 +342,12 @@ public interface IndexAbstraction {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSystem() {
|
||||
// No such thing as system data streams (yet)
|
||||
return false;
|
||||
}
|
||||
|
||||
public org.elasticsearch.cluster.metadata.DataStream getDataStream() {
|
||||
return dataStream;
|
||||
}
|
||||
|
@ -55,11 +55,11 @@ public class IndexingPressure {
|
||||
this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
|
||||
}
|
||||
|
||||
public Releasable markCoordinatingOperationStarted(long bytes) {
|
||||
public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) {
|
||||
long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
|
||||
long replicaWriteBytes = this.currentReplicaBytes.get();
|
||||
long totalBytes = combinedBytes + replicaWriteBytes;
|
||||
if (totalBytes > primaryAndCoordinatingLimits) {
|
||||
if (forceExecution == false && totalBytes > primaryAndCoordinatingLimits) {
|
||||
long bytesWithoutOperation = combinedBytes - bytes;
|
||||
long totalBytesWithoutOperation = totalBytes - bytes;
|
||||
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
|
||||
|
@ -46,6 +46,7 @@ import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardClosedException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.SystemIndices;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
@ -73,14 +74,15 @@ public class RetentionLeaseSyncAction extends
|
||||
|
||||
@Inject
|
||||
public RetentionLeaseSyncAction(
|
||||
final Settings settings,
|
||||
final TransportService transportService,
|
||||
final ClusterService clusterService,
|
||||
final IndicesService indicesService,
|
||||
final ThreadPool threadPool,
|
||||
final ShardStateAction shardStateAction,
|
||||
final ActionFilters actionFilters,
|
||||
final IndexingPressure indexingPressure) {
|
||||
final Settings settings,
|
||||
final TransportService transportService,
|
||||
final ClusterService clusterService,
|
||||
final IndicesService indicesService,
|
||||
final ThreadPool threadPool,
|
||||
final ShardStateAction shardStateAction,
|
||||
final ActionFilters actionFilters,
|
||||
final IndexingPressure indexingPressure,
|
||||
final SystemIndices systemIndices) {
|
||||
super(
|
||||
settings,
|
||||
ACTION_NAME,
|
||||
@ -92,7 +94,7 @@ public class RetentionLeaseSyncAction extends
|
||||
actionFilters,
|
||||
RetentionLeaseSyncAction.Request::new,
|
||||
RetentionLeaseSyncAction.Request::new,
|
||||
ThreadPool.Names.MANAGEMENT, false, indexingPressure);
|
||||
ignore -> ThreadPool.Names.MANAGEMENT, false, indexingPressure, systemIndices);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -452,9 +452,10 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
||||
Iterable<DocWriteRequest<?>> actionRequests,
|
||||
BiConsumer<Integer, Exception> onFailure,
|
||||
BiConsumer<Thread, Exception> onCompletion,
|
||||
IntConsumer onDropped) {
|
||||
IntConsumer onDropped,
|
||||
String executorName) {
|
||||
|
||||
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
|
||||
threadPool.executor(executorName).execute(new AbstractRunnable() {
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
|
@ -48,7 +48,7 @@ public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings
|
||||
}
|
||||
|
||||
protected int applyHardSizeLimit(final Settings settings, final String name) {
|
||||
if (name.equals("bulk") || name.equals(ThreadPool.Names.WRITE)) {
|
||||
if (name.equals("bulk") || name.equals(ThreadPool.Names.WRITE) || name.equals(ThreadPool.Names.SYSTEM_WRITE)) {
|
||||
return 1 + EsExecutors.allocatedProcessors(settings);
|
||||
} else {
|
||||
return Integer.MAX_VALUE;
|
||||
|
@ -82,6 +82,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
|
||||
public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
|
||||
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
|
||||
public static final String SYSTEM_READ = "system_read";
|
||||
public static final String SYSTEM_WRITE = "system_write";
|
||||
}
|
||||
|
||||
public enum ThreadPoolType {
|
||||
@ -140,6 +141,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
|
||||
map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);
|
||||
map.put(Names.SEARCH_THROTTLED, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
|
||||
map.put(Names.SYSTEM_READ, ThreadPoolType.FIXED);
|
||||
map.put(Names.SYSTEM_WRITE, ThreadPoolType.FIXED);
|
||||
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
|
||||
}
|
||||
|
||||
@ -195,6 +197,8 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
|
||||
builders.put(Names.FETCH_SHARD_STORE,
|
||||
new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
|
||||
builders.put(Names.SYSTEM_READ, new FixedExecutorBuilder(settings, Names.SYSTEM_READ, halfProcMaxAt5, 2000, false));
|
||||
builders.put(Names.SYSTEM_WRITE, new FixedExecutorBuilder(settings, Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, false));
|
||||
|
||||
for (final ExecutorBuilder<?> builder : customBuilders) {
|
||||
if (builders.containsKey(builder.name())) {
|
||||
throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
|
||||
|
@ -38,6 +38,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.indices.SystemIndices;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
@ -51,6 +52,7 @@ import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static java.util.Collections.singleton;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
@ -111,6 +113,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
ClusterState state = mock(ClusterState.class);
|
||||
when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA);
|
||||
when(state.metadata()).thenReturn(Metadata.EMPTY_METADATA);
|
||||
when(clusterService.state()).thenReturn(state);
|
||||
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
|
||||
when(state.getNodes()).thenReturn(discoveryNodes);
|
||||
@ -122,7 +125,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
|
||||
final ExecutorService direct = EsExecutors.newDirectExecutorService();
|
||||
when(threadPool.executor(anyString())).thenReturn(direct);
|
||||
TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService,
|
||||
null, null, null, mock(ActionFilters.class), null, null, new IndexingPressure(Settings.EMPTY)) {
|
||||
null, null, null, mock(ActionFilters.class), null, null, new IndexingPressure(Settings.EMPTY), new SystemIndices(emptyMap())) {
|
||||
@Override
|
||||
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
|
||||
AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
|
||||
|
@ -27,6 +27,7 @@ import org.elasticsearch.action.index.IndexAction;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.ActionTestUtils;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
@ -44,6 +45,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
@ -52,11 +54,13 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.indices.SystemIndices;
|
||||
import org.elasticsearch.ingest.IngestService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.Before;
|
||||
@ -72,6 +76,7 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.mockito.Matchers.any;
|
||||
@ -143,7 +148,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
new AutoCreateIndex(
|
||||
SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
||||
new IndexNameExpressionResolver()
|
||||
), new IndexingPressure(SETTINGS)
|
||||
), new IndexingPressure(SETTINGS), new SystemIndices(emptyMap())
|
||||
);
|
||||
}
|
||||
@Override
|
||||
@ -198,13 +203,18 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
when(state.getNodes()).thenReturn(nodes);
|
||||
Metadata metadata = Metadata.builder().indices(ImmutableOpenMap.<String, IndexMetadata>builder()
|
||||
.putAll(
|
||||
Collections.singletonMap(
|
||||
WITH_DEFAULT_PIPELINE,
|
||||
IndexMetadata.builder(WITH_DEFAULT_PIPELINE).settings(
|
||||
settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
|
||||
.build()
|
||||
).putAlias(AliasMetadata.builder(WITH_DEFAULT_PIPELINE_ALIAS).build()).numberOfShards(1).numberOfReplicas(1).build()))
|
||||
.build()).build();
|
||||
MapBuilder.<String, IndexMetadata>newMapBuilder()
|
||||
.put(
|
||||
WITH_DEFAULT_PIPELINE,
|
||||
IndexMetadata.builder(WITH_DEFAULT_PIPELINE).settings(
|
||||
settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
|
||||
.build())
|
||||
.putAlias(
|
||||
AliasMetadata.builder(WITH_DEFAULT_PIPELINE_ALIAS).build()).numberOfShards(1).numberOfReplicas(1).build())
|
||||
.put(".system", IndexMetadata.builder(".system").settings(settings(Version.CURRENT)).system(true)
|
||||
.numberOfShards(1).numberOfReplicas(0).build())
|
||||
.map()
|
||||
).build()).build();
|
||||
when(state.getMetadata()).thenReturn(metadata);
|
||||
when(state.metadata()).thenReturn(metadata);
|
||||
when(clusterService.state()).thenReturn(state);
|
||||
@ -224,7 +234,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
public void testIngestSkipped() throws Exception {
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
indexRequest.source(emptyMap());
|
||||
bulkRequest.add(indexRequest);
|
||||
action.execute(null, bulkRequest, ActionListener.wrap(response -> {}, exception -> {
|
||||
throw new AssertionError(exception);
|
||||
@ -235,7 +245,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
|
||||
public void testSingleItemBulkActionIngestSkipped() throws Exception {
|
||||
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
indexRequest.source(emptyMap());
|
||||
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(response -> {}, exception -> {
|
||||
throw new AssertionError(exception);
|
||||
}));
|
||||
@ -247,10 +257,10 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
Exception exception = new Exception("fake exception");
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
IndexRequest indexRequest1 = new IndexRequest("index", "type", "id");
|
||||
indexRequest1.source(Collections.emptyMap());
|
||||
indexRequest1.source(emptyMap());
|
||||
indexRequest1.setPipeline("testpipeline");
|
||||
IndexRequest indexRequest2 = new IndexRequest("index", "type", "id");
|
||||
indexRequest2.source(Collections.emptyMap());
|
||||
indexRequest2.source(emptyMap());
|
||||
indexRequest2.setPipeline("testpipeline");
|
||||
bulkRequest.add(indexRequest1);
|
||||
bulkRequest.add(indexRequest2);
|
||||
@ -273,7 +283,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
assertFalse(responseCalled.get());
|
||||
assertFalse(failureCalled.get());
|
||||
verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(),
|
||||
failureHandler.capture(), completionHandler.capture(), any());
|
||||
failureHandler.capture(), completionHandler.capture(), any(), eq(Names.WRITE));
|
||||
completionHandler.getValue().accept(null, exception);
|
||||
assertTrue(failureCalled.get());
|
||||
|
||||
@ -290,7 +300,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
public void testSingleItemBulkActionIngestLocal() throws Exception {
|
||||
Exception exception = new Exception("fake exception");
|
||||
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
indexRequest.source(emptyMap());
|
||||
indexRequest.setPipeline("testpipeline");
|
||||
AtomicBoolean responseCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean failureCalled = new AtomicBoolean(false);
|
||||
@ -308,7 +318,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
assertFalse(responseCalled.get());
|
||||
assertFalse(failureCalled.get());
|
||||
verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(),
|
||||
completionHandler.capture(), any());
|
||||
completionHandler.capture(), any(), eq(Names.WRITE));
|
||||
completionHandler.getValue().accept(null, exception);
|
||||
assertTrue(failureCalled.get());
|
||||
|
||||
@ -320,11 +330,55 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
verifyZeroInteractions(transportService);
|
||||
}
|
||||
|
||||
public void testIngestSystemLocal() throws Exception {
|
||||
Exception exception = new Exception("fake exception");
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
IndexRequest indexRequest1 = new IndexRequest(".system").id("id");
|
||||
indexRequest1.source(emptyMap());
|
||||
indexRequest1.setPipeline("testpipeline");
|
||||
IndexRequest indexRequest2 = new IndexRequest(".system").id("id");
|
||||
indexRequest2.source(emptyMap());
|
||||
indexRequest2.setPipeline("testpipeline");
|
||||
bulkRequest.add(indexRequest1);
|
||||
bulkRequest.add(indexRequest2);
|
||||
|
||||
AtomicBoolean responseCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean failureCalled = new AtomicBoolean(false);
|
||||
ActionTestUtils.execute(action, null, bulkRequest, ActionListener.wrap(
|
||||
response -> {
|
||||
BulkItemResponse itemResponse = response.iterator().next();
|
||||
assertThat(itemResponse.getFailure().getMessage(), containsString("fake exception"));
|
||||
responseCalled.set(true);
|
||||
},
|
||||
e -> {
|
||||
assertThat(e, sameInstance(exception));
|
||||
failureCalled.set(true);
|
||||
}));
|
||||
|
||||
// check failure works, and passes through to the listener
|
||||
assertFalse(action.isExecuted); // haven't executed yet
|
||||
assertFalse(responseCalled.get());
|
||||
assertFalse(failureCalled.get());
|
||||
verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(),
|
||||
failureHandler.capture(), completionHandler.capture(), any(), eq(Names.SYSTEM_WRITE));
|
||||
completionHandler.getValue().accept(null, exception);
|
||||
assertTrue(failureCalled.get());
|
||||
|
||||
// now check success
|
||||
Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
|
||||
failureHandler.getValue().accept(0, exception); // have an exception for our one index request
|
||||
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
|
||||
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
|
||||
assertTrue(action.isExecuted);
|
||||
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
|
||||
verifyZeroInteractions(transportService);
|
||||
}
|
||||
|
||||
public void testIngestForward() throws Exception {
|
||||
localIngest = false;
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
indexRequest.source(emptyMap());
|
||||
indexRequest.setPipeline("testpipeline");
|
||||
bulkRequest.add(indexRequest);
|
||||
BulkResponse bulkResponse = mock(BulkResponse.class);
|
||||
@ -340,7 +394,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
action.execute(null, bulkRequest, listener);
|
||||
|
||||
// should not have executed ingest locally
|
||||
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any());
|
||||
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
|
||||
// but instead should have sent to a remote node with the transport service
|
||||
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
|
||||
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
|
||||
@ -369,7 +423,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
public void testSingleItemBulkActionIngestForward() throws Exception {
|
||||
localIngest = false;
|
||||
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
indexRequest.source(emptyMap());
|
||||
indexRequest.setPipeline("testpipeline");
|
||||
IndexResponse indexResponse = mock(IndexResponse.class);
|
||||
AtomicBoolean responseCalled = new AtomicBoolean(false);
|
||||
@ -384,7 +438,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
singleItemBulkWriteAction.execute(null, indexRequest, listener);
|
||||
|
||||
// should not have executed ingest locally
|
||||
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any());
|
||||
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
|
||||
// but instead should have sent to a remote node with the transport service
|
||||
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
|
||||
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
|
||||
@ -434,9 +488,9 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexName, String updateRequestIndexName) throws Exception {
|
||||
Exception exception = new Exception("fake exception");
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
IndexRequest indexRequest1 = new IndexRequest(indexRequestIndexName, "type", "id1").source(Collections.emptyMap());
|
||||
IndexRequest indexRequest2 = new IndexRequest(indexRequestIndexName, "type", "id2").source(Collections.emptyMap());
|
||||
IndexRequest indexRequest3 = new IndexRequest(indexRequestIndexName, "type", "id3").source(Collections.emptyMap());
|
||||
IndexRequest indexRequest1 = new IndexRequest(indexRequestIndexName, "type", "id1").source(emptyMap());
|
||||
IndexRequest indexRequest2 = new IndexRequest(indexRequestIndexName, "type", "id2").source(emptyMap());
|
||||
IndexRequest indexRequest3 = new IndexRequest(indexRequestIndexName, "type", "id3").source(emptyMap());
|
||||
UpdateRequest upsertRequest = new UpdateRequest(updateRequestIndexName, "type", "id1")
|
||||
.upsert(indexRequest1).script(mockScript("1"));
|
||||
UpdateRequest docAsUpsertRequest = new UpdateRequest(updateRequestIndexName, "type", "id2")
|
||||
@ -469,7 +523,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
assertFalse(responseCalled.get());
|
||||
assertFalse(failureCalled.get());
|
||||
verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(),
|
||||
failureHandler.capture(), completionHandler.capture(), any());
|
||||
failureHandler.capture(), completionHandler.capture(), any(), eq(Names.WRITE));
|
||||
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
|
||||
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
|
||||
assertEquals(indexRequest3.getPipeline(), "default_pipeline");
|
||||
@ -490,7 +544,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
Exception exception = new Exception("fake exception");
|
||||
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
|
||||
indexRequest.setPipeline("testpipeline");
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
indexRequest.source(emptyMap());
|
||||
AtomicBoolean responseCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean failureCalled = new AtomicBoolean(false);
|
||||
action.needToCheck = true;
|
||||
@ -508,7 +562,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
assertFalse(responseCalled.get());
|
||||
assertFalse(failureCalled.get());
|
||||
verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(),
|
||||
completionHandler.capture(), any());
|
||||
completionHandler.capture(), any(), eq(Names.WRITE));
|
||||
completionHandler.getValue().accept(null, exception);
|
||||
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
|
||||
assertTrue(failureCalled.get());
|
||||
@ -525,7 +579,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
public void testNotFindDefaultPipelineFromTemplateMatches(){
|
||||
Exception exception = new Exception("fake exception");
|
||||
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
indexRequest.source(emptyMap());
|
||||
AtomicBoolean responseCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean failureCalled = new AtomicBoolean(false);
|
||||
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
|
||||
@ -561,7 +615,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
when(metadata.indices()).thenReturn(ImmutableOpenMap.of());
|
||||
|
||||
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
indexRequest.source(emptyMap());
|
||||
AtomicBoolean responseCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean failureCalled = new AtomicBoolean(false);
|
||||
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
|
||||
@ -573,7 +627,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
|
||||
assertEquals("pipeline2", indexRequest.getPipeline());
|
||||
verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(),
|
||||
completionHandler.capture(), any());
|
||||
completionHandler.capture(), any(), eq(Names.WRITE));
|
||||
}
|
||||
|
||||
public void testFindDefaultPipelineFromV2TemplateMatch() {
|
||||
@ -591,7 +645,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
when(state.getMetadata()).thenReturn(metadata);
|
||||
|
||||
IndexRequest indexRequest = new IndexRequest("missing_index").id("id");
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
indexRequest.source(emptyMap());
|
||||
AtomicBoolean responseCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean failureCalled = new AtomicBoolean(false);
|
||||
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
|
||||
@ -603,12 +657,12 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
|
||||
assertEquals("pipeline2", indexRequest.getPipeline());
|
||||
verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(),
|
||||
completionHandler.capture(), any());
|
||||
completionHandler.capture(), any(), eq(Names.WRITE));
|
||||
}
|
||||
|
||||
private void validateDefaultPipeline(IndexRequest indexRequest) {
|
||||
Exception exception = new Exception("fake exception");
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
indexRequest.source(emptyMap());
|
||||
AtomicBoolean responseCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean failureCalled = new AtomicBoolean(false);
|
||||
assertNull(indexRequest.getPipeline());
|
||||
@ -626,7 +680,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
assertFalse(responseCalled.get());
|
||||
assertFalse(failureCalled.get());
|
||||
verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(),
|
||||
completionHandler.capture(), any());
|
||||
completionHandler.capture(), any(), eq(Names.WRITE));
|
||||
assertEquals(indexRequest.getPipeline(), "default_pipeline");
|
||||
completionHandler.getValue().accept(null, exception);
|
||||
assertTrue(failureCalled.get());
|
||||
|
@ -33,6 +33,9 @@ import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.cluster.metadata.IndexAbstraction;
|
||||
import org.elasticsearch.cluster.metadata.IndexAbstraction.Index;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
||||
@ -42,6 +45,8 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.indices.SystemIndexDescriptor;
|
||||
import org.elasticsearch.indices.SystemIndices;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.test.transport.CapturingTransport;
|
||||
@ -51,9 +56,16 @@ import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.action.bulk.TransportBulkAction.prohibitCustomRoutingOnDataStream;
|
||||
import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamServiceTests.createDataStream;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
@ -77,7 +89,7 @@ public class TransportBulkActionTests extends ESTestCase {
|
||||
super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null,
|
||||
null, new ActionFilters(Collections.emptySet()), new Resolver(),
|
||||
new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()),
|
||||
new IndexingPressure(Settings.EMPTY));
|
||||
new IndexingPressure(Settings.EMPTY), new SystemIndices(emptyMap()));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -96,7 +108,7 @@ public class TransportBulkActionTests extends ESTestCase {
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
threadPool = new TestThreadPool(getClass().getName());
|
||||
DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
|
||||
DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), emptyMap(),
|
||||
DiscoveryNodeRole.BUILT_IN_ROLES, VersionUtils.randomCompatibleVersion(random(), Version.CURRENT));
|
||||
clusterService = createClusterService(threadPool, discoveryNode);
|
||||
CapturingTransport capturingTransport = new CapturingTransport();
|
||||
@ -152,7 +164,7 @@ public class TransportBulkActionTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public void testGetIndexWriteRequest() throws Exception {
|
||||
IndexRequest indexRequest = new IndexRequest("index", "type", "id1").source(Collections.emptyMap());
|
||||
IndexRequest indexRequest = new IndexRequest("index", "type", "id1").source(emptyMap());
|
||||
UpdateRequest upsertRequest = new UpdateRequest("index", "type", "id1").upsert(indexRequest).script(mockScript("1"));
|
||||
UpdateRequest docAsUpsertRequest = new UpdateRequest("index", "type", "id2").doc(indexRequest).docAsUpsert(true);
|
||||
UpdateRequest scriptedUpsert = new UpdateRequest("index", "type", "id2").upsert(indexRequest).script(mockScript("1"))
|
||||
@ -237,4 +249,47 @@ public class TransportBulkActionTests extends ESTestCase {
|
||||
.routing("custom");
|
||||
prohibitCustomRoutingOnDataStream(writeRequestAgainstIndex, metadata);
|
||||
}
|
||||
|
||||
public void testOnlySystem() {
|
||||
SortedMap<String, IndexAbstraction> indicesLookup = new TreeMap<>();
|
||||
Settings settings = Settings.builder().put("index.version.created", Version.CURRENT).build();
|
||||
indicesLookup.put(".foo",
|
||||
new Index(IndexMetadata.builder(".foo").settings(settings).system(true).numberOfShards(1).numberOfReplicas(0).build()));
|
||||
indicesLookup.put(".bar",
|
||||
new Index(IndexMetadata.builder(".bar").settings(settings).system(true).numberOfShards(1).numberOfReplicas(0).build()));
|
||||
SystemIndices systemIndices = new SystemIndices(singletonMap("plugin", singletonList(new SystemIndexDescriptor(".test", ""))));
|
||||
List<String> onlySystem = Arrays.asList(".foo", ".bar");
|
||||
assertTrue(bulkAction.isOnlySystem(buildBulkRequest(onlySystem), indicesLookup, systemIndices));
|
||||
|
||||
onlySystem = Arrays.asList(".foo", ".bar", ".test");
|
||||
assertTrue(bulkAction.isOnlySystem(buildBulkRequest(onlySystem), indicesLookup, systemIndices));
|
||||
|
||||
List<String> nonSystem = Arrays.asList("foo", "bar");
|
||||
assertFalse(bulkAction.isOnlySystem(buildBulkRequest(nonSystem), indicesLookup, systemIndices));
|
||||
|
||||
List<String> mixed = Arrays.asList(".foo", ".test", "other");
|
||||
assertFalse(bulkAction.isOnlySystem(buildBulkRequest(mixed), indicesLookup, systemIndices));
|
||||
}
|
||||
|
||||
private BulkRequest buildBulkRequest(List<String> indices) {
|
||||
BulkRequest request = new BulkRequest();
|
||||
for (String index : indices) {
|
||||
final DocWriteRequest<?> subRequest;
|
||||
switch (randomIntBetween(1, 3)) {
|
||||
case 1:
|
||||
subRequest = new IndexRequest(index);
|
||||
break;
|
||||
case 2:
|
||||
subRequest = new DeleteRequest(index).id("0");
|
||||
break;
|
||||
case 3:
|
||||
subRequest = new UpdateRequest(index, "0");
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("only have 3 cases");
|
||||
}
|
||||
request.add(subRequest);
|
||||
}
|
||||
return request;
|
||||
}
|
||||
}
|
||||
|
@ -43,6 +43,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.rest.action.document.RestBulkAction;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.indices.SystemIndices;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
@ -63,6 +64,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
@ -87,7 +89,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
|
||||
DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), emptyMap(),
|
||||
DiscoveryNodeRole.BUILT_IN_ROLES, VersionUtils.randomCompatibleVersion(random(), Version.CURRENT));
|
||||
clusterService = createClusterService(threadPool, discoveryNode);
|
||||
}
|
||||
@ -242,6 +244,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
|
||||
indexNameExpressionResolver,
|
||||
autoCreateIndex,
|
||||
new IndexingPressure(Settings.EMPTY),
|
||||
new SystemIndices(emptyMap()),
|
||||
relativeTimeProvider);
|
||||
}
|
||||
|
||||
|
@ -56,6 +56,7 @@ import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
@ -228,7 +229,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}), latch::countDown), threadPool);
|
||||
}), latch::countDown), threadPool, Names.WRITE);
|
||||
|
||||
latch.await();
|
||||
}
|
||||
@ -820,7 +821,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
||||
DocWriteResponse response = primaryResponse.getResponse();
|
||||
assertThat(response.status(), equalTo(RestStatus.CREATED));
|
||||
assertThat(response.getSeqNo(), equalTo(13L));
|
||||
}), latch), threadPool);
|
||||
}), latch), threadPool, Names.WRITE);
|
||||
latch.await();
|
||||
}
|
||||
|
||||
@ -893,7 +894,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
||||
// Assert that we still need to fsync the location that was successfully written
|
||||
assertThat(((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location,
|
||||
equalTo(resultLocation1))), latch),
|
||||
rejectingThreadPool);
|
||||
rejectingThreadPool,
|
||||
Names.WRITE);
|
||||
latch.await();
|
||||
|
||||
assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));
|
||||
|
@ -20,6 +20,7 @@ package org.elasticsearch.action.resync;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
@ -44,6 +45,7 @@ import org.elasticsearch.index.shard.ReplicationGroup;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.SystemIndices;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
@ -61,6 +63,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.setState;
|
||||
@ -121,6 +124,7 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
|
||||
|
||||
final AtomicInteger acquiredPermits = new AtomicInteger();
|
||||
final IndexShard indexShard = mock(IndexShard.class);
|
||||
when(indexShard.indexSettings()).thenReturn(new IndexSettings(indexMetadata, Settings.EMPTY));
|
||||
when(indexShard.shardId()).thenReturn(shardId);
|
||||
when(indexShard.routingEntry()).thenReturn(primaryShardRouting);
|
||||
when(indexShard.getPendingPrimaryTerm()).thenReturn(primaryTerm);
|
||||
@ -145,7 +149,7 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
|
||||
|
||||
final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService,
|
||||
clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()),
|
||||
new IndexingPressure(Settings.EMPTY));
|
||||
new IndexingPressure(Settings.EMPTY), new SystemIndices(emptyMap()));
|
||||
|
||||
assertThat(action.globalBlockLevel(), nullValue());
|
||||
assertThat(action.indexBlockLevel(), nullValue());
|
||||
|
@ -48,6 +48,7 @@ import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.ShardNotFoundException;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.SystemIndices;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.test.ClusterServiceUtils;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
@ -76,6 +77,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.hamcrest.Matchers.arrayWithSize;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
@ -366,8 +368,8 @@ public class TransportWriteActionTests extends ESTestCase {
|
||||
super(Settings.EMPTY, "internal:test",
|
||||
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
||||
x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null,
|
||||
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false,
|
||||
new IndexingPressure(Settings.EMPTY));
|
||||
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ignore -> ThreadPool.Names.SAME, false,
|
||||
new IndexingPressure(Settings.EMPTY), new SystemIndices(emptyMap()));
|
||||
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
|
||||
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
|
||||
}
|
||||
@ -376,8 +378,8 @@ public class TransportWriteActionTests extends ESTestCase {
|
||||
ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) {
|
||||
super(settings, actionName, transportService, clusterService,
|
||||
mockIndicesService(clusterService), threadPool, shardStateAction,
|
||||
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false,
|
||||
new IndexingPressure(settings));
|
||||
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ignore -> ThreadPool.Names.SAME, false,
|
||||
new IndexingPressure(settings), new SystemIndices(emptyMap()));
|
||||
this.withDocumentFailureOnPrimary = false;
|
||||
this.withDocumentFailureOnReplica = false;
|
||||
}
|
||||
|
@ -111,7 +111,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
protected String executor(ShardId shardId) {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
|
@ -31,8 +31,8 @@ public class IndexingPressureTests extends ESTestCase {
|
||||
|
||||
public void testMemoryBytesMarkedAndReleased() {
|
||||
IndexingPressure indexingPressure = new IndexingPressure(settings);
|
||||
try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(10);
|
||||
Releasable coordinating2 = indexingPressure.markCoordinatingOperationStarted(50);
|
||||
try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(10, false);
|
||||
Releasable coordinating2 = indexingPressure.markCoordinatingOperationStarted(50, false);
|
||||
Releasable primary = indexingPressure.markPrimaryOperationStarted(15, true);
|
||||
Releasable primary2 = indexingPressure.markPrimaryOperationStarted(5, false);
|
||||
Releasable replica = indexingPressure.markReplicaOperationStarted(25, true);
|
||||
@ -56,7 +56,7 @@ public class IndexingPressureTests extends ESTestCase {
|
||||
|
||||
public void testAvoidDoubleAccounting() {
|
||||
IndexingPressure indexingPressure = new IndexingPressure(settings);
|
||||
try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(10);
|
||||
try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(10, false);
|
||||
Releasable primary = indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(15)) {
|
||||
IndexingPressureStats stats = indexingPressure.stats();
|
||||
assertEquals(10, stats.getCurrentCoordinatingBytes());
|
||||
@ -74,11 +74,11 @@ public class IndexingPressureTests extends ESTestCase {
|
||||
|
||||
public void testCoordinatingPrimaryRejections() {
|
||||
IndexingPressure indexingPressure = new IndexingPressure(settings);
|
||||
try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1024 * 3);
|
||||
try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1024 * 3, false);
|
||||
Releasable primary = indexingPressure.markPrimaryOperationStarted(1024 * 3, false);
|
||||
Releasable replica = indexingPressure.markReplicaOperationStarted(1024 * 3, false)) {
|
||||
if (randomBoolean()) {
|
||||
expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markCoordinatingOperationStarted(1024 * 2));
|
||||
expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markCoordinatingOperationStarted(1024 * 2, false));
|
||||
IndexingPressureStats stats = indexingPressure.stats();
|
||||
assertEquals(1, stats.getCoordinatingRejections());
|
||||
assertEquals(1024 * 6, stats.getCurrentCombinedCoordinatingAndPrimaryBytes());
|
||||
@ -109,7 +109,7 @@ public class IndexingPressureTests extends ESTestCase {
|
||||
|
||||
public void testReplicaRejections() {
|
||||
IndexingPressure indexingPressure = new IndexingPressure(settings);
|
||||
try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1024 * 3);
|
||||
try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1024 * 3, false);
|
||||
Releasable primary = indexingPressure.markPrimaryOperationStarted(1024 * 3, false);
|
||||
Releasable replica = indexingPressure.markReplicaOperationStarted(1024 * 3, false)) {
|
||||
// Replica will not be rejected until replica bytes > 15KB
|
||||
@ -133,4 +133,13 @@ public class IndexingPressureTests extends ESTestCase {
|
||||
|
||||
assertEquals(1024 * 14, indexingPressure.stats().getTotalReplicaBytes());
|
||||
}
|
||||
|
||||
public void testForceExecutionOnCoordinating() {
|
||||
IndexingPressure indexingPressure = new IndexingPressure(settings);
|
||||
expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markCoordinatingOperationStarted(1024 * 11, false));
|
||||
try (Releasable ignore = indexingPressure.markCoordinatingOperationStarted(1024 * 11, true)) {
|
||||
assertEquals(1024 * 11, indexingPressure.stats().getCurrentCoordinatingBytes());
|
||||
}
|
||||
assertEquals(0, indexingPressure.stats().getCurrentCoordinatingBytes());
|
||||
}
|
||||
}
|
||||
|
@ -34,6 +34,7 @@ import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.SystemIndices;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.CapturingTransport;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
@ -43,6 +44,7 @@ import org.elasticsearch.transport.TransportService;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.mock.orig.Mockito.when;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
@ -105,7 +107,8 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexingPressure(Settings.EMPTY));
|
||||
new IndexingPressure(Settings.EMPTY),
|
||||
new SystemIndices(emptyMap()));
|
||||
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
|
||||
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
|
||||
action.dispatchedShardOperationOnPrimary(request, indexShard,
|
||||
@ -142,7 +145,8 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexingPressure(Settings.EMPTY));
|
||||
new IndexingPressure(Settings.EMPTY),
|
||||
new SystemIndices(emptyMap()));
|
||||
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
|
||||
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
|
||||
|
||||
@ -182,7 +186,8 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexingPressure(Settings.EMPTY));
|
||||
new IndexingPressure(Settings.EMPTY),
|
||||
new SystemIndices(emptyMap()));
|
||||
|
||||
assertNull(action.indexBlockLevel());
|
||||
}
|
||||
|
@ -63,6 +63,7 @@ import org.elasticsearch.script.ScriptType;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.MockLogAppender;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.hamcrest.CustomTypeSafeMatcher;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
@ -160,7 +161,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||
@SuppressWarnings("unchecked")
|
||||
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
||||
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {},
|
||||
Names.WRITE);
|
||||
|
||||
assertTrue(failure.get());
|
||||
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
||||
@ -674,7 +676,7 @@ public class IngestServiceTests extends ESTestCase {
|
||||
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
||||
|
||||
ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
|
||||
completionHandler, indexReq -> {});
|
||||
completionHandler, indexReq -> {}, Names.WRITE);
|
||||
|
||||
assertTrue(failure.get());
|
||||
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
||||
@ -707,7 +709,7 @@ public class IngestServiceTests extends ESTestCase {
|
||||
@SuppressWarnings("unchecked")
|
||||
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
||||
ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
|
||||
completionHandler, indexReq -> {});
|
||||
completionHandler, indexReq -> {}, Names.WRITE);
|
||||
verify(failureHandler, times(1)).accept(
|
||||
argThat(new CustomTypeSafeMatcher<Integer>("failure handler was not called with the expected arguments") {
|
||||
@Override
|
||||
@ -741,7 +743,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {},
|
||||
Names.WRITE);
|
||||
verify(failureHandler, never()).accept(any(), any());
|
||||
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
||||
}
|
||||
@ -760,7 +763,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {},
|
||||
Names.WRITE);
|
||||
verify(failureHandler, never()).accept(any(), any());
|
||||
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
||||
}
|
||||
@ -806,7 +810,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {},
|
||||
Names.WRITE);
|
||||
verify(processor).execute(any(), any());
|
||||
verify(failureHandler, never()).accept(any(), any());
|
||||
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
||||
@ -839,7 +844,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {},
|
||||
Names.WRITE);
|
||||
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
|
||||
verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
|
||||
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
||||
@ -881,7 +887,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {},
|
||||
Names.WRITE);
|
||||
verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class));
|
||||
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
||||
}
|
||||
@ -919,7 +926,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {},
|
||||
Names.WRITE);
|
||||
verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
|
||||
verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
|
||||
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
||||
@ -971,7 +979,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||
BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
||||
ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
|
||||
ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {},
|
||||
Names.WRITE);
|
||||
|
||||
verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(new ArgumentMatcher<Exception>() {
|
||||
@Override
|
||||
@ -1024,7 +1033,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||
BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
||||
ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
|
||||
ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {},
|
||||
Names.WRITE);
|
||||
|
||||
verify(requestItemErrorHandler, never()).accept(any(), any());
|
||||
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
||||
@ -1081,7 +1091,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||
final IndexRequest indexRequest = new IndexRequest("_index");
|
||||
indexRequest.setPipeline("_id1").setFinalPipeline("_none");
|
||||
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {},
|
||||
Names.WRITE);
|
||||
final IngestStats afterFirstRequestStats = ingestService.stats();
|
||||
assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2));
|
||||
|
||||
@ -1099,7 +1110,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||
|
||||
|
||||
indexRequest.setPipeline("_id2");
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {},
|
||||
Names.WRITE);
|
||||
final IngestStats afterSecondRequestStats = ingestService.stats();
|
||||
assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2));
|
||||
//total
|
||||
@ -1118,7 +1130,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||
clusterState = IngestService.innerPut(putRequest, clusterState);
|
||||
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
||||
indexRequest.setPipeline("_id1");
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {},
|
||||
Names.WRITE);
|
||||
final IngestStats afterThirdRequestStats = ingestService.stats();
|
||||
assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2));
|
||||
//total
|
||||
@ -1142,7 +1155,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||
clusterState = IngestService.innerPut(putRequest, clusterState);
|
||||
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
||||
indexRequest.setPipeline("_id1");
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {},
|
||||
Names.WRITE);
|
||||
final IngestStats afterForthRequestStats = ingestService.stats();
|
||||
assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2));
|
||||
//total
|
||||
@ -1228,7 +1242,7 @@ public class IngestServiceTests extends ESTestCase {
|
||||
@SuppressWarnings("unchecked")
|
||||
final IntConsumer dropHandler = mock(IntConsumer.class);
|
||||
ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
|
||||
completionHandler, dropHandler);
|
||||
completionHandler, dropHandler, Names.WRITE);
|
||||
verify(failureHandler, never()).accept(any(), any());
|
||||
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
||||
verify(dropHandler, times(1)).accept(1);
|
||||
@ -1294,7 +1308,7 @@ public class IngestServiceTests extends ESTestCase {
|
||||
new IndexRequest("_index").id("_doc-id").source(builder).setPipeline("_id").setFinalPipeline("_none");
|
||||
|
||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest),
|
||||
(integer, e) -> {}, (thread, e) -> {}, indexReq -> {});
|
||||
(integer, e) -> {}, (thread, e) -> {}, indexReq -> {}, Names.WRITE);
|
||||
}
|
||||
|
||||
assertThat(reference.get(), is(instanceOf(byte[].class)));
|
||||
|
@ -1552,7 +1552,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
new IndexingPressure(settings))),
|
||||
new IndexingPressure(settings),
|
||||
new SystemIndices(emptyMap()))),
|
||||
new GlobalCheckpointSyncAction(
|
||||
settings,
|
||||
transportService,
|
||||
@ -1578,15 +1579,16 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
||||
mappingUpdatedAction.setClient(client);
|
||||
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
|
||||
clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
|
||||
actionFilters, new IndexingPressure(settings));
|
||||
actionFilters, new IndexingPressure(settings), new SystemIndices(emptyMap()));
|
||||
actions.put(BulkAction.INSTANCE,
|
||||
new TransportBulkAction(threadPool, transportService, clusterService,
|
||||
new IngestService(
|
||||
clusterService, threadPool, environment, scriptService,
|
||||
new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
|
||||
Collections.emptyList(), client),
|
||||
transportShardBulkAction, client, actionFilters, indexNameExpressionResolver,
|
||||
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new IndexingPressure(settings)
|
||||
transportShardBulkAction, client, actionFilters, indexNameExpressionResolver,
|
||||
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new IndexingPressure(settings),
|
||||
new SystemIndices(emptyMap())
|
||||
));
|
||||
final RestoreService restoreService = new RestoreService(
|
||||
clusterService, repositoriesService, allocationService,
|
||||
|
@ -86,6 +86,7 @@ import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.indices.recovery.RecoveryTarget;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
@ -793,7 +794,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
||||
TransportWriteActionTestHelper.performPostWriteActions(primary, request,
|
||||
((TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location, logger);
|
||||
listener.onResponse((TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result);
|
||||
}), threadPool);
|
||||
}), threadPool, Names.WRITE);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
@ -9,7 +9,6 @@ package org.elasticsearch.xpack.ccr.action.bulk;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
@ -19,6 +18,7 @@ import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
@ -26,19 +26,28 @@ import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.SystemIndices;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.ccr.index.engine.AlreadyProcessedFollowingEngineException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class TransportBulkShardOperationsAction
|
||||
extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
|
||||
|
||||
private final IndexingPressure indexingPressure;
|
||||
private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION = shard -> {
|
||||
if (shard.indexSettings().getIndexMetadata().isSystem()) {
|
||||
return Names.SYSTEM_WRITE;
|
||||
} else {
|
||||
return Names.WRITE;
|
||||
}
|
||||
};
|
||||
|
||||
@Inject
|
||||
public TransportBulkShardOperationsAction(
|
||||
@ -49,7 +58,8 @@ public class TransportBulkShardOperationsAction
|
||||
final ThreadPool threadPool,
|
||||
final ShardStateAction shardStateAction,
|
||||
final ActionFilters actionFilters,
|
||||
final IndexingPressure indexingPressure) {
|
||||
final IndexingPressure indexingPressure,
|
||||
final SystemIndices systemIndices) {
|
||||
super(
|
||||
settings,
|
||||
BulkShardOperationsAction.NAME,
|
||||
@ -61,14 +71,13 @@ public class TransportBulkShardOperationsAction
|
||||
actionFilters,
|
||||
BulkShardOperationsRequest::new,
|
||||
BulkShardOperationsRequest::new,
|
||||
ThreadPool.Names.WRITE, false, indexingPressure);
|
||||
this.indexingPressure = indexingPressure;
|
||||
EXECUTOR_NAME_FUNCTION, false, indexingPressure, systemIndices);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener<BulkShardOperationsResponse> listener) {
|
||||
// This is executed on the follower coordinator node and we need to mark the bytes.
|
||||
Releasable releasable = indexingPressure.markCoordinatingOperationStarted(primaryOperationSize(request));
|
||||
Releasable releasable = indexingPressure.markCoordinatingOperationStarted(primaryOperationSize(request), false);
|
||||
ActionListener<BulkShardOperationsResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
|
||||
try {
|
||||
super.doExecute(task, request, releasingListener);
|
||||
|
@ -65,10 +65,12 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
|
||||
@Override
|
||||
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
|
||||
// Write tp is expected when executing enrich processor from index / bulk api
|
||||
// System_write is expected when executing enrich against system indices
|
||||
// Management tp is expected when executing enrich processor from ingest simulate api
|
||||
// Search tp is allowed for now - After enriching, the remaining parts of the pipeline are processed on the
|
||||
// search thread, which could end up here again if there is more than one enrich processor in a pipeline.
|
||||
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE)
|
||||
|| Thread.currentThread().getName().contains(ThreadPool.Names.SYSTEM_WRITE)
|
||||
|| Thread.currentThread().getName().contains(ThreadPool.Names.SEARCH)
|
||||
|| Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT);
|
||||
coordinator.schedule(request, listener);
|
||||
|
Loading…
x
Reference in New Issue
Block a user