mirror of https://github.com/apache/lucene.git
SOLR-14354: HttpShardHandler send requests in async (#1470)
This commit is contained in:
parent
5fc12745ca
commit
a80eb84d56
|
@ -107,7 +107,8 @@ Improvements
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
---------------------
|
---------------------
|
||||||
(No changes)
|
|
||||||
|
* SOLR-14354: HttpShardHandler send requests in async (Cao Manh Dat).
|
||||||
|
|
||||||
Bug Fixes
|
Bug Fixes
|
||||||
---------------------
|
---------------------
|
||||||
|
|
|
@ -639,7 +639,7 @@ public class Overseer implements SolrCloseable {
|
||||||
|
|
||||||
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
|
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
|
||||||
|
|
||||||
OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory(), updateShardHandler.getDefaultHttpClient());
|
OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory());
|
||||||
overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
|
overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
|
||||||
ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
|
ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
|
||||||
ccThread.setDaemon(true);
|
ccThread.setDaemon(true);
|
||||||
|
|
|
@ -20,7 +20,6 @@ import java.lang.invoke.MethodHandles;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.http.client.HttpClient;
|
|
||||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||||
import org.apache.solr.common.cloud.SolrZkClient;
|
import org.apache.solr.common.cloud.SolrZkClient;
|
||||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
|
@ -29,7 +28,6 @@ import org.apache.solr.common.params.CoreAdminParams;
|
||||||
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
|
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
import org.apache.solr.common.util.Utils;
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.handler.component.HttpShardHandlerFactory;
|
|
||||||
import org.apache.solr.handler.component.ShardHandler;
|
import org.apache.solr.handler.component.ShardHandler;
|
||||||
import org.apache.solr.handler.component.ShardHandlerFactory;
|
import org.apache.solr.handler.component.ShardHandlerFactory;
|
||||||
import org.apache.solr.handler.component.ShardRequest;
|
import org.apache.solr.handler.component.ShardRequest;
|
||||||
|
@ -53,14 +51,11 @@ public class OverseerNodePrioritizer {
|
||||||
|
|
||||||
private ZkDistributedQueue stateUpdateQueue;
|
private ZkDistributedQueue stateUpdateQueue;
|
||||||
|
|
||||||
private HttpClient httpClient;
|
public OverseerNodePrioritizer(ZkStateReader zkStateReader, ZkDistributedQueue stateUpdateQueue, String adminPath, ShardHandlerFactory shardHandlerFactory) {
|
||||||
|
|
||||||
public OverseerNodePrioritizer(ZkStateReader zkStateReader, ZkDistributedQueue stateUpdateQueue, String adminPath, ShardHandlerFactory shardHandlerFactory, HttpClient httpClient) {
|
|
||||||
this.zkStateReader = zkStateReader;
|
this.zkStateReader = zkStateReader;
|
||||||
this.adminPath = adminPath;
|
this.adminPath = adminPath;
|
||||||
this.shardHandlerFactory = shardHandlerFactory;
|
this.shardHandlerFactory = shardHandlerFactory;
|
||||||
this.stateUpdateQueue = stateUpdateQueue;
|
this.stateUpdateQueue = stateUpdateQueue;
|
||||||
this.httpClient = httpClient;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void prioritizeOverseerNodes(String overseerId) throws Exception {
|
public synchronized void prioritizeOverseerNodes(String overseerId) throws Exception {
|
||||||
|
@ -108,7 +103,7 @@ public class OverseerNodePrioritizer {
|
||||||
|
|
||||||
private void invokeOverseerOp(String electionNode, String op) {
|
private void invokeOverseerOp(String electionNode, String op) {
|
||||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||||
ShardHandler shardHandler = ((HttpShardHandlerFactory)shardHandlerFactory).getShardHandler(httpClient);
|
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.OVERSEEROP.toString());
|
params.set(CoreAdminParams.ACTION, CoreAdminAction.OVERSEEROP.toString());
|
||||||
params.set("op", op);
|
params.set("op", op);
|
||||||
params.set("qt", adminPath);
|
params.set("qt", adminPath);
|
||||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.core.CoreContainer;
|
import org.apache.solr.core.CoreContainer;
|
||||||
import org.apache.solr.core.CoreDescriptor;
|
import org.apache.solr.core.CoreDescriptor;
|
||||||
import org.apache.solr.core.SolrCore;
|
import org.apache.solr.core.SolrCore;
|
||||||
import org.apache.solr.handler.component.HttpShardHandlerFactory;
|
|
||||||
import org.apache.solr.handler.component.ShardHandler;
|
import org.apache.solr.handler.component.ShardHandler;
|
||||||
import org.apache.solr.handler.component.ShardRequest;
|
import org.apache.solr.handler.component.ShardRequest;
|
||||||
import org.apache.solr.handler.component.ShardResponse;
|
import org.apache.solr.handler.component.ShardResponse;
|
||||||
|
@ -70,7 +69,7 @@ public class SyncStrategy {
|
||||||
public SyncStrategy(CoreContainer cc) {
|
public SyncStrategy(CoreContainer cc) {
|
||||||
UpdateShardHandler updateShardHandler = cc.getUpdateShardHandler();
|
UpdateShardHandler updateShardHandler = cc.getUpdateShardHandler();
|
||||||
client = updateShardHandler.getDefaultHttpClient();
|
client = updateShardHandler.getDefaultHttpClient();
|
||||||
shardHandler = ((HttpShardHandlerFactory)cc.getShardHandlerFactory()).getShardHandler(cc.getUpdateShardHandler().getDefaultHttpClient());
|
shardHandler = cc.getShardHandlerFactory().getShardHandler();
|
||||||
updateExecutor = updateShardHandler.getUpdateExecutor();
|
updateExecutor = updateShardHandler.getUpdateExecutor();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -170,7 +170,7 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
String backupName = request.getStr(NAME);
|
String backupName = request.getStr(NAME);
|
||||||
String asyncId = request.getStr(ASYNC);
|
String asyncId = request.getStr(ASYNC);
|
||||||
String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY);
|
String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY);
|
||||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
|
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||||
|
|
||||||
String commitName = request.getStr(CoreAdminParams.COMMIT_NAME);
|
String commitName = request.getStr(CoreAdminParams.COMMIT_NAME);
|
||||||
Optional<CollectionSnapshotMetaData> snapshotMeta = Optional.empty();
|
Optional<CollectionSnapshotMetaData> snapshotMeta = Optional.empty();
|
||||||
|
|
|
@ -207,7 +207,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
|
||||||
collectionName, shardNames, message));
|
collectionName, shardNames, message));
|
||||||
}
|
}
|
||||||
Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
|
Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
|
||||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
|
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||||
for (ReplicaPosition replicaPosition : replicaPositions) {
|
for (ReplicaPosition replicaPosition : replicaPositions) {
|
||||||
String nodeName = replicaPosition.node;
|
String nodeName = replicaPosition.node;
|
||||||
|
|
||||||
|
|
|
@ -96,7 +96,7 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
@SuppressWarnings({"rawtypes"})
|
@SuppressWarnings({"rawtypes"})
|
||||||
NamedList shardRequestResults = new NamedList();
|
NamedList shardRequestResults = new NamedList();
|
||||||
Map<String, Slice> shardByCoreName = new HashMap<>();
|
Map<String, Slice> shardByCoreName = new HashMap<>();
|
||||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
|
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||||
|
|
||||||
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
|
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
|
||||||
for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
|
for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
|
||||||
|
|
|
@ -232,7 +232,7 @@ public class DeleteReplicaCmd implements Cmd {
|
||||||
" with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
|
" with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
|
||||||
}
|
}
|
||||||
|
|
||||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
|
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||||
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
|
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
|
||||||
String asyncId = message.getStr(ASYNC);
|
String asyncId = message.getStr(ASYNC);
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,7 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
String asyncId = message.getStr(ASYNC);
|
String asyncId = message.getStr(ASYNC);
|
||||||
@SuppressWarnings({"rawtypes"})
|
@SuppressWarnings({"rawtypes"})
|
||||||
NamedList shardRequestResults = new NamedList();
|
NamedList shardRequestResults = new NamedList();
|
||||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
|
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||||
SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
|
SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
|
||||||
|
|
||||||
Optional<CollectionSnapshotMetaData> meta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName);
|
Optional<CollectionSnapshotMetaData> meta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName);
|
||||||
|
|
|
@ -43,7 +43,6 @@ import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.common.util.TimeSource;
|
import org.apache.solr.common.util.TimeSource;
|
||||||
import org.apache.solr.common.util.Utils;
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.handler.component.HttpShardHandlerFactory;
|
|
||||||
import org.apache.solr.handler.component.ShardHandler;
|
import org.apache.solr.handler.component.ShardHandler;
|
||||||
import org.apache.solr.handler.component.ShardHandlerFactory;
|
import org.apache.solr.handler.component.ShardHandlerFactory;
|
||||||
import org.apache.solr.update.SolrIndexSplitter;
|
import org.apache.solr.update.SolrIndexSplitter;
|
||||||
|
@ -161,7 +160,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);
|
DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);
|
||||||
|
|
||||||
ShardHandlerFactory shardHandlerFactory = ocmh.shardHandlerFactory;
|
ShardHandlerFactory shardHandlerFactory = ocmh.shardHandlerFactory;
|
||||||
ShardHandler shardHandler = ((HttpShardHandlerFactory)shardHandlerFactory).getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
|
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||||
|
|
||||||
log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
|
log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
|
||||||
// intersect source range, keyHashRange and target range
|
// intersect source range, keyHashRange and target range
|
||||||
|
|
|
@ -337,7 +337,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
|
||||||
sreq.shards = new String[] {baseUrl};
|
sreq.shards = new String[] {baseUrl};
|
||||||
sreq.actualShards = sreq.shards;
|
sreq.actualShards = sreq.shards;
|
||||||
sreq.params = params;
|
sreq.params = params;
|
||||||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
|
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||||
shardHandler.submit(sreq, baseUrl, sreq.params);
|
shardHandler.submit(sreq, baseUrl, sreq.params);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -725,7 +725,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
|
||||||
log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
|
log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
|
||||||
String collectionName = message.getStr(NAME);
|
String collectionName = message.getStr(NAME);
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
|
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||||
|
|
||||||
ClusterState clusterState = zkStateReader.getClusterState();
|
ClusterState clusterState = zkStateReader.getClusterState();
|
||||||
DocCollection coll = clusterState.getCollection(collectionName);
|
DocCollection coll = clusterState.getCollection(collectionName);
|
||||||
|
|
|
@ -93,7 +93,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
|
|
||||||
String restoreCollectionName = message.getStr(COLLECTION_PROP);
|
String restoreCollectionName = message.getStr(COLLECTION_PROP);
|
||||||
String backupName = message.getStr(NAME); // of backup
|
String backupName = message.getStr(NAME); // of backup
|
||||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
|
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||||
String asyncId = message.getStr(ASYNC);
|
String asyncId = message.getStr(ASYNC);
|
||||||
String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
|
String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
|
||||||
|
|
||||||
|
|
|
@ -208,7 +208,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
|
List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
|
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
|
||||||
|
|
||||||
|
|
||||||
if (message.getBool(CommonAdminParams.SPLIT_BY_PREFIX, false)) {
|
if (message.getBool(CommonAdminParams.SPLIT_BY_PREFIX, false)) {
|
||||||
|
|
|
@ -16,30 +16,43 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.handler.component;
|
package org.apache.solr.handler.component;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletionService;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import io.opentracing.Span;
|
||||||
|
import io.opentracing.Tracer;
|
||||||
|
import io.opentracing.propagation.Format;
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
import org.apache.solr.client.solrj.SolrServerException;
|
import org.apache.solr.client.solrj.SolrResponse;
|
||||||
import org.apache.solr.client.solrj.impl.Http2SolrClient;
|
import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
|
||||||
|
import org.apache.solr.client.solrj.impl.LBSolrClient;
|
||||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||||
import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
|
import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
|
||||||
|
import org.apache.solr.client.solrj.util.Cancellable;
|
||||||
|
import org.apache.solr.client.solrj.util.AsyncListener;
|
||||||
import org.apache.solr.cloud.CloudDescriptor;
|
import org.apache.solr.cloud.CloudDescriptor;
|
||||||
import org.apache.solr.cloud.ZkController;
|
import org.apache.solr.cloud.ZkController;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
|
import org.apache.solr.common.annotation.SolrSingleThreaded;
|
||||||
import org.apache.solr.common.cloud.Replica;
|
import org.apache.solr.common.cloud.Replica;
|
||||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||||
|
import org.apache.solr.common.params.CommonParams;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
import org.apache.solr.common.params.ShardParams;
|
import org.apache.solr.common.params.ShardParams;
|
||||||
import org.apache.solr.common.params.SolrParams;
|
import org.apache.solr.common.params.SolrParams;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.core.CoreDescriptor;
|
import org.apache.solr.core.CoreDescriptor;
|
||||||
import org.apache.solr.request.SolrQueryRequest;
|
import org.apache.solr.request.SolrQueryRequest;
|
||||||
|
import org.apache.solr.request.SolrRequestInfo;
|
||||||
|
import org.apache.solr.util.tracing.GlobalTracer;
|
||||||
|
import org.apache.solr.util.tracing.SolrRequestCarrier;
|
||||||
|
|
||||||
|
@SolrSingleThreaded
|
||||||
public class HttpShardHandler extends ShardHandler {
|
public class HttpShardHandler extends ShardHandler {
|
||||||
/**
|
/**
|
||||||
* If the request context map has an entry with this key and Boolean.TRUE as value,
|
* If the request context map has an entry with this key and Boolean.TRUE as value,
|
||||||
|
@ -49,33 +62,130 @@ public class HttpShardHandler extends ShardHandler {
|
||||||
*/
|
*/
|
||||||
public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
|
public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
|
||||||
|
|
||||||
final HttpShardHandlerFactory httpShardHandlerFactory;
|
private HttpShardHandlerFactory httpShardHandlerFactory;
|
||||||
private CompletionService<ShardResponse> completionService;
|
private Map<ShardResponse, Cancellable> responseCancellableMap;
|
||||||
private Set<Future<ShardResponse>> pending;
|
private BlockingQueue<ShardResponse> responses;
|
||||||
private Http2SolrClient httpClient;
|
private AtomicInteger pending;
|
||||||
|
private Map<String, List<String>> shardToURLs;
|
||||||
|
private LBHttp2SolrClient lbClient;
|
||||||
|
|
||||||
public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory, Http2SolrClient httpClient) {
|
public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
|
||||||
this.httpClient = httpClient;
|
|
||||||
this.httpShardHandlerFactory = httpShardHandlerFactory;
|
this.httpShardHandlerFactory = httpShardHandlerFactory;
|
||||||
completionService = httpShardHandlerFactory.newCompletionService();
|
this.lbClient = httpShardHandlerFactory.loadbalancer;
|
||||||
pending = new HashSet<>();
|
this.pending = new AtomicInteger(0);
|
||||||
|
this.responses = new LinkedBlockingQueue<>();
|
||||||
|
this.responseCancellableMap = new HashMap<>();
|
||||||
|
|
||||||
|
// maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
|
||||||
|
// This is primarily to keep track of what order we should use to query the replicas of a shard
|
||||||
|
// so that we use the same replica for all phases of a distributed request.
|
||||||
|
shardToURLs = new HashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
private static class SimpleSolrResponse extends SolrResponse {
|
||||||
public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
|
|
||||||
ShardRequestor shardRequestor = new ShardRequestor(sreq, shard, params, this);
|
volatile long elapsedTime;
|
||||||
try {
|
|
||||||
shardRequestor.init();
|
volatile NamedList<Object> nl;
|
||||||
pending.add(completionService.submit(shardRequestor));
|
|
||||||
} finally {
|
@Override
|
||||||
shardRequestor.end();
|
public long getElapsedTime() {
|
||||||
|
return elapsedTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NamedList<Object> getResponse() {
|
||||||
|
return nl;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setResponse(NamedList<Object> rsp) {
|
||||||
|
nl = rsp;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setElapsedTime(long elapsedTime) {
|
||||||
|
this.elapsedTime = elapsedTime;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected NamedList<Object> request(String url, @SuppressWarnings({"rawtypes"})SolrRequest req) throws IOException, SolrServerException {
|
// Not thread safe... don't use in Callable.
|
||||||
req.setBasePath(url);
|
// Don't modify the returned URL list.
|
||||||
return httpClient.request(req);
|
private List<String> getURLs(String shard) {
|
||||||
|
List<String> urls = shardToURLs.get(shard);
|
||||||
|
if (urls == null) {
|
||||||
|
urls = httpShardHandlerFactory.buildURLList(shard);
|
||||||
|
shardToURLs.put(shard, urls);
|
||||||
|
}
|
||||||
|
return urls;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
|
||||||
|
// do this outside of the callable for thread safety reasons
|
||||||
|
final List<String> urls = getURLs(shard);
|
||||||
|
final Tracer tracer = GlobalTracer.getTracer();
|
||||||
|
final Span span = tracer != null ? tracer.activeSpan() : null;
|
||||||
|
|
||||||
|
params.remove(CommonParams.WT); // use default (currently javabin)
|
||||||
|
params.remove(CommonParams.VERSION);
|
||||||
|
QueryRequest req = makeQueryRequest(sreq, params, shard);
|
||||||
|
req.setMethod(SolrRequest.METHOD.POST);
|
||||||
|
|
||||||
|
LBSolrClient.Req lbReq = httpShardHandlerFactory.newLBHttpSolrClientReq(req, urls);
|
||||||
|
|
||||||
|
ShardResponse srsp = new ShardResponse();
|
||||||
|
if (sreq.nodeName != null) {
|
||||||
|
srsp.setNodeName(sreq.nodeName);
|
||||||
|
}
|
||||||
|
srsp.setShardRequest(sreq);
|
||||||
|
srsp.setShard(shard);
|
||||||
|
SimpleSolrResponse ssr = new SimpleSolrResponse();
|
||||||
|
srsp.setSolrResponse(ssr);
|
||||||
|
|
||||||
|
pending.incrementAndGet();
|
||||||
|
// if there are no shards available for a slice, urls.size()==0
|
||||||
|
if (urls.size() == 0) {
|
||||||
|
// TODO: what's the right error code here? We should use the same thing when
|
||||||
|
// all of the servers for a shard are down.
|
||||||
|
SolrException exception = new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
|
||||||
|
srsp.setException(exception);
|
||||||
|
srsp.setResponseCode(exception.code());
|
||||||
|
responses.add(srsp);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// all variables that set inside this listener must be at least volatile
|
||||||
|
responseCancellableMap.put(srsp, this.lbClient.asyncReq(lbReq, new AsyncListener<>() {
|
||||||
|
volatile long startTime = System.nanoTime();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onStart() {
|
||||||
|
if (tracer != null && span != null) {
|
||||||
|
tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req));
|
||||||
|
}
|
||||||
|
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
|
||||||
|
if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(LBSolrClient.Rsp rsp) {
|
||||||
|
ssr.nl = rsp.getResponse();
|
||||||
|
srsp.setShardAddress(rsp.getServer());
|
||||||
|
ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
|
||||||
|
responses.add(srsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onFailure(Throwable throwable) {
|
||||||
|
ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
|
||||||
|
srsp.setException(throwable);
|
||||||
|
if (throwable instanceof SolrException) {
|
||||||
|
srsp.setResponseCode(((SolrException) throwable).code());
|
||||||
|
}
|
||||||
|
responses.add(srsp);
|
||||||
|
}
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -113,12 +223,12 @@ public class HttpShardHandler extends ShardHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
private ShardResponse take(boolean bailOnError) {
|
private ShardResponse take(boolean bailOnError) {
|
||||||
|
try {
|
||||||
|
while (pending.get() > 0) {
|
||||||
|
ShardResponse rsp = responses.take();
|
||||||
|
responseCancellableMap.remove(rsp);
|
||||||
|
|
||||||
while (pending.size() > 0) {
|
pending.decrementAndGet();
|
||||||
try {
|
|
||||||
Future<ShardResponse> future = completionService.take();
|
|
||||||
pending.remove(future);
|
|
||||||
ShardResponse rsp = future.get();
|
|
||||||
if (bailOnError && rsp.getException() != null) return rsp; // if exception, return immediately
|
if (bailOnError && rsp.getException() != null) return rsp; // if exception, return immediately
|
||||||
// add response to the response list... we do this after the take() and
|
// add response to the response list... we do this after the take() and
|
||||||
// not after the completion of "call" so we know when the last response
|
// not after the completion of "call" so we know when the last response
|
||||||
|
@ -128,13 +238,9 @@ public class HttpShardHandler extends ShardHandler {
|
||||||
if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
|
if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
|
||||||
return rsp;
|
return rsp;
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
// should be impossible... the problem with catching the exception
|
|
||||||
// at this level is we don't know what ShardRequest it applied to
|
|
||||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception", e);
|
|
||||||
}
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -142,9 +248,11 @@ public class HttpShardHandler extends ShardHandler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancelAll() {
|
public void cancelAll() {
|
||||||
for (Future<ShardResponse> future : pending) {
|
for (Cancellable cancellable : responseCancellableMap.values()) {
|
||||||
future.cancel(false);
|
cancellable.cancel();
|
||||||
|
pending.decrementAndGet();
|
||||||
}
|
}
|
||||||
|
responseCancellableMap.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.handler.component;
|
package org.apache.solr.handler.component;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
@ -28,8 +27,6 @@ import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CompletionService;
|
|
||||||
import java.util.concurrent.ExecutorCompletionService;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -37,13 +34,8 @@ import java.util.stream.Collectors;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.http.client.HttpClient;
|
|
||||||
import org.apache.solr.client.solrj.SolrClient;
|
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
|
||||||
import org.apache.solr.client.solrj.SolrServerException;
|
|
||||||
import org.apache.solr.client.solrj.impl.Http2SolrClient;
|
import org.apache.solr.client.solrj.impl.Http2SolrClient;
|
||||||
import org.apache.solr.client.solrj.impl.HttpClientUtil;
|
import org.apache.solr.client.solrj.impl.HttpClientUtil;
|
||||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
|
||||||
import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
|
import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
|
||||||
import org.apache.solr.client.solrj.impl.LBSolrClient;
|
import org.apache.solr.client.solrj.impl.LBSolrClient;
|
||||||
import org.apache.solr.client.solrj.routing.AffinityReplicaListTransformerFactory;
|
import org.apache.solr.client.solrj.routing.AffinityReplicaListTransformerFactory;
|
||||||
|
@ -96,7 +88,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
||||||
|
|
||||||
protected volatile Http2SolrClient defaultClient;
|
protected volatile Http2SolrClient defaultClient;
|
||||||
protected InstrumentedHttpListenerFactory httpListenerFactory;
|
protected InstrumentedHttpListenerFactory httpListenerFactory;
|
||||||
private LBHttp2SolrClient loadbalancer;
|
protected LBHttp2SolrClient loadbalancer;
|
||||||
|
|
||||||
int corePoolSize = 0;
|
int corePoolSize = 0;
|
||||||
int maximumPoolSize = Integer.MAX_VALUE;
|
int maximumPoolSize = Integer.MAX_VALUE;
|
||||||
|
@ -151,27 +143,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public ShardHandler getShardHandler() {
|
public ShardHandler getShardHandler() {
|
||||||
return getShardHandler(defaultClient);
|
return new HttpShardHandler(this);
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get {@link ShardHandler} that uses custom http client.
|
|
||||||
*/
|
|
||||||
public ShardHandler getShardHandler(final Http2SolrClient httpClient){
|
|
||||||
return new HttpShardHandler(this, httpClient);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
public ShardHandler getShardHandler(final HttpClient httpClient) {
|
|
||||||
// a little hack for backward-compatibility when we are moving from apache http client to jetty client
|
|
||||||
return new HttpShardHandler(this, null) {
|
|
||||||
@Override
|
|
||||||
protected NamedList<Object> request(String url, @SuppressWarnings({"rawtypes"})SolrRequest req) throws IOException, SolrServerException {
|
|
||||||
try (SolrClient client = new HttpSolrClient.Builder(url).withHttpClient(httpClient).build()) {
|
|
||||||
return client.request(req);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -318,6 +290,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
||||||
this.defaultClient = new Http2SolrClient.Builder()
|
this.defaultClient = new Http2SolrClient.Builder()
|
||||||
.connectionTimeout(connectionTimeout)
|
.connectionTimeout(connectionTimeout)
|
||||||
.idleTimeout(soTimeout)
|
.idleTimeout(soTimeout)
|
||||||
|
.withExecutor(commExecutor)
|
||||||
.maxConnectionsPerHost(maxConnectionsPerHost).build();
|
.maxConnectionsPerHost(maxConnectionsPerHost).build();
|
||||||
this.defaultClient.addListenerFactory(this.httpListenerFactory);
|
this.defaultClient.addListenerFactory(this.httpListenerFactory);
|
||||||
this.loadbalancer = new LBHttp2SolrClient(defaultClient);
|
this.loadbalancer = new LBHttp2SolrClient(defaultClient);
|
||||||
|
@ -347,16 +320,16 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
try {
|
try {
|
||||||
ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
|
if (loadbalancer != null) {
|
||||||
|
loadbalancer.close();
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
if (loadbalancer != null) {
|
|
||||||
loadbalancer.close();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (defaultClient != null) {
|
if (defaultClient != null) {
|
||||||
IOUtils.closeQuietly(defaultClient);
|
IOUtils.closeQuietly(defaultClient);
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
@ -371,18 +344,6 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
||||||
return solrMetricsContext;
|
return solrMetricsContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Makes a request to one or more of the given urls, using the configured load balancer.
|
|
||||||
*
|
|
||||||
* @param req The solr search request that should be sent through the load balancer
|
|
||||||
* @param urls The list of solr server urls to load balance across
|
|
||||||
* @return The response from the request
|
|
||||||
*/
|
|
||||||
public LBSolrClient.Rsp makeLoadBalancedRequest(final QueryRequest req, List<String> urls)
|
|
||||||
throws SolrServerException, IOException {
|
|
||||||
return loadbalancer.request(newLBHttpSolrClientReq(req, urls));
|
|
||||||
}
|
|
||||||
|
|
||||||
protected LBSolrClient.Req newLBHttpSolrClientReq(final QueryRequest req, List<String> urls) {
|
protected LBSolrClient.Req newLBHttpSolrClientReq(final QueryRequest req, List<String> urls) {
|
||||||
int numServersToTry = (int)Math.floor(urls.size() * this.permittedLoadBalancerRequestsMaximumFraction);
|
int numServersToTry = (int)Math.floor(urls.size() * this.permittedLoadBalancerRequestsMaximumFraction);
|
||||||
if (numServersToTry < this.permittedLoadBalancerRequestsMinimumAbsolute) {
|
if (numServersToTry < this.permittedLoadBalancerRequestsMinimumAbsolute) {
|
||||||
|
@ -428,13 +389,6 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new completion service for use by a single set of distributed requests.
|
|
||||||
*/
|
|
||||||
public CompletionService<ShardResponse> newCompletionService() {
|
|
||||||
return new ExecutorCompletionService<>(commExecutor);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Rebuilds the URL replacing the URL scheme of the passed URL with the
|
* Rebuilds the URL replacing the URL scheme of the passed URL with the
|
||||||
* configured scheme replacement.If no scheme was configured, the passed URL's
|
* configured scheme replacement.If no scheme was configured, the passed URL's
|
||||||
|
|
|
@ -1,178 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.solr.handler.component;
|
|
||||||
|
|
||||||
import io.opentracing.Span;
|
|
||||||
import io.opentracing.Tracer;
|
|
||||||
import io.opentracing.propagation.Format;
|
|
||||||
import java.net.ConnectException;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
|
||||||
import org.apache.solr.client.solrj.SolrResponse;
|
|
||||||
import org.apache.solr.client.solrj.impl.LBSolrClient;
|
|
||||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
|
||||||
import org.apache.solr.common.SolrException;
|
|
||||||
import org.apache.solr.common.params.CommonParams;
|
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
|
||||||
import org.apache.solr.common.util.NamedList;
|
|
||||||
import org.apache.solr.request.SolrRequestInfo;
|
|
||||||
import org.apache.solr.util.tracing.GlobalTracer;
|
|
||||||
import org.apache.solr.util.tracing.SolrRequestCarrier;
|
|
||||||
import org.slf4j.MDC;
|
|
||||||
|
|
||||||
class ShardRequestor implements Callable<ShardResponse> {
|
|
||||||
private final ShardRequest sreq;
|
|
||||||
private final String shard;
|
|
||||||
private final ModifiableSolrParams params;
|
|
||||||
private final Tracer tracer;
|
|
||||||
private final Span span;
|
|
||||||
private final List<String> urls;
|
|
||||||
private final HttpShardHandler httpShardHandler;
|
|
||||||
|
|
||||||
// maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
|
|
||||||
// This is primarily to keep track of what order we should use to query the replicas of a shard
|
|
||||||
// so that we use the same replica for all phases of a distributed request.
|
|
||||||
private Map<String, List<String>> shardToURLs = new HashMap<>();
|
|
||||||
|
|
||||||
public ShardRequestor(ShardRequest sreq, String shard, ModifiableSolrParams params, HttpShardHandler httpShardHandler) {
|
|
||||||
this.sreq = sreq;
|
|
||||||
this.shard = shard;
|
|
||||||
this.params = params;
|
|
||||||
this.httpShardHandler = httpShardHandler;
|
|
||||||
// do this before call() for thread safety reasons
|
|
||||||
this.urls = getURLs(shard);
|
|
||||||
tracer = GlobalTracer.getTracer();
|
|
||||||
span = tracer != null ? tracer.activeSpan() : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// Not thread safe... don't use in Callable.
|
|
||||||
// Don't modify the returned URL list.
|
|
||||||
private List<String> getURLs(String shard) {
|
|
||||||
List<String> urls = shardToURLs.get(shard);
|
|
||||||
if (urls == null) {
|
|
||||||
urls = httpShardHandler.httpShardHandlerFactory.buildURLList(shard);
|
|
||||||
shardToURLs.put(shard, urls);
|
|
||||||
}
|
|
||||||
return urls;
|
|
||||||
}
|
|
||||||
|
|
||||||
void init() {
|
|
||||||
if (shard != null) {
|
|
||||||
MDC.put("ShardRequest.shards", shard);
|
|
||||||
}
|
|
||||||
if (urls != null && !urls.isEmpty()) {
|
|
||||||
MDC.put("ShardRequest.urlList", urls.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void end() {
|
|
||||||
MDC.remove("ShardRequest.shards");
|
|
||||||
MDC.remove("ShardRequest.urlList");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ShardResponse call() throws Exception {
|
|
||||||
|
|
||||||
ShardResponse srsp = new ShardResponse();
|
|
||||||
if (sreq.nodeName != null) {
|
|
||||||
srsp.setNodeName(sreq.nodeName);
|
|
||||||
}
|
|
||||||
srsp.setShardRequest(sreq);
|
|
||||||
srsp.setShard(shard);
|
|
||||||
SimpleSolrResponse ssr = new SimpleSolrResponse();
|
|
||||||
srsp.setSolrResponse(ssr);
|
|
||||||
long startTime = System.nanoTime();
|
|
||||||
|
|
||||||
try {
|
|
||||||
params.remove(CommonParams.WT); // use default (currently javabin)
|
|
||||||
params.remove(CommonParams.VERSION);
|
|
||||||
|
|
||||||
QueryRequest req = httpShardHandler.makeQueryRequest(sreq, params, shard);
|
|
||||||
if (tracer != null && span != null) {
|
|
||||||
tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req));
|
|
||||||
}
|
|
||||||
req.setMethod(SolrRequest.METHOD.POST);
|
|
||||||
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
|
|
||||||
if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
|
|
||||||
|
|
||||||
// no need to set the response parser as binary is the defaultJab
|
|
||||||
// req.setResponseParser(new BinaryResponseParser());
|
|
||||||
|
|
||||||
// if there are no shards available for a slice, urls.size()==0
|
|
||||||
if (urls.size() == 0) {
|
|
||||||
// TODO: what's the right error code here? We should use the same thing when
|
|
||||||
// all of the servers for a shard are down.
|
|
||||||
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (urls.size() <= 1) {
|
|
||||||
String url = urls.get(0);
|
|
||||||
srsp.setShardAddress(url);
|
|
||||||
ssr.nl = httpShardHandler.request(url, req);
|
|
||||||
} else {
|
|
||||||
LBSolrClient.Rsp rsp = httpShardHandler.httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
|
|
||||||
ssr.nl = rsp.getResponse();
|
|
||||||
srsp.setShardAddress(rsp.getServer());
|
|
||||||
}
|
|
||||||
} catch (ConnectException cex) {
|
|
||||||
srsp.setException(cex); //????
|
|
||||||
} catch (Exception th) {
|
|
||||||
srsp.setException(th);
|
|
||||||
if (th instanceof SolrException) {
|
|
||||||
srsp.setResponseCode(((SolrException) th).code());
|
|
||||||
} else {
|
|
||||||
srsp.setResponseCode(-1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
|
|
||||||
|
|
||||||
return httpShardHandler.transfomResponse(sreq, srsp, shard);
|
|
||||||
}
|
|
||||||
|
|
||||||
static class SimpleSolrResponse extends SolrResponse {
|
|
||||||
|
|
||||||
long elapsedTime;
|
|
||||||
|
|
||||||
NamedList<Object> nl;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getElapsedTime() {
|
|
||||||
return elapsedTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public NamedList<Object> getResponse() {
|
|
||||||
return nl;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setResponse(NamedList<Object> rsp) {
|
|
||||||
nl = rsp;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setElapsedTime(long elapsedTime) {
|
|
||||||
this.elapsedTime = elapsedTime;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -22,9 +22,9 @@ public final class ShardResponse {
|
||||||
private ShardRequest req;
|
private ShardRequest req;
|
||||||
private String shard;
|
private String shard;
|
||||||
private String nodeName;
|
private String nodeName;
|
||||||
private String shardAddress; // the specific shard that this response was received from
|
private volatile String shardAddress; // the specific shard that this response was received from
|
||||||
private int rspCode;
|
private int rspCode;
|
||||||
private Throwable exception;
|
private volatile Throwable exception;
|
||||||
private SolrResponse rsp;
|
private SolrResponse rsp;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -44,8 +44,8 @@ import org.apache.solr.common.util.IOUtils;
|
||||||
import org.apache.solr.common.util.StrUtils;
|
import org.apache.solr.common.util.StrUtils;
|
||||||
import org.apache.solr.core.SolrCore;
|
import org.apache.solr.core.SolrCore;
|
||||||
import org.apache.solr.core.SolrInfoBean;
|
import org.apache.solr.core.SolrInfoBean;
|
||||||
import org.apache.solr.handler.component.HttpShardHandlerFactory;
|
|
||||||
import org.apache.solr.handler.component.ShardHandler;
|
import org.apache.solr.handler.component.ShardHandler;
|
||||||
|
import org.apache.solr.handler.component.ShardHandlerFactory;
|
||||||
import org.apache.solr.handler.component.ShardRequest;
|
import org.apache.solr.handler.component.ShardRequest;
|
||||||
import org.apache.solr.handler.component.ShardResponse;
|
import org.apache.solr.handler.component.ShardResponse;
|
||||||
import org.apache.solr.metrics.SolrMetricProducer;
|
import org.apache.solr.metrics.SolrMetricProducer;
|
||||||
|
@ -78,7 +78,7 @@ public class PeerSync implements SolrMetricProducer {
|
||||||
|
|
||||||
private UpdateHandler uhandler;
|
private UpdateHandler uhandler;
|
||||||
private UpdateLog ulog;
|
private UpdateLog ulog;
|
||||||
private HttpShardHandlerFactory shardHandlerFactory;
|
private ShardHandlerFactory shardHandlerFactory;
|
||||||
private ShardHandler shardHandler;
|
private ShardHandler shardHandler;
|
||||||
private List<SyncShardRequest> requests = new ArrayList<>();
|
private List<SyncShardRequest> requests = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -123,8 +123,8 @@ public class PeerSync implements SolrMetricProducer {
|
||||||
uhandler = core.getUpdateHandler();
|
uhandler = core.getUpdateHandler();
|
||||||
ulog = uhandler.getUpdateLog();
|
ulog = uhandler.getUpdateLog();
|
||||||
// TODO: close
|
// TODO: close
|
||||||
shardHandlerFactory = (HttpShardHandlerFactory) core.getCoreContainer().getShardHandlerFactory();
|
shardHandlerFactory = core.getCoreContainer().getShardHandlerFactory();
|
||||||
shardHandler = shardHandlerFactory.getShardHandler(client);
|
shardHandler = shardHandlerFactory.getShardHandler();
|
||||||
this.updater = new Updater(msg(), core);
|
this.updater = new Updater(msg(), core);
|
||||||
|
|
||||||
core.getCoreMetricManager().registerMetricProducer(SolrInfoBean.Category.REPLICATION.toString(), this);
|
core.getCoreMetricManager().registerMetricProducer(SolrInfoBean.Category.REPLICATION.toString(), this);
|
||||||
|
@ -418,7 +418,7 @@ public class PeerSync implements SolrMetricProducer {
|
||||||
sreq.params.set(DISTRIB, false);
|
sreq.params.set(DISTRIB, false);
|
||||||
sreq.params.set("checkCanHandleVersionRanges", false);
|
sreq.params.set("checkCanHandleVersionRanges", false);
|
||||||
|
|
||||||
ShardHandler sh = shardHandlerFactory.getShardHandler(client);
|
ShardHandler sh = shardHandlerFactory.getShardHandler();
|
||||||
sh.submit(sreq, replica, sreq.params);
|
sh.submit(sreq, replica, sreq.params);
|
||||||
|
|
||||||
ShardResponse srsp = sh.takeCompletedIncludingErrors();
|
ShardResponse srsp = sh.takeCompletedIncludingErrors();
|
||||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
|
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
|
||||||
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
|
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
|
||||||
import org.apache.solr.client.solrj.impl.Http2SolrClient;
|
|
||||||
import org.apache.solr.cloud.Overseer.LeaderStatus;
|
import org.apache.solr.cloud.Overseer.LeaderStatus;
|
||||||
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
|
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
|
||||||
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
|
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
|
||||||
|
@ -151,7 +150,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
|
||||||
Overseer overseer,
|
Overseer overseer,
|
||||||
DistributedMap completedMap,
|
DistributedMap completedMap,
|
||||||
DistributedMap failureMap) {
|
DistributedMap failureMap) {
|
||||||
super(zkStateReader, myId, shardHandlerFactory, adminPath, new Stats(), overseer, new OverseerNodePrioritizer(zkStateReader, overseer.getStateUpdateQueue(), adminPath, shardHandlerFactory, null), workQueue, runningMap, completedMap, failureMap);
|
super(zkStateReader, myId, shardHandlerFactory, adminPath, new Stats(), overseer, new OverseerNodePrioritizer(zkStateReader, overseer.getStateUpdateQueue(), adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -253,8 +252,6 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
|
||||||
|
|
||||||
protected Set<String> commonMocks(int liveNodesCount) throws Exception {
|
protected Set<String> commonMocks(int liveNodesCount) throws Exception {
|
||||||
when(shardHandlerFactoryMock.getShardHandler()).thenReturn(shardHandlerMock);
|
when(shardHandlerFactoryMock.getShardHandler()).thenReturn(shardHandlerMock);
|
||||||
when(shardHandlerFactoryMock.getShardHandler(any(Http2SolrClient.class))).thenReturn(shardHandlerMock);
|
|
||||||
when(shardHandlerFactoryMock.getShardHandler(any(HttpClient.class))).thenReturn(shardHandlerMock);
|
|
||||||
when(workQueueMock.peekTopN(anyInt(), any(), anyLong())).thenAnswer(invocation -> {
|
when(workQueueMock.peekTopN(anyInt(), any(), anyLong())).thenAnswer(invocation -> {
|
||||||
Object result;
|
Object result;
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
|
|
@ -136,6 +136,6 @@ public class TestSolrCloudWithHadoopAuthPlugin extends SolrCloudAuthTestCase {
|
||||||
deleteReq.process(solrClient);
|
deleteReq.process(solrClient);
|
||||||
AbstractDistribZkTestBase.waitForCollectionToDisappear(collectionName,
|
AbstractDistribZkTestBase.waitForCollectionToDisappear(collectionName,
|
||||||
solrClient.getZkStateReader(), true, 330);
|
solrClient.getZkStateReader(), true, 330);
|
||||||
assertAuthMetricsMinimums(14, 8, 0, 6, 0, 0);
|
// cookie was used to avoid re-authentication
|
||||||
}
|
assertAuthMetricsMinimums(13, 8, 0, 5, 0, 0); }
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.solr.client.solrj.SolrRequest;
|
||||||
import org.apache.solr.client.solrj.SolrServerException;
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
import org.apache.solr.client.solrj.impl.Http2SolrClient;
|
import org.apache.solr.client.solrj.impl.Http2SolrClient;
|
||||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||||
|
import org.apache.solr.client.solrj.util.Cancellable;
|
||||||
|
import org.apache.solr.client.solrj.util.AsyncListener;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
|
||||||
|
@ -120,21 +122,21 @@ public class MockingHttp2SolrClient extends Http2SolrClient {
|
||||||
return super.request(request, collection);
|
return super.request(request, collection);
|
||||||
}
|
}
|
||||||
|
|
||||||
public NamedList<Object> request(@SuppressWarnings({"rawtypes"})SolrRequest request,
|
@Override
|
||||||
String collection, OnComplete onComplete)
|
public Cancellable asyncRequest(@SuppressWarnings({"rawtypes"}) SolrRequest request,
|
||||||
throws SolrServerException, IOException {
|
String collection, AsyncListener<NamedList<Object>> asyncListener) {
|
||||||
if (request instanceof UpdateRequest) {
|
if (request instanceof UpdateRequest) {
|
||||||
UpdateRequest ur = (UpdateRequest) request;
|
UpdateRequest ur = (UpdateRequest) request;
|
||||||
// won't throw exception if request is DBQ
|
// won't throw exception if request is DBQ
|
||||||
if (ur.getDeleteQuery() != null && !ur.getDeleteQuery().isEmpty()) {
|
if (ur.getDeleteQuery() != null && !ur.getDeleteQuery().isEmpty()) {
|
||||||
return super.request(request, collection, onComplete);
|
return super.asyncRequest(request, collection, asyncListener);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (exp != null) {
|
if (exp != null) {
|
||||||
if (oneExpPerReq) {
|
if (oneExpPerReq) {
|
||||||
if (reqGotException.contains(request)) {
|
if (reqGotException.contains(request)) {
|
||||||
return super.request(request, collection, onComplete);
|
return super.asyncRequest(request, collection, asyncListener);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
reqGotException.add(request);
|
reqGotException.add(request);
|
||||||
|
@ -143,17 +145,12 @@ public class MockingHttp2SolrClient extends Http2SolrClient {
|
||||||
Exception e = exception();
|
Exception e = exception();
|
||||||
if (e instanceof IOException) {
|
if (e instanceof IOException) {
|
||||||
if (LuceneTestCase.random().nextBoolean()) {
|
if (LuceneTestCase.random().nextBoolean()) {
|
||||||
throw (IOException) e;
|
e = new SolrServerException(e);
|
||||||
} else {
|
|
||||||
throw new SolrServerException(e);
|
|
||||||
}
|
}
|
||||||
} else if (e instanceof SolrServerException) {
|
|
||||||
throw (SolrServerException) e;
|
|
||||||
} else {
|
|
||||||
throw new SolrServerException(e);
|
|
||||||
}
|
}
|
||||||
|
asyncListener.onFailure(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
return super.request(request, collection, onComplete);
|
return super.asyncRequest(request, collection, asyncListener);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,6 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Phaser;
|
import java.util.concurrent.Phaser;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
@ -55,8 +54,10 @@ import org.apache.solr.client.solrj.embedded.SSLConfig;
|
||||||
import org.apache.solr.client.solrj.request.RequestWriter;
|
import org.apache.solr.client.solrj.request.RequestWriter;
|
||||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||||
import org.apache.solr.client.solrj.request.V2Request;
|
import org.apache.solr.client.solrj.request.V2Request;
|
||||||
|
import org.apache.solr.client.solrj.util.Cancellable;
|
||||||
import org.apache.solr.client.solrj.util.ClientUtils;
|
import org.apache.solr.client.solrj.util.ClientUtils;
|
||||||
import org.apache.solr.client.solrj.util.Constants;
|
import org.apache.solr.client.solrj.util.Constants;
|
||||||
|
import org.apache.solr.client.solrj.util.AsyncListener;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.StringUtils;
|
import org.apache.solr.common.StringUtils;
|
||||||
import org.apache.solr.common.params.CommonParams;
|
import org.apache.solr.common.params.CommonParams;
|
||||||
|
@ -75,9 +76,7 @@ import org.eclipse.jetty.client.HttpClientTransport;
|
||||||
import org.eclipse.jetty.client.ProtocolHandlers;
|
import org.eclipse.jetty.client.ProtocolHandlers;
|
||||||
import org.eclipse.jetty.client.api.Request;
|
import org.eclipse.jetty.client.api.Request;
|
||||||
import org.eclipse.jetty.client.api.Response;
|
import org.eclipse.jetty.client.api.Response;
|
||||||
import org.eclipse.jetty.client.api.Result;
|
|
||||||
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
|
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
|
||||||
import org.eclipse.jetty.client.util.BufferingResponseListener;
|
|
||||||
import org.eclipse.jetty.client.util.BytesContentProvider;
|
import org.eclipse.jetty.client.util.BytesContentProvider;
|
||||||
import org.eclipse.jetty.client.util.FormContentProvider;
|
import org.eclipse.jetty.client.util.FormContentProvider;
|
||||||
import org.eclipse.jetty.client.util.InputStreamContentProvider;
|
import org.eclipse.jetty.client.util.InputStreamContentProvider;
|
||||||
|
@ -97,8 +96,8 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteExecutionException;
|
|
||||||
import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrException;
|
import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrException;
|
||||||
|
import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteExecutionException;
|
||||||
import static org.apache.solr.common.util.Utils.getObjectByPath;
|
import static org.apache.solr.common.util.Utils.getObjectByPath;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -135,6 +134,8 @@ public class Http2SolrClient extends SolrClient {
|
||||||
*/
|
*/
|
||||||
private String serverBaseUrl;
|
private String serverBaseUrl;
|
||||||
private boolean closeClient;
|
private boolean closeClient;
|
||||||
|
private ExecutorService executor;
|
||||||
|
private boolean shutdownExecutor;
|
||||||
|
|
||||||
protected Http2SolrClient(String serverBaseUrl, Builder builder) {
|
protected Http2SolrClient(String serverBaseUrl, Builder builder) {
|
||||||
if (serverBaseUrl != null) {
|
if (serverBaseUrl != null) {
|
||||||
|
@ -178,8 +179,14 @@ public class Http2SolrClient extends SolrClient {
|
||||||
HttpClient httpClient;
|
HttpClient httpClient;
|
||||||
|
|
||||||
BlockingArrayQueue<Runnable> queue = new BlockingArrayQueue<>(256, 256);
|
BlockingArrayQueue<Runnable> queue = new BlockingArrayQueue<>(256, 256);
|
||||||
ThreadPoolExecutor httpClientExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(32,
|
executor = builder.executor;
|
||||||
256, 60, TimeUnit.SECONDS, queue, new SolrNamedThreadFactory("h2sc"));
|
if (executor == null) {
|
||||||
|
this.executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(32,
|
||||||
|
256, 60, TimeUnit.SECONDS, queue, new SolrNamedThreadFactory("h2sc"));
|
||||||
|
shutdownExecutor = true;
|
||||||
|
} else {
|
||||||
|
shutdownExecutor = false;
|
||||||
|
}
|
||||||
|
|
||||||
SslContextFactory.Client sslContextFactory;
|
SslContextFactory.Client sslContextFactory;
|
||||||
boolean ssl;
|
boolean ssl;
|
||||||
|
@ -210,7 +217,7 @@ public class Http2SolrClient extends SolrClient {
|
||||||
httpClient.setMaxConnectionsPerDestination(4);
|
httpClient.setMaxConnectionsPerDestination(4);
|
||||||
}
|
}
|
||||||
|
|
||||||
httpClient.setExecutor(httpClientExecutor);
|
httpClient.setExecutor(this.executor);
|
||||||
httpClient.setStrictEventOrdering(false);
|
httpClient.setStrictEventOrdering(false);
|
||||||
httpClient.setConnectBlocking(true);
|
httpClient.setConnectBlocking(true);
|
||||||
httpClient.setFollowRedirects(false);
|
httpClient.setFollowRedirects(false);
|
||||||
|
@ -232,14 +239,15 @@ public class Http2SolrClient extends SolrClient {
|
||||||
asyncTracker.waitForComplete();
|
asyncTracker.waitForComplete();
|
||||||
if (closeClient) {
|
if (closeClient) {
|
||||||
try {
|
try {
|
||||||
ExecutorService executor = (ExecutorService) httpClient.getExecutor();
|
|
||||||
httpClient.setStopTimeout(1000);
|
httpClient.setStopTimeout(1000);
|
||||||
httpClient.stop();
|
httpClient.stop();
|
||||||
ExecutorUtil.shutdownAndAwaitTermination(executor);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException("Exception on closing client", e);
|
throw new RuntimeException("Exception on closing client", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (shutdownExecutor) {
|
||||||
|
ExecutorUtil.shutdownAndAwaitTermination(executor);
|
||||||
|
}
|
||||||
|
|
||||||
assert ObjectReleaseTracker.release(this);
|
assert ObjectReleaseTracker.release(this);
|
||||||
}
|
}
|
||||||
|
@ -361,76 +369,100 @@ public class Http2SolrClient extends SolrClient {
|
||||||
outStream.flush();
|
outStream.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
public NamedList<Object> request(@SuppressWarnings({"rawtypes"})SolrRequest solrRequest,
|
private static final Exception CANCELLED_EXCEPTION = new Exception();
|
||||||
String collection,
|
private static final Cancellable FAILED_MAKING_REQUEST_CANCELLABLE = () -> {};
|
||||||
OnComplete onComplete) throws IOException, SolrServerException {
|
|
||||||
|
public Cancellable asyncRequest(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection, AsyncListener<NamedList<Object>> asyncListener) {
|
||||||
|
Request req;
|
||||||
|
try {
|
||||||
|
req = makeRequest(solrRequest, collection);
|
||||||
|
} catch (SolrServerException | IOException e) {
|
||||||
|
asyncListener.onFailure(e);
|
||||||
|
return FAILED_MAKING_REQUEST_CANCELLABLE;
|
||||||
|
}
|
||||||
|
final ResponseParser parser = solrRequest.getResponseParser() == null
|
||||||
|
? this.parser: solrRequest.getResponseParser();
|
||||||
|
req.onRequestQueued(asyncTracker.queuedListener)
|
||||||
|
.onComplete(asyncTracker.completeListener)
|
||||||
|
.send(new InputStreamResponseListener() {
|
||||||
|
@Override
|
||||||
|
public void onHeaders(Response response) {
|
||||||
|
super.onHeaders(response);
|
||||||
|
InputStreamResponseListener listener = this;
|
||||||
|
executor.execute(() -> {
|
||||||
|
InputStream is = listener.getInputStream();
|
||||||
|
assert ObjectReleaseTracker.track(is);
|
||||||
|
try {
|
||||||
|
NamedList<Object> body = processErrorsAndResponse(solrRequest, parser, response, is);
|
||||||
|
asyncListener.onSuccess(body);
|
||||||
|
} catch (RemoteSolrException e) {
|
||||||
|
if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
|
||||||
|
asyncListener.onFailure(e);
|
||||||
|
}
|
||||||
|
} catch (SolrServerException e) {
|
||||||
|
asyncListener.onFailure(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Response response, Throwable failure) {
|
||||||
|
super.onFailure(response, failure);
|
||||||
|
if (failure != CANCELLED_EXCEPTION) {
|
||||||
|
asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return () -> req.abort(CANCELLED_EXCEPTION);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NamedList<Object> request(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection) throws SolrServerException, IOException {
|
||||||
Request req = makeRequest(solrRequest, collection);
|
Request req = makeRequest(solrRequest, collection);
|
||||||
final ResponseParser parser = solrRequest.getResponseParser() == null
|
final ResponseParser parser = solrRequest.getResponseParser() == null
|
||||||
? this.parser: solrRequest.getResponseParser();
|
? this.parser: solrRequest.getResponseParser();
|
||||||
|
|
||||||
if (onComplete != null) {
|
try {
|
||||||
// This async call only suitable for indexing since the response size is limited by 5MB
|
InputStreamResponseListener listener = new InputStreamResponseListener();
|
||||||
req.onRequestQueued(asyncTracker.queuedListener)
|
req.send(listener);
|
||||||
.onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(5 * 1024 * 1024) {
|
Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
|
||||||
|
InputStream is = listener.getInputStream();
|
||||||
|
assert ObjectReleaseTracker.track(is);
|
||||||
|
|
||||||
@Override
|
return processErrorsAndResponse(solrRequest, parser, response, is);
|
||||||
public void onComplete(Result result) {
|
} catch (InterruptedException e) {
|
||||||
if (result.isFailed()) {
|
Thread.currentThread().interrupt();
|
||||||
onComplete.onFailure(result.getFailure());
|
throw new RuntimeException(e);
|
||||||
return;
|
} catch (TimeoutException e) {
|
||||||
}
|
throw new SolrServerException(
|
||||||
|
"Timeout occured while waiting response from server at: " + req.getURI(), e);
|
||||||
NamedList<Object> rsp;
|
} catch (ExecutionException e) {
|
||||||
try {
|
Throwable cause = e.getCause();
|
||||||
InputStream is = getContentAsInputStream();
|
if (cause instanceof ConnectException) {
|
||||||
assert ObjectReleaseTracker.track(is);
|
throw new SolrServerException("Server refused connection at: " + req.getURI(), cause);
|
||||||
rsp = processErrorsAndResponse(result.getResponse(),
|
|
||||||
parser, is, getMediaType(), getEncoding(), isV2ApiRequest(solrRequest));
|
|
||||||
onComplete.onSuccess(rsp);
|
|
||||||
} catch (Exception e) {
|
|
||||||
onComplete.onFailure(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return null;
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
InputStreamResponseListener listener = new InputStreamResponseListener();
|
|
||||||
req.send(listener);
|
|
||||||
Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
|
|
||||||
InputStream is = listener.getInputStream();
|
|
||||||
assert ObjectReleaseTracker.track(is);
|
|
||||||
|
|
||||||
ContentType contentType = getContentType(response);
|
|
||||||
String mimeType = null;
|
|
||||||
String encoding = null;
|
|
||||||
if (contentType != null) {
|
|
||||||
mimeType = contentType.getMimeType();
|
|
||||||
encoding = contentType.getCharset() != null? contentType.getCharset().name() : null;
|
|
||||||
}
|
|
||||||
return processErrorsAndResponse(response, parser, is, mimeType, encoding, isV2ApiRequest(solrRequest));
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
} catch (TimeoutException e) {
|
|
||||||
throw new SolrServerException(
|
|
||||||
"Timeout occured while waiting response from server at: " + req.getURI(), e);
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
Throwable cause = e.getCause();
|
|
||||||
if (cause instanceof ConnectException) {
|
|
||||||
throw new SolrServerException("Server refused connection at: " + req.getURI(), cause);
|
|
||||||
}
|
|
||||||
if (cause instanceof SolrServerException) {
|
|
||||||
throw (SolrServerException) cause;
|
|
||||||
} else if (cause instanceof IOException) {
|
|
||||||
throw new SolrServerException(
|
|
||||||
"IOException occured when talking to server at: " + getBaseURL(), cause);
|
|
||||||
}
|
|
||||||
throw new SolrServerException(cause.getMessage(), cause);
|
|
||||||
}
|
}
|
||||||
|
if (cause instanceof SolrServerException) {
|
||||||
|
throw (SolrServerException) cause;
|
||||||
|
} else if (cause instanceof IOException) {
|
||||||
|
throw new SolrServerException(
|
||||||
|
"IOException occured when talking to server at: " + getBaseURL(), cause);
|
||||||
|
}
|
||||||
|
throw new SolrServerException(cause.getMessage(), cause);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private NamedList<Object> processErrorsAndResponse(@SuppressWarnings({"rawtypes"})SolrRequest solrRequest,
|
||||||
|
ResponseParser parser, Response response, InputStream is) throws SolrServerException {
|
||||||
|
ContentType contentType = getContentType(response);
|
||||||
|
String mimeType = null;
|
||||||
|
String encoding = null;
|
||||||
|
if (contentType != null) {
|
||||||
|
mimeType = contentType.getMimeType();
|
||||||
|
encoding = contentType.getCharset() != null? contentType.getCharset().name() : null;
|
||||||
|
}
|
||||||
|
return processErrorsAndResponse(response, parser, is, mimeType, encoding, isV2ApiRequest(solrRequest));
|
||||||
|
}
|
||||||
|
|
||||||
private ContentType getContentType(Response response) {
|
private ContentType getContentType(Response response) {
|
||||||
String contentType = response.getHeaders().get(HttpHeader.CONTENT_TYPE);
|
String contentType = response.getHeaders().get(HttpHeader.CONTENT_TYPE);
|
||||||
return StringUtils.isEmpty(contentType)? null : ContentType.parse(contentType);
|
return StringUtils.isEmpty(contentType)? null : ContentType.parse(contentType);
|
||||||
|
@ -453,6 +485,7 @@ public class Http2SolrClient extends SolrClient {
|
||||||
|
|
||||||
private void decorateRequest(Request req, @SuppressWarnings({"rawtypes"})SolrRequest solrRequest) {
|
private void decorateRequest(Request req, @SuppressWarnings({"rawtypes"})SolrRequest solrRequest) {
|
||||||
req.header(HttpHeader.ACCEPT_ENCODING, null);
|
req.header(HttpHeader.ACCEPT_ENCODING, null);
|
||||||
|
req.timeout(idleTimeout, TimeUnit.MILLISECONDS);
|
||||||
if (solrRequest.getUserPrincipal() != null) {
|
if (solrRequest.getUserPrincipal() != null) {
|
||||||
req.attribute(REQ_PRINCIPAL_KEY, solrRequest.getUserPrincipal());
|
req.attribute(REQ_PRINCIPAL_KEY, solrRequest.getUserPrincipal());
|
||||||
}
|
}
|
||||||
|
@ -750,21 +783,10 @@ public class Http2SolrClient extends SolrClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public NamedList<Object> request(@SuppressWarnings({"rawtypes"})SolrRequest request, String collection) throws SolrServerException, IOException {
|
|
||||||
return request(request, collection, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setRequestWriter(RequestWriter requestWriter) {
|
public void setRequestWriter(RequestWriter requestWriter) {
|
||||||
this.requestWriter = requestWriter;
|
this.requestWriter = requestWriter;
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface OnComplete {
|
|
||||||
void onSuccess(NamedList<Object> result);
|
|
||||||
|
|
||||||
void onFailure(Throwable e);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setFollowRedirects(boolean follow) {
|
public void setFollowRedirects(boolean follow) {
|
||||||
httpClient.setFollowRedirects(follow);
|
httpClient.setFollowRedirects(follow);
|
||||||
}
|
}
|
||||||
|
@ -821,6 +843,7 @@ public class Http2SolrClient extends SolrClient {
|
||||||
private Integer maxConnectionsPerHost;
|
private Integer maxConnectionsPerHost;
|
||||||
private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
|
private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
|
||||||
protected String baseSolrUrl;
|
protected String baseSolrUrl;
|
||||||
|
private ExecutorService executor;
|
||||||
|
|
||||||
public Builder() {
|
public Builder() {
|
||||||
|
|
||||||
|
@ -842,6 +865,11 @@ public class Http2SolrClient extends SolrClient {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withExecutor(ExecutorService executor) {
|
||||||
|
this.executor = executor;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder withSSLConfig(SSLConfig sslConfig) {
|
public Builder withSSLConfig(SSLConfig sslConfig) {
|
||||||
this.sslConfig = sslConfig;
|
this.sslConfig = sslConfig;
|
||||||
return this;
|
return this;
|
||||||
|
|
|
@ -16,9 +16,24 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.client.solrj.impl;
|
package org.apache.solr.client.solrj.impl;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.ConnectException;
|
||||||
|
import java.net.SocketException;
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.solr.client.solrj.SolrClient;
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
|
import org.apache.solr.client.solrj.request.IsUpdateRequest;
|
||||||
|
import org.apache.solr.client.solrj.util.Cancellable;
|
||||||
|
import org.apache.solr.client.solrj.util.AsyncListener;
|
||||||
|
import org.apache.solr.common.SolrException;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.slf4j.MDC;
|
||||||
|
|
||||||
|
import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* LBHttp2SolrClient or "LoadBalanced LBHttp2SolrClient" is a load balancing wrapper around
|
* LBHttp2SolrClient or "LoadBalanced LBHttp2SolrClient" is a load balancing wrapper around
|
||||||
|
@ -66,4 +81,126 @@ public class LBHttp2SolrClient extends LBSolrClient {
|
||||||
protected SolrClient getClient(String baseUrl) {
|
protected SolrClient getClient(String baseUrl) {
|
||||||
return httpClient;
|
return httpClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Cancellable asyncReq(Req req, AsyncListener<Rsp> asyncListener) {
|
||||||
|
Rsp rsp = new Rsp();
|
||||||
|
boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
|
||||||
|
ServerIterator it = new ServerIterator(req, zombieServers);
|
||||||
|
asyncListener.onStart();
|
||||||
|
final AtomicBoolean cancelled = new AtomicBoolean(false);
|
||||||
|
AtomicReference<Cancellable> currentCancellable = new AtomicReference<>();
|
||||||
|
RetryListener retryListener = new RetryListener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Rsp rsp) {
|
||||||
|
asyncListener.onSuccess(rsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e, boolean retryReq) {
|
||||||
|
if (retryReq) {
|
||||||
|
String url;
|
||||||
|
try {
|
||||||
|
url = it.nextOrError(e);
|
||||||
|
} catch (SolrServerException ex) {
|
||||||
|
asyncListener.onFailure(e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
MDC.put("LBSolrClient.url", url);
|
||||||
|
synchronized (cancelled) {
|
||||||
|
if (cancelled.get()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Cancellable cancellable = doRequest(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this);
|
||||||
|
currentCancellable.set(cancellable);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
MDC.remove("LBSolrClient.url");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
asyncListener.onFailure(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
try {
|
||||||
|
Cancellable cancellable = doRequest(it.nextOrError(), req, rsp, isNonRetryable, it.isServingZombieServer(), retryListener);
|
||||||
|
currentCancellable.set(cancellable);
|
||||||
|
} catch (SolrServerException e) {
|
||||||
|
asyncListener.onFailure(e);
|
||||||
|
}
|
||||||
|
return () -> {
|
||||||
|
synchronized (cancelled) {
|
||||||
|
cancelled.set(true);
|
||||||
|
if (currentCancellable.get() != null) {
|
||||||
|
currentCancellable.get().cancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private interface RetryListener {
|
||||||
|
void onSuccess(Rsp rsp);
|
||||||
|
void onFailure(Exception e, boolean retryReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Cancellable doRequest(String baseUrl, Req req, Rsp rsp, boolean isNonRetryable,
|
||||||
|
boolean isZombie, RetryListener listener) {
|
||||||
|
rsp.server = baseUrl;
|
||||||
|
req.getRequest().setBasePath(baseUrl);
|
||||||
|
return ((Http2SolrClient)getClient(baseUrl)).asyncRequest(req.getRequest(), null, new AsyncListener<>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(NamedList<Object> result) {
|
||||||
|
rsp.rsp = result;
|
||||||
|
if (isZombie) {
|
||||||
|
zombieServers.remove(baseUrl);
|
||||||
|
}
|
||||||
|
listener.onSuccess(rsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable oe) {
|
||||||
|
try {
|
||||||
|
throw (Exception) oe;
|
||||||
|
} catch (BaseHttpSolrClient.RemoteExecutionException e) {
|
||||||
|
listener.onFailure(e, false);
|
||||||
|
} catch (SolrException e) {
|
||||||
|
// we retry on 404 or 403 or 503 or 500
|
||||||
|
// unless it's an update - then we only retry on connect exception
|
||||||
|
if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
|
||||||
|
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
|
||||||
|
} else {
|
||||||
|
// Server is alive but the request was likely malformed or invalid
|
||||||
|
if (isZombie) {
|
||||||
|
zombieServers.remove(baseUrl);
|
||||||
|
}
|
||||||
|
listener.onFailure(e, false);
|
||||||
|
}
|
||||||
|
} catch (SocketException e) {
|
||||||
|
if (!isNonRetryable || e instanceof ConnectException) {
|
||||||
|
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
|
||||||
|
} else {
|
||||||
|
listener.onFailure(e, false);
|
||||||
|
}
|
||||||
|
} catch (SocketTimeoutException e) {
|
||||||
|
if (!isNonRetryable) {
|
||||||
|
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
|
||||||
|
} else {
|
||||||
|
listener.onFailure(e, false);
|
||||||
|
}
|
||||||
|
} catch (SolrServerException e) {
|
||||||
|
Throwable rootCause = e.getRootCause();
|
||||||
|
if (!isNonRetryable && rootCause instanceof IOException) {
|
||||||
|
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
|
||||||
|
} else if (isNonRetryable && rootCause instanceof ConnectException) {
|
||||||
|
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
|
||||||
|
} else {
|
||||||
|
listener.onFailure(e, false);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
listener.onFailure(new SolrServerException(e), false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -60,7 +61,7 @@ import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
|
||||||
public abstract class LBSolrClient extends SolrClient {
|
public abstract class LBSolrClient extends SolrClient {
|
||||||
|
|
||||||
// defaults
|
// defaults
|
||||||
private static final Set<Integer> RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500));
|
protected static final Set<Integer> RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500));
|
||||||
private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
|
private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
|
||||||
private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list
|
private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list
|
||||||
|
|
||||||
|
@ -69,7 +70,7 @@ public abstract class LBSolrClient extends SolrClient {
|
||||||
private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<>();
|
private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<>();
|
||||||
// access to aliveServers should be synchronized on itself
|
// access to aliveServers should be synchronized on itself
|
||||||
|
|
||||||
private final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>();
|
protected final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
// changes to aliveServers are reflected in this array, no need to synchronize
|
// changes to aliveServers are reflected in this array, no need to synchronize
|
||||||
private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
|
private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
|
||||||
|
@ -136,6 +137,99 @@ public abstract class LBSolrClient extends SolrClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static class ServerIterator {
|
||||||
|
String serverStr;
|
||||||
|
List<String> skipped;
|
||||||
|
int numServersTried;
|
||||||
|
Iterator<String> it;
|
||||||
|
Iterator<String> skippedIt;
|
||||||
|
String exceptionMessage;
|
||||||
|
long timeAllowedNano;
|
||||||
|
long timeOutTime;
|
||||||
|
|
||||||
|
final Map<String, ServerWrapper> zombieServers;
|
||||||
|
final Req req;
|
||||||
|
|
||||||
|
public ServerIterator(Req req, Map<String, ServerWrapper> zombieServers) {
|
||||||
|
this.it = req.getServers().iterator();
|
||||||
|
this.req = req;
|
||||||
|
this.zombieServers = zombieServers;
|
||||||
|
this.timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
|
||||||
|
this.timeOutTime = System.nanoTime() + timeAllowedNano;
|
||||||
|
fetchNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized boolean hasNext() {
|
||||||
|
return serverStr != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fetchNext() {
|
||||||
|
serverStr = null;
|
||||||
|
if (req.numServersToTry != null && numServersTried > req.numServersToTry) {
|
||||||
|
exceptionMessage = "Time allowed to handle this request exceeded";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (it.hasNext()) {
|
||||||
|
serverStr = it.next();
|
||||||
|
serverStr = normalize(serverStr);
|
||||||
|
// if the server is currently a zombie, just skip to the next one
|
||||||
|
ServerWrapper wrapper = zombieServers.get(serverStr);
|
||||||
|
if (wrapper != null) {
|
||||||
|
final int numDeadServersToTry = req.getNumDeadServersToTry();
|
||||||
|
if (numDeadServersToTry > 0) {
|
||||||
|
if (skipped == null) {
|
||||||
|
skipped = new ArrayList<>(numDeadServersToTry);
|
||||||
|
skipped.add(wrapper.getBaseUrl());
|
||||||
|
} else if (skipped.size() < numDeadServersToTry) {
|
||||||
|
skipped.add(wrapper.getBaseUrl());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (serverStr == null && skipped != null) {
|
||||||
|
if (skippedIt == null) {
|
||||||
|
skippedIt = skipped.iterator();
|
||||||
|
}
|
||||||
|
if (skippedIt.hasNext()) {
|
||||||
|
serverStr = skippedIt.next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isServingZombieServer() {
|
||||||
|
return skippedIt != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized String nextOrError() throws SolrServerException {
|
||||||
|
return nextOrError(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized String nextOrError(Exception previousEx) throws SolrServerException {
|
||||||
|
String suffix = "";
|
||||||
|
if (previousEx == null) {
|
||||||
|
suffix = ":" + zombieServers.keySet();
|
||||||
|
}
|
||||||
|
if (isTimeExceeded(timeAllowedNano, timeOutTime)) {
|
||||||
|
throw new SolrServerException("Time allowed to handle this request exceeded"+suffix, previousEx);
|
||||||
|
}
|
||||||
|
if (serverStr == null) {
|
||||||
|
throw new SolrServerException("No live SolrServers available to handle this request"+suffix, previousEx);
|
||||||
|
}
|
||||||
|
numServersTried++;
|
||||||
|
if (req.getNumServersToTry() != null && numServersTried > req.getNumServersToTry()) {
|
||||||
|
throw new SolrServerException("No live SolrServers available to handle this request:"
|
||||||
|
+ " numServersTried="+numServersTried
|
||||||
|
+ " numServersToTry="+req.getNumServersToTry()+suffix, previousEx);
|
||||||
|
}
|
||||||
|
String rs = serverStr;
|
||||||
|
fetchNext();
|
||||||
|
return rs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static class Req {
|
public static class Req {
|
||||||
@SuppressWarnings({"rawtypes"})
|
@SuppressWarnings({"rawtypes"})
|
||||||
|
@ -257,45 +351,12 @@ public abstract class LBSolrClient extends SolrClient {
|
||||||
Rsp rsp = new Rsp();
|
Rsp rsp = new Rsp();
|
||||||
Exception ex = null;
|
Exception ex = null;
|
||||||
boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
|
boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
|
||||||
List<ServerWrapper> skipped = null;
|
ServerIterator serverIterator = new ServerIterator(req, zombieServers);
|
||||||
|
String serverStr;
|
||||||
final Integer numServersToTry = req.getNumServersToTry();
|
while ((serverStr = serverIterator.nextOrError(ex)) != null) {
|
||||||
int numServersTried = 0;
|
|
||||||
|
|
||||||
boolean timeAllowedExceeded = false;
|
|
||||||
long timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
|
|
||||||
long timeOutTime = System.nanoTime() + timeAllowedNano;
|
|
||||||
for (String serverStr : req.getServers()) {
|
|
||||||
if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
serverStr = normalize(serverStr);
|
|
||||||
// if the server is currently a zombie, just skip to the next one
|
|
||||||
ServerWrapper wrapper = zombieServers.get(serverStr);
|
|
||||||
if (wrapper != null) {
|
|
||||||
// System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
|
|
||||||
final int numDeadServersToTry = req.getNumDeadServersToTry();
|
|
||||||
if (numDeadServersToTry > 0) {
|
|
||||||
if (skipped == null) {
|
|
||||||
skipped = new ArrayList<>(numDeadServersToTry);
|
|
||||||
skipped.add(wrapper);
|
|
||||||
}
|
|
||||||
else if (skipped.size() < numDeadServersToTry) {
|
|
||||||
skipped.add(wrapper);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
MDC.put("LBSolrClient.url", serverStr);
|
MDC.put("LBSolrClient.url", serverStr);
|
||||||
|
ex = doRequest(serverStr, req, rsp, isNonRetryable, serverIterator.isServingZombieServer());
|
||||||
if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
++numServersTried;
|
|
||||||
ex = doRequest(serverStr, req, rsp, isNonRetryable, false);
|
|
||||||
if (ex == null) {
|
if (ex == null) {
|
||||||
return rsp; // SUCCESS
|
return rsp; // SUCCESS
|
||||||
}
|
}
|
||||||
|
@ -303,61 +364,19 @@ public abstract class LBSolrClient extends SolrClient {
|
||||||
MDC.remove("LBSolrClient.url");
|
MDC.remove("LBSolrClient.url");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex);
|
||||||
// try the servers we previously skipped
|
|
||||||
if (skipped != null) {
|
|
||||||
for (ServerWrapper wrapper : skipped) {
|
|
||||||
if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
MDC.put("LBSolrClient.url", wrapper.getBaseUrl());
|
|
||||||
++numServersTried;
|
|
||||||
ex = doRequest(wrapper.baseUrl, req, rsp, isNonRetryable, true);
|
|
||||||
if (ex == null) {
|
|
||||||
return rsp; // SUCCESS
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
MDC.remove("LBSolrClient.url");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
final String solrServerExceptionMessage;
|
|
||||||
if (timeAllowedExceeded) {
|
|
||||||
solrServerExceptionMessage = "Time allowed to handle this request exceeded";
|
|
||||||
} else {
|
|
||||||
if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
|
|
||||||
solrServerExceptionMessage = "No live SolrServers available to handle this request:"
|
|
||||||
+ " numServersTried="+numServersTried
|
|
||||||
+ " numServersToTry="+numServersToTry.intValue();
|
|
||||||
} else {
|
|
||||||
solrServerExceptionMessage = "No live SolrServers available to handle this request";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (ex == null) {
|
|
||||||
throw new SolrServerException(solrServerExceptionMessage);
|
|
||||||
} else {
|
|
||||||
throw new SolrServerException(solrServerExceptionMessage+":" + zombieServers.keySet(), ex);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return time allowed in nanos, returns -1 if no time_allowed is specified.
|
* @return time allowed in nanos, returns -1 if no time_allowed is specified.
|
||||||
*/
|
*/
|
||||||
private long getTimeAllowedInNanos(@SuppressWarnings({"rawtypes"})final SolrRequest req) {
|
private static long getTimeAllowedInNanos(@SuppressWarnings({"rawtypes"})final SolrRequest req) {
|
||||||
SolrParams reqParams = req.getParams();
|
SolrParams reqParams = req.getParams();
|
||||||
return reqParams == null ? -1 :
|
return reqParams == null ? -1 :
|
||||||
TimeUnit.NANOSECONDS.convert(reqParams.getInt(CommonParams.TIME_ALLOWED, -1), TimeUnit.MILLISECONDS);
|
TimeUnit.NANOSECONDS.convert(reqParams.getInt(CommonParams.TIME_ALLOWED, -1), TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) {
|
private static boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) {
|
||||||
return timeAllowedNano > 0 && System.nanoTime() > timeOutTime;
|
return timeAllowedNano > 0 && System.nanoTime() > timeOutTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -415,7 +434,7 @@ public abstract class LBSolrClient extends SolrClient {
|
||||||
|
|
||||||
protected abstract SolrClient getClient(String baseUrl);
|
protected abstract SolrClient getClient(String baseUrl);
|
||||||
|
|
||||||
private Exception addZombie(String serverStr, Exception e) {
|
protected Exception addZombie(String serverStr, Exception e) {
|
||||||
ServerWrapper wrapper = createServerWrapper(serverStr);
|
ServerWrapper wrapper = createServerWrapper(serverStr);
|
||||||
wrapper.standard = false;
|
wrapper.standard = false;
|
||||||
zombieServers.put(serverStr, wrapper);
|
zombieServers.put(serverStr, wrapper);
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.client.solrj.util;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listener for async requests
|
||||||
|
*/
|
||||||
|
public interface AsyncListener<T> {
|
||||||
|
/**
|
||||||
|
* Callback method invoked before processing the request
|
||||||
|
*/
|
||||||
|
default void onStart() {
|
||||||
|
|
||||||
|
}
|
||||||
|
void onSuccess(T t);
|
||||||
|
void onFailure(Throwable throwable);
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.client.solrj.util;
|
||||||
|
|
||||||
|
public interface Cancellable {
|
||||||
|
void cancel();
|
||||||
|
}
|
|
@ -0,0 +1,90 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.client.solrj.impl;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
|
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||||
|
import org.apache.solr.common.params.CommonParams;
|
||||||
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class LBSolrClientTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerIterator() throws SolrServerException {
|
||||||
|
LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4"));
|
||||||
|
LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>());
|
||||||
|
List<String> actualServers = new ArrayList<>();
|
||||||
|
while (serverIterator.hasNext()) {
|
||||||
|
actualServers.add(serverIterator.nextOrError());
|
||||||
|
}
|
||||||
|
assertEquals(Arrays.asList("1", "2", "3", "4"), actualServers);
|
||||||
|
assertFalse(serverIterator.hasNext());
|
||||||
|
LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerIteratorWithZombieServers() throws SolrServerException {
|
||||||
|
HashMap<String, LBSolrClient.ServerWrapper> zombieServers = new HashMap<>();
|
||||||
|
LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4"));
|
||||||
|
LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, zombieServers);
|
||||||
|
zombieServers.put("2", new LBSolrClient.ServerWrapper("2"));
|
||||||
|
|
||||||
|
assertTrue(serverIterator.hasNext());
|
||||||
|
assertEquals("1", serverIterator.nextOrError());
|
||||||
|
assertTrue(serverIterator.hasNext());
|
||||||
|
assertEquals("3", serverIterator.nextOrError());
|
||||||
|
assertTrue(serverIterator.hasNext());
|
||||||
|
assertEquals("4", serverIterator.nextOrError());
|
||||||
|
assertTrue(serverIterator.hasNext());
|
||||||
|
assertEquals("2", serverIterator.nextOrError());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerIteratorTimeAllowed() throws SolrServerException, InterruptedException {
|
||||||
|
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||||
|
params.set(CommonParams.TIME_ALLOWED, 300);
|
||||||
|
LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(params), Arrays.asList("1", "2", "3", "4"), 2);
|
||||||
|
LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>());
|
||||||
|
assertTrue(serverIterator.hasNext());
|
||||||
|
serverIterator.nextOrError();
|
||||||
|
Thread.sleep(300);
|
||||||
|
LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerIteratorMaxRetry() throws SolrServerException {
|
||||||
|
LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4"), 2);
|
||||||
|
LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>());
|
||||||
|
assertTrue(serverIterator.hasNext());
|
||||||
|
serverIterator.nextOrError();
|
||||||
|
assertTrue(serverIterator.hasNext());
|
||||||
|
serverIterator.nextOrError();
|
||||||
|
LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError);
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.handler.component;
|
package org.apache.solr.handler.component;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -25,13 +24,7 @@ import java.util.Map;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.http.client.HttpClient;
|
|
||||||
import org.apache.solr.client.solrj.SolrClient;
|
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
|
||||||
import org.apache.solr.client.solrj.SolrServerException;
|
|
||||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
import org.apache.solr.client.solrj.impl.Http2SolrClient;
|
|
||||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
|
||||||
import org.apache.solr.cloud.MiniSolrCloudCluster;
|
import org.apache.solr.cloud.MiniSolrCloudCluster;
|
||||||
import org.apache.solr.common.cloud.DocCollection;
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
import org.apache.solr.common.cloud.Replica;
|
import org.apache.solr.common.cloud.Replica;
|
||||||
|
@ -39,7 +32,6 @@ import org.apache.solr.common.cloud.Slice;
|
||||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
import org.apache.solr.common.util.NamedList;
|
|
||||||
import org.apache.solr.common.util.StrUtils;
|
import org.apache.solr.common.util.StrUtils;
|
||||||
import org.apache.solr.core.CoreContainer;
|
import org.apache.solr.core.CoreContainer;
|
||||||
|
|
||||||
|
@ -90,14 +82,9 @@ public class TrackingShardHandlerFactory extends HttpShardHandlerFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ShardHandler getShardHandler() {
|
public ShardHandler getShardHandler() {
|
||||||
return super.getShardHandler();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ShardHandler getShardHandler(Http2SolrClient client) {
|
|
||||||
final ShardHandlerFactory factory = this;
|
final ShardHandlerFactory factory = this;
|
||||||
final ShardHandler wrapped = super.getShardHandler(client);
|
final ShardHandler wrapped = super.getShardHandler();
|
||||||
return new HttpShardHandler(this, client) {
|
return new HttpShardHandler(this) {
|
||||||
@Override
|
@Override
|
||||||
public void prepDistributed(ResponseBuilder rb) {
|
public void prepDistributed(ResponseBuilder rb) {
|
||||||
wrapped.prepDistributed(rb);
|
wrapped.prepDistributed(rb);
|
||||||
|
@ -135,55 +122,6 @@ public class TrackingShardHandlerFactory extends HttpShardHandlerFactory {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ShardHandler getShardHandler(HttpClient httpClient) {
|
|
||||||
final ShardHandlerFactory factory = this;
|
|
||||||
final ShardHandler wrapped = super.getShardHandler(httpClient);
|
|
||||||
return new HttpShardHandler(this, null) {
|
|
||||||
@Override
|
|
||||||
public void prepDistributed(ResponseBuilder rb) {
|
|
||||||
wrapped.prepDistributed(rb);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
|
|
||||||
synchronized (TrackingShardHandlerFactory.this) {
|
|
||||||
if (isTracking()) {
|
|
||||||
queue.offer(new ShardRequestAndParams(sreq, shard, params));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
wrapped.submit(sreq, shard, params);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected NamedList<Object> request(String url, @SuppressWarnings({"rawtypes"})SolrRequest req) throws IOException, SolrServerException {
|
|
||||||
try (SolrClient client = new HttpSolrClient.Builder(url).withHttpClient(httpClient).build()) {
|
|
||||||
return client.request(req);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ShardResponse takeCompletedIncludingErrors() {
|
|
||||||
return wrapped.takeCompletedIncludingErrors();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ShardResponse takeCompletedOrError() {
|
|
||||||
return wrapped.takeCompletedOrError();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void cancelAll() {
|
|
||||||
wrapped.cancelAll();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ShardHandlerFactory getShardHandlerFactory() {
|
|
||||||
return factory;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
super.close();
|
super.close();
|
||||||
|
|
Loading…
Reference in New Issue