Renamed CoordinatorProxyAction to EnrichCoordinatorProxyAction and (#45663)
fail if query shard context needs current time (certain queries / scripts use this, but in the enrich context this is not used).
This commit is contained in:
parent
e3373d349b
commit
ac7173c0d4
|
@ -39,7 +39,7 @@ import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
|
||||||
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
|
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
|
||||||
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
|
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
|
||||||
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
|
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
|
||||||
import org.elasticsearch.xpack.enrich.action.CoordinatorProxyAction;
|
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
|
||||||
import org.elasticsearch.xpack.enrich.action.EnrichShardMultiSearchAction;
|
import org.elasticsearch.xpack.enrich.action.EnrichShardMultiSearchAction;
|
||||||
import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction;
|
import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction;
|
||||||
import org.elasticsearch.xpack.enrich.action.TransportExecuteEnrichPolicyAction;
|
import org.elasticsearch.xpack.enrich.action.TransportExecuteEnrichPolicyAction;
|
||||||
|
@ -114,7 +114,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
|
||||||
new ActionHandler<>(DeleteEnrichPolicyAction.INSTANCE, TransportDeleteEnrichPolicyAction.class),
|
new ActionHandler<>(DeleteEnrichPolicyAction.INSTANCE, TransportDeleteEnrichPolicyAction.class),
|
||||||
new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class),
|
new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class),
|
||||||
new ActionHandler<>(ExecuteEnrichPolicyAction.INSTANCE, TransportExecuteEnrichPolicyAction.class),
|
new ActionHandler<>(ExecuteEnrichPolicyAction.INSTANCE, TransportExecuteEnrichPolicyAction.class),
|
||||||
new ActionHandler<>(CoordinatorProxyAction.INSTANCE, CoordinatorProxyAction.TransportAction.class),
|
new ActionHandler<>(EnrichCoordinatorProxyAction.INSTANCE, EnrichCoordinatorProxyAction.TransportAction.class),
|
||||||
new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class)
|
new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -149,7 +149,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
|
||||||
return Arrays.asList(
|
return Arrays.asList(
|
||||||
enrichPolicyLocks,
|
enrichPolicyLocks,
|
||||||
enrichPolicyExecutor,
|
enrichPolicyExecutor,
|
||||||
new CoordinatorProxyAction.Coordinator(client, settings),
|
new EnrichCoordinatorProxyAction.Coordinator(client, settings),
|
||||||
enrichPolicyMaintenanceService
|
enrichPolicyMaintenanceService
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ import org.elasticsearch.search.SearchHit;
|
||||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||||
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
|
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
|
||||||
import org.elasticsearch.xpack.enrich.action.CoordinatorProxyAction;
|
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -152,7 +152,7 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
|
||||||
|
|
||||||
private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
|
private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
|
||||||
return (req, handler) -> {
|
return (req, handler) -> {
|
||||||
client.execute(CoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(
|
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(
|
||||||
resp -> {
|
resp -> {
|
||||||
handler.accept(resp, null);
|
handler.accept(resp, null);
|
||||||
},
|
},
|
||||||
|
|
|
@ -41,12 +41,12 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
* This is because the enrich processor executes asynchronously and a bulk request could easily overload
|
* This is because the enrich processor executes asynchronously and a bulk request could easily overload
|
||||||
* the search tp.
|
* the search tp.
|
||||||
*/
|
*/
|
||||||
public class CoordinatorProxyAction extends ActionType<SearchResponse> {
|
public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
|
||||||
|
|
||||||
public static final CoordinatorProxyAction INSTANCE = new CoordinatorProxyAction();
|
public static final EnrichCoordinatorProxyAction INSTANCE = new EnrichCoordinatorProxyAction();
|
||||||
public static final String NAME = "indices:data/read/xpack/enrich/coordinate_lookups";
|
public static final String NAME = "indices:data/read/xpack/enrich/coordinate_lookups";
|
||||||
|
|
||||||
private CoordinatorProxyAction() {
|
private EnrichCoordinatorProxyAction() {
|
||||||
super(NAME, SearchResponse::new);
|
super(NAME, SearchResponse::new);
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,7 +79,7 @@ import java.util.Set;
|
||||||
* handles multi search requests targeting enrich indices more efficiently by executing them in a bulk using the same
|
* handles multi search requests targeting enrich indices more efficiently by executing them in a bulk using the same
|
||||||
* searcher and query shard context.
|
* searcher and query shard context.
|
||||||
*
|
*
|
||||||
* This action (plus some coordination logic in {@link CoordinatorProxyAction}) can be removed when msearch can
|
* This action (plus some coordination logic in {@link EnrichCoordinatorProxyAction}) can be removed when msearch can
|
||||||
* execute search requests targeted to the same shard more efficiently in a bulk like style.
|
* execute search requests targeted to the same shard more efficiently in a bulk like style.
|
||||||
*
|
*
|
||||||
* Note that this 'msearch' implementation only supports executing a query, pagination and source filtering.
|
* Note that this 'msearch' implementation only supports executing a query, pagination and source filtering.
|
||||||
|
@ -204,13 +204,12 @@ public class EnrichShardMultiSearchAction extends ActionType<MultiSearchResponse
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected MultiSearchResponse shardOperation(Request request, ShardId shardId) throws IOException {
|
protected MultiSearchResponse shardOperation(Request request, ShardId shardId) throws IOException {
|
||||||
final long nowInMillis = System.currentTimeMillis();
|
|
||||||
final IndexService indexService = indicesService.indexService(shardId.getIndex());
|
final IndexService indexService = indicesService.indexService(shardId.getIndex());
|
||||||
final IndexShard indexShard = indicesService.getShardOrNull(shardId);
|
final IndexShard indexShard = indicesService.getShardOrNull(shardId);
|
||||||
try (Engine.Searcher searcher = indexShard.acquireSearcher("enrich_msearch")) {
|
try (Engine.Searcher searcher = indexShard.acquireSearcher("enrich_msearch")) {
|
||||||
final FieldsVisitor visitor = new FieldsVisitor(true);
|
final FieldsVisitor visitor = new FieldsVisitor(true);
|
||||||
final QueryShardContext context =
|
final QueryShardContext context = indexService.newQueryShardContext(shardId.id(),
|
||||||
indexService.newQueryShardContext(shardId.id(), searcher.getIndexReader(), () -> nowInMillis, null);
|
searcher.getIndexReader(), () -> {throw new UnsupportedOperationException();}, null);
|
||||||
final MapperService mapperService = context.getMapperService();
|
final MapperService mapperService = context.getMapperService();
|
||||||
final Text typeText = mapperService.documentMapper().typeText();
|
final Text typeText = mapperService.documentMapper().typeText();
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import static org.elasticsearch.xpack.enrich.action.CoordinatorProxyAction.Coordinator;
|
import static org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction.Coordinator;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.sameInstance;
|
import static org.hamcrest.Matchers.sameInstance;
|
||||||
|
|
Loading…
Reference in New Issue