From a80eb84d5659a06a10860ad2470e87d80b19fa5d Mon Sep 17 00:00:00 2001 From: Cao Manh Dat Date: Tue, 7 Jul 2020 09:17:26 +0700 Subject: [PATCH] SOLR-14354: HttpShardHandler send requests in async (#1470) --- solr/CHANGES.txt | 3 +- .../java/org/apache/solr/cloud/Overseer.java | 2 +- .../solr/cloud/OverseerNodePrioritizer.java | 9 +- .../org/apache/solr/cloud/SyncStrategy.java | 3 +- .../solr/cloud/api/collections/BackupCmd.java | 2 +- .../api/collections/CreateCollectionCmd.java | 2 +- .../api/collections/CreateSnapshotCmd.java | 2 +- .../api/collections/DeleteReplicaCmd.java | 2 +- .../api/collections/DeleteSnapshotCmd.java | 2 +- .../cloud/api/collections/MigrateCmd.java | 3 +- .../OverseerCollectionMessageHandler.java | 4 +- .../cloud/api/collections/RestoreCmd.java | 2 +- .../cloud/api/collections/SplitShardCmd.java | 2 +- .../handler/component/HttpShardHandler.java | 188 +++++++++++++---- .../component/HttpShardHandlerFactory.java | 62 +----- .../handler/component/ShardRequestor.java | 178 ---------------- .../solr/handler/component/ShardResponse.java | 4 +- .../java/org/apache/solr/update/PeerSync.java | 10 +- ...rseerCollectionConfigSetProcessorTest.java | 5 +- .../TestSolrCloudWithHadoopAuthPlugin.java | 4 +- .../solr/update/MockingHttp2SolrClient.java | 23 +-- .../client/solrj/impl/Http2SolrClient.java | 190 ++++++++++-------- .../client/solrj/impl/LBHttp2SolrClient.java | 137 +++++++++++++ .../solr/client/solrj/impl/LBSolrClient.java | 189 +++++++++-------- .../solr/client/solrj/util/AsyncListener.java | 33 +++ .../solr/client/solrj/util/Cancellable.java | 22 ++ .../client/solrj/impl/LBSolrClientTest.java | 90 +++++++++ .../TrackingShardHandlerFactory.java | 66 +----- 28 files changed, 689 insertions(+), 550 deletions(-) delete mode 100644 solr/core/src/java/org/apache/solr/handler/component/ShardRequestor.java create mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java create mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java create mode 100644 solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBSolrClientTest.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index e0d2fe6f4c6..835900a6199 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -107,7 +107,8 @@ Improvements Optimizations --------------------- -(No changes) + +* SOLR-14354: HttpShardHandler send requests in async (Cao Manh Dat). Bug Fixes --------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 1c18acc0803..3dfe6c593e9 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -639,7 +639,7 @@ public class Overseer implements SolrCloseable { ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process."); - OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory(), updateShardHandler.getDefaultHttpClient()); + OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory()); overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer); ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id); ccThread.setDaemon(true); diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java index 20e650ae8d8..d532f03d18a 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java @@ -20,7 +20,6 @@ import java.lang.invoke.MethodHandles; import java.util.List; import java.util.Map; -import org.apache.http.client.HttpClient; import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkNodeProps; @@ -29,7 +28,6 @@ import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.Utils; -import org.apache.solr.handler.component.HttpShardHandlerFactory; import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.handler.component.ShardHandlerFactory; import org.apache.solr.handler.component.ShardRequest; @@ -53,14 +51,11 @@ public class OverseerNodePrioritizer { private ZkDistributedQueue stateUpdateQueue; - private HttpClient httpClient; - - public OverseerNodePrioritizer(ZkStateReader zkStateReader, ZkDistributedQueue stateUpdateQueue, String adminPath, ShardHandlerFactory shardHandlerFactory, HttpClient httpClient) { + public OverseerNodePrioritizer(ZkStateReader zkStateReader, ZkDistributedQueue stateUpdateQueue, String adminPath, ShardHandlerFactory shardHandlerFactory) { this.zkStateReader = zkStateReader; this.adminPath = adminPath; this.shardHandlerFactory = shardHandlerFactory; this.stateUpdateQueue = stateUpdateQueue; - this.httpClient = httpClient; } public synchronized void prioritizeOverseerNodes(String overseerId) throws Exception { @@ -108,7 +103,7 @@ public class OverseerNodePrioritizer { private void invokeOverseerOp(String electionNode, String op) { ModifiableSolrParams params = new ModifiableSolrParams(); - ShardHandler shardHandler = ((HttpShardHandlerFactory)shardHandlerFactory).getShardHandler(httpClient); + ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); params.set(CoreAdminParams.ACTION, CoreAdminAction.OVERSEEROP.toString()); params.set("op", op); params.set("qt", adminPath); diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java index 5a1b8dac6f2..c73e57ba6b8 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java @@ -35,7 +35,6 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.SolrCore; -import org.apache.solr.handler.component.HttpShardHandlerFactory; import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.handler.component.ShardRequest; import org.apache.solr.handler.component.ShardResponse; @@ -70,7 +69,7 @@ public class SyncStrategy { public SyncStrategy(CoreContainer cc) { UpdateShardHandler updateShardHandler = cc.getUpdateShardHandler(); client = updateShardHandler.getDefaultHttpClient(); - shardHandler = ((HttpShardHandlerFactory)cc.getShardHandlerFactory()).getShardHandler(cc.getUpdateShardHandler().getDefaultHttpClient()); + shardHandler = cc.getShardHandlerFactory().getShardHandler(); updateExecutor = updateShardHandler.getUpdateExecutor(); } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java index 68565f8a69c..6ff3797d3df 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java @@ -170,7 +170,7 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd { String backupName = request.getStr(NAME); String asyncId = request.getStr(ASYNC); String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY); - ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); + ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); String commitName = request.getStr(CoreAdminParams.COMMIT_NAME); Optional snapshotMeta = Optional.empty(); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java index 6498c8bd0ef..df55920d398 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java @@ -207,7 +207,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd collectionName, shardNames, message)); } Map coresToCreate = new LinkedHashMap<>(); - ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); + ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); for (ReplicaPosition replicaPosition : replicaPositions) { String nodeName = replicaPosition.node; diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java index a1109522212..e3d8ab5b15a 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java @@ -96,7 +96,7 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd { @SuppressWarnings({"rawtypes"}) NamedList shardRequestResults = new NamedList(); Map shardByCoreName = new HashMap<>(); - ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); + ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) { diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java index c263203dcc8..ff168c469e3 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java @@ -232,7 +232,7 @@ public class DeleteReplicaCmd implements Cmd { " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'"); } - ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); + ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); String core = replica.getStr(ZkStateReader.CORE_NAME_PROP); String asyncId = message.getStr(ASYNC); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java index 2f62139807a..9e4388bb252 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java @@ -77,7 +77,7 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd { String asyncId = message.getStr(ASYNC); @SuppressWarnings({"rawtypes"}) NamedList shardRequestResults = new NamedList(); - ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); + ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); SolrZkClient zkClient = ocmh.zkStateReader.getZkClient(); Optional meta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java index c41cb7fcfa4..85bac4bfffd 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java @@ -43,7 +43,6 @@ import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.TimeSource; import org.apache.solr.common.util.Utils; -import org.apache.solr.handler.component.HttpShardHandlerFactory; import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.handler.component.ShardHandlerFactory; import org.apache.solr.update.SolrIndexSplitter; @@ -161,7 +160,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd { DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey); ShardHandlerFactory shardHandlerFactory = ocmh.shardHandlerFactory; - ShardHandler shardHandler = ((HttpShardHandlerFactory)shardHandlerFactory).getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); + ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange); // intersect source range, keyHashRange and target range diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java index 63366a81aef..5c3b9d48ca9 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java @@ -337,7 +337,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, sreq.shards = new String[] {baseUrl}; sreq.actualShards = sreq.shards; sreq.params = params; - ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); + ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); shardHandler.submit(sreq, baseUrl, sreq.params); } @@ -725,7 +725,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId); String collectionName = message.getStr(NAME); @SuppressWarnings("deprecation") - ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); + ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); ClusterState clusterState = zkStateReader.getClusterState(); DocCollection coll = clusterState.getCollection(collectionName); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java index c7b5aa16895..47797b4f5fd 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java @@ -93,7 +93,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd { String restoreCollectionName = message.getStr(COLLECTION_PROP); String backupName = message.getStr(NAME); // of backup - ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); + ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); String asyncId = message.getStr(ASYNC); String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java index 2d04947be1c..4dd2d7021b3 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java @@ -208,7 +208,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd { List> replicas = new ArrayList<>((repFactor - 1) * 2); @SuppressWarnings("deprecation") - ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); + ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); if (message.getBool(CommonAdminParams.SPLIT_BY_PREFIX, false)) { diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java index 6dd60eb482a..f33b783dc72 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java @@ -16,30 +16,43 @@ */ package org.apache.solr.handler.component; -import java.io.IOException; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.propagation.Format; import org.apache.solr.client.solrj.SolrRequest; -import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.impl.Http2SolrClient; +import org.apache.solr.client.solrj.SolrResponse; +import org.apache.solr.client.solrj.impl.LBHttp2SolrClient; +import org.apache.solr.client.solrj.impl.LBSolrClient; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.routing.ReplicaListTransformer; +import org.apache.solr.client.solrj.util.Cancellable; +import org.apache.solr.client.solrj.util.AsyncListener; import org.apache.solr.cloud.CloudDescriptor; import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; +import org.apache.solr.common.annotation.SolrSingleThreaded; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.ZkCoreNodeProps; +import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.CoreDescriptor; import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.request.SolrRequestInfo; +import org.apache.solr.util.tracing.GlobalTracer; +import org.apache.solr.util.tracing.SolrRequestCarrier; +@SolrSingleThreaded public class HttpShardHandler extends ShardHandler { /** * If the request context map has an entry with this key and Boolean.TRUE as value, @@ -49,33 +62,130 @@ public class HttpShardHandler extends ShardHandler { */ public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime"; - final HttpShardHandlerFactory httpShardHandlerFactory; - private CompletionService completionService; - private Set> pending; - private Http2SolrClient httpClient; + private HttpShardHandlerFactory httpShardHandlerFactory; + private Map responseCancellableMap; + private BlockingQueue responses; + private AtomicInteger pending; + private Map> shardToURLs; + private LBHttp2SolrClient lbClient; - public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory, Http2SolrClient httpClient) { - this.httpClient = httpClient; + public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) { this.httpShardHandlerFactory = httpShardHandlerFactory; - completionService = httpShardHandlerFactory.newCompletionService(); - pending = new HashSet<>(); + this.lbClient = httpShardHandlerFactory.loadbalancer; + this.pending = new AtomicInteger(0); + this.responses = new LinkedBlockingQueue<>(); + this.responseCancellableMap = new HashMap<>(); + + // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574") + // This is primarily to keep track of what order we should use to query the replicas of a shard + // so that we use the same replica for all phases of a distributed request. + shardToURLs = new HashMap<>(); } - @Override - public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) { - ShardRequestor shardRequestor = new ShardRequestor(sreq, shard, params, this); - try { - shardRequestor.init(); - pending.add(completionService.submit(shardRequestor)); - } finally { - shardRequestor.end(); + private static class SimpleSolrResponse extends SolrResponse { + + volatile long elapsedTime; + + volatile NamedList nl; + + @Override + public long getElapsedTime() { + return elapsedTime; + } + + @Override + public NamedList getResponse() { + return nl; + } + + @Override + public void setResponse(NamedList rsp) { + nl = rsp; + } + + @Override + public void setElapsedTime(long elapsedTime) { + this.elapsedTime = elapsedTime; } } - protected NamedList request(String url, @SuppressWarnings({"rawtypes"})SolrRequest req) throws IOException, SolrServerException { - req.setBasePath(url); - return httpClient.request(req); + // Not thread safe... don't use in Callable. + // Don't modify the returned URL list. + private List getURLs(String shard) { + List urls = shardToURLs.get(shard); + if (urls == null) { + urls = httpShardHandlerFactory.buildURLList(shard); + shardToURLs.put(shard, urls); + } + return urls; + } + + @Override + public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) { + // do this outside of the callable for thread safety reasons + final List urls = getURLs(shard); + final Tracer tracer = GlobalTracer.getTracer(); + final Span span = tracer != null ? tracer.activeSpan() : null; + + params.remove(CommonParams.WT); // use default (currently javabin) + params.remove(CommonParams.VERSION); + QueryRequest req = makeQueryRequest(sreq, params, shard); + req.setMethod(SolrRequest.METHOD.POST); + + LBSolrClient.Req lbReq = httpShardHandlerFactory.newLBHttpSolrClientReq(req, urls); + + ShardResponse srsp = new ShardResponse(); + if (sreq.nodeName != null) { + srsp.setNodeName(sreq.nodeName); + } + srsp.setShardRequest(sreq); + srsp.setShard(shard); + SimpleSolrResponse ssr = new SimpleSolrResponse(); + srsp.setSolrResponse(ssr); + + pending.incrementAndGet(); + // if there are no shards available for a slice, urls.size()==0 + if (urls.size() == 0) { + // TODO: what's the right error code here? We should use the same thing when + // all of the servers for a shard are down. + SolrException exception = new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard); + srsp.setException(exception); + srsp.setResponseCode(exception.code()); + responses.add(srsp); + return; + } + + // all variables that set inside this listener must be at least volatile + responseCancellableMap.put(srsp, this.lbClient.asyncReq(lbReq, new AsyncListener<>() { + volatile long startTime = System.nanoTime(); + + @Override + public void onStart() { + if (tracer != null && span != null) { + tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req)); + } + SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo(); + if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal()); + } + + @Override + public void onSuccess(LBSolrClient.Rsp rsp) { + ssr.nl = rsp.getResponse(); + srsp.setShardAddress(rsp.getServer()); + ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + responses.add(srsp); + } + + public void onFailure(Throwable throwable) { + ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + srsp.setException(throwable); + if (throwable instanceof SolrException) { + srsp.setResponseCode(((SolrException) throwable).code()); + } + responses.add(srsp); + } + })); } /** @@ -113,12 +223,12 @@ public class HttpShardHandler extends ShardHandler { } private ShardResponse take(boolean bailOnError) { + try { + while (pending.get() > 0) { + ShardResponse rsp = responses.take(); + responseCancellableMap.remove(rsp); - while (pending.size() > 0) { - try { - Future future = completionService.take(); - pending.remove(future); - ShardResponse rsp = future.get(); + pending.decrementAndGet(); if (bailOnError && rsp.getException() != null) return rsp; // if exception, return immediately // add response to the response list... we do this after the take() and // not after the completion of "call" so we know when the last response @@ -128,13 +238,9 @@ public class HttpShardHandler extends ShardHandler { if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) { return rsp; } - } catch (InterruptedException e) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); - } catch (ExecutionException e) { - // should be impossible... the problem with catching the exception - // at this level is we don't know what ShardRequest it applied to - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception", e); } + } catch (InterruptedException e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); } return null; } @@ -142,9 +248,11 @@ public class HttpShardHandler extends ShardHandler { @Override public void cancelAll() { - for (Future future : pending) { - future.cancel(false); + for (Cancellable cancellable : responseCancellableMap.values()) { + cancellable.cancel(); + pending.decrementAndGet(); } + responseCancellableMap.clear(); } @Override diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java index 80bddadfbe2..22786d97b5f 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java @@ -16,7 +16,6 @@ */ package org.apache.solr.handler.component; -import java.io.IOException; import java.lang.invoke.MethodHandles; import java.net.MalformedURLException; import java.net.URL; @@ -28,8 +27,6 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; @@ -37,13 +34,8 @@ import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; -import org.apache.http.client.HttpClient; -import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.SolrRequest; -import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.Http2SolrClient; import org.apache.solr.client.solrj.impl.HttpClientUtil; -import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.LBHttp2SolrClient; import org.apache.solr.client.solrj.impl.LBSolrClient; import org.apache.solr.client.solrj.routing.AffinityReplicaListTransformerFactory; @@ -96,7 +88,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org. protected volatile Http2SolrClient defaultClient; protected InstrumentedHttpListenerFactory httpListenerFactory; - private LBHttp2SolrClient loadbalancer; + protected LBHttp2SolrClient loadbalancer; int corePoolSize = 0; int maximumPoolSize = Integer.MAX_VALUE; @@ -151,27 +143,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org. */ @Override public ShardHandler getShardHandler() { - return getShardHandler(defaultClient); - } - - /** - * Get {@link ShardHandler} that uses custom http client. - */ - public ShardHandler getShardHandler(final Http2SolrClient httpClient){ - return new HttpShardHandler(this, httpClient); - } - - @Deprecated - public ShardHandler getShardHandler(final HttpClient httpClient) { - // a little hack for backward-compatibility when we are moving from apache http client to jetty client - return new HttpShardHandler(this, null) { - @Override - protected NamedList request(String url, @SuppressWarnings({"rawtypes"})SolrRequest req) throws IOException, SolrServerException { - try (SolrClient client = new HttpSolrClient.Builder(url).withHttpClient(httpClient).build()) { - return client.request(req); - } - } - }; + return new HttpShardHandler(this); } /** @@ -318,6 +290,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org. this.defaultClient = new Http2SolrClient.Builder() .connectionTimeout(connectionTimeout) .idleTimeout(soTimeout) + .withExecutor(commExecutor) .maxConnectionsPerHost(maxConnectionsPerHost).build(); this.defaultClient.addListenerFactory(this.httpListenerFactory); this.loadbalancer = new LBHttp2SolrClient(defaultClient); @@ -347,16 +320,16 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org. @Override public void close() { try { - ExecutorUtil.shutdownAndAwaitTermination(commExecutor); + if (loadbalancer != null) { + loadbalancer.close(); + } } finally { try { - if (loadbalancer != null) { - loadbalancer.close(); - } - } finally { if (defaultClient != null) { IOUtils.closeQuietly(defaultClient); } + } finally { + ExecutorUtil.shutdownAndAwaitTermination(commExecutor); } } try { @@ -371,18 +344,6 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org. return solrMetricsContext; } - /** - * Makes a request to one or more of the given urls, using the configured load balancer. - * - * @param req The solr search request that should be sent through the load balancer - * @param urls The list of solr server urls to load balance across - * @return The response from the request - */ - public LBSolrClient.Rsp makeLoadBalancedRequest(final QueryRequest req, List urls) - throws SolrServerException, IOException { - return loadbalancer.request(newLBHttpSolrClientReq(req, urls)); - } - protected LBSolrClient.Req newLBHttpSolrClientReq(final QueryRequest req, List urls) { int numServersToTry = (int)Math.floor(urls.size() * this.permittedLoadBalancerRequestsMaximumFraction); if (numServersToTry < this.permittedLoadBalancerRequestsMinimumAbsolute) { @@ -428,13 +389,6 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org. } } - /** - * Creates a new completion service for use by a single set of distributed requests. - */ - public CompletionService newCompletionService() { - return new ExecutorCompletionService<>(commExecutor); - } - /** * Rebuilds the URL replacing the URL scheme of the passed URL with the * configured scheme replacement.If no scheme was configured, the passed URL's diff --git a/solr/core/src/java/org/apache/solr/handler/component/ShardRequestor.java b/solr/core/src/java/org/apache/solr/handler/component/ShardRequestor.java deleted file mode 100644 index c87f126b5c0..00000000000 --- a/solr/core/src/java/org/apache/solr/handler/component/ShardRequestor.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.solr.handler.component; - -import io.opentracing.Span; -import io.opentracing.Tracer; -import io.opentracing.propagation.Format; -import java.net.ConnectException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import org.apache.solr.client.solrj.SolrRequest; -import org.apache.solr.client.solrj.SolrResponse; -import org.apache.solr.client.solrj.impl.LBSolrClient; -import org.apache.solr.client.solrj.request.QueryRequest; -import org.apache.solr.common.SolrException; -import org.apache.solr.common.params.CommonParams; -import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.solr.common.util.NamedList; -import org.apache.solr.request.SolrRequestInfo; -import org.apache.solr.util.tracing.GlobalTracer; -import org.apache.solr.util.tracing.SolrRequestCarrier; -import org.slf4j.MDC; - -class ShardRequestor implements Callable { - private final ShardRequest sreq; - private final String shard; - private final ModifiableSolrParams params; - private final Tracer tracer; - private final Span span; - private final List urls; - private final HttpShardHandler httpShardHandler; - - // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574") - // This is primarily to keep track of what order we should use to query the replicas of a shard - // so that we use the same replica for all phases of a distributed request. - private Map> shardToURLs = new HashMap<>(); - - public ShardRequestor(ShardRequest sreq, String shard, ModifiableSolrParams params, HttpShardHandler httpShardHandler) { - this.sreq = sreq; - this.shard = shard; - this.params = params; - this.httpShardHandler = httpShardHandler; - // do this before call() for thread safety reasons - this.urls = getURLs(shard); - tracer = GlobalTracer.getTracer(); - span = tracer != null ? tracer.activeSpan() : null; - } - - - // Not thread safe... don't use in Callable. - // Don't modify the returned URL list. - private List getURLs(String shard) { - List urls = shardToURLs.get(shard); - if (urls == null) { - urls = httpShardHandler.httpShardHandlerFactory.buildURLList(shard); - shardToURLs.put(shard, urls); - } - return urls; - } - - void init() { - if (shard != null) { - MDC.put("ShardRequest.shards", shard); - } - if (urls != null && !urls.isEmpty()) { - MDC.put("ShardRequest.urlList", urls.toString()); - } - } - - void end() { - MDC.remove("ShardRequest.shards"); - MDC.remove("ShardRequest.urlList"); - } - - @Override - public ShardResponse call() throws Exception { - - ShardResponse srsp = new ShardResponse(); - if (sreq.nodeName != null) { - srsp.setNodeName(sreq.nodeName); - } - srsp.setShardRequest(sreq); - srsp.setShard(shard); - SimpleSolrResponse ssr = new SimpleSolrResponse(); - srsp.setSolrResponse(ssr); - long startTime = System.nanoTime(); - - try { - params.remove(CommonParams.WT); // use default (currently javabin) - params.remove(CommonParams.VERSION); - - QueryRequest req = httpShardHandler.makeQueryRequest(sreq, params, shard); - if (tracer != null && span != null) { - tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req)); - } - req.setMethod(SolrRequest.METHOD.POST); - SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo(); - if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal()); - - // no need to set the response parser as binary is the defaultJab - // req.setResponseParser(new BinaryResponseParser()); - - // if there are no shards available for a slice, urls.size()==0 - if (urls.size() == 0) { - // TODO: what's the right error code here? We should use the same thing when - // all of the servers for a shard are down. - throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard); - } - - if (urls.size() <= 1) { - String url = urls.get(0); - srsp.setShardAddress(url); - ssr.nl = httpShardHandler.request(url, req); - } else { - LBSolrClient.Rsp rsp = httpShardHandler.httpShardHandlerFactory.makeLoadBalancedRequest(req, urls); - ssr.nl = rsp.getResponse(); - srsp.setShardAddress(rsp.getServer()); - } - } catch (ConnectException cex) { - srsp.setException(cex); //???? - } catch (Exception th) { - srsp.setException(th); - if (th instanceof SolrException) { - srsp.setResponseCode(((SolrException) th).code()); - } else { - srsp.setResponseCode(-1); - } - } - - ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - - return httpShardHandler.transfomResponse(sreq, srsp, shard); - } - - static class SimpleSolrResponse extends SolrResponse { - - long elapsedTime; - - NamedList nl; - - @Override - public long getElapsedTime() { - return elapsedTime; - } - - @Override - public NamedList getResponse() { - return nl; - } - - @Override - public void setResponse(NamedList rsp) { - nl = rsp; - } - - @Override - public void setElapsedTime(long elapsedTime) { - this.elapsedTime = elapsedTime; - } - } -} diff --git a/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java b/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java index 5da721c4ce7..9b4a66ec8f0 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java +++ b/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java @@ -22,9 +22,9 @@ public final class ShardResponse { private ShardRequest req; private String shard; private String nodeName; - private String shardAddress; // the specific shard that this response was received from + private volatile String shardAddress; // the specific shard that this response was received from private int rspCode; - private Throwable exception; + private volatile Throwable exception; private SolrResponse rsp; @Override diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java index e4b94fc0513..09d2ce0863d 100644 --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java @@ -44,8 +44,8 @@ import org.apache.solr.common.util.IOUtils; import org.apache.solr.common.util.StrUtils; import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrInfoBean; -import org.apache.solr.handler.component.HttpShardHandlerFactory; import org.apache.solr.handler.component.ShardHandler; +import org.apache.solr.handler.component.ShardHandlerFactory; import org.apache.solr.handler.component.ShardRequest; import org.apache.solr.handler.component.ShardResponse; import org.apache.solr.metrics.SolrMetricProducer; @@ -78,7 +78,7 @@ public class PeerSync implements SolrMetricProducer { private UpdateHandler uhandler; private UpdateLog ulog; - private HttpShardHandlerFactory shardHandlerFactory; + private ShardHandlerFactory shardHandlerFactory; private ShardHandler shardHandler; private List requests = new ArrayList<>(); @@ -123,8 +123,8 @@ public class PeerSync implements SolrMetricProducer { uhandler = core.getUpdateHandler(); ulog = uhandler.getUpdateLog(); // TODO: close - shardHandlerFactory = (HttpShardHandlerFactory) core.getCoreContainer().getShardHandlerFactory(); - shardHandler = shardHandlerFactory.getShardHandler(client); + shardHandlerFactory = core.getCoreContainer().getShardHandlerFactory(); + shardHandler = shardHandlerFactory.getShardHandler(); this.updater = new Updater(msg(), core); core.getCoreMetricManager().registerMetricProducer(SolrInfoBean.Category.REPLICATION.toString(), this); @@ -418,7 +418,7 @@ public class PeerSync implements SolrMetricProducer { sreq.params.set(DISTRIB, false); sreq.params.set("checkCanHandleVersionRanges", false); - ShardHandler sh = shardHandlerFactory.getShardHandler(client); + ShardHandler sh = shardHandlerFactory.getShardHandler(); sh.submit(sreq, replica, sreq.params); ShardResponse srsp = sh.takeCompletedIncludingErrors(); diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java index a5f9e796d3a..2e621bef754 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java @@ -40,7 +40,6 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData; import org.apache.solr.client.solrj.impl.ClusterStateProvider; -import org.apache.solr.client.solrj.impl.Http2SolrClient; import org.apache.solr.cloud.Overseer.LeaderStatus; import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent; import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler; @@ -151,7 +150,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 { Overseer overseer, DistributedMap completedMap, DistributedMap failureMap) { - super(zkStateReader, myId, shardHandlerFactory, adminPath, new Stats(), overseer, new OverseerNodePrioritizer(zkStateReader, overseer.getStateUpdateQueue(), adminPath, shardHandlerFactory, null), workQueue, runningMap, completedMap, failureMap); + super(zkStateReader, myId, shardHandlerFactory, adminPath, new Stats(), overseer, new OverseerNodePrioritizer(zkStateReader, overseer.getStateUpdateQueue(), adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap); } @Override @@ -253,8 +252,6 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 { protected Set commonMocks(int liveNodesCount) throws Exception { when(shardHandlerFactoryMock.getShardHandler()).thenReturn(shardHandlerMock); - when(shardHandlerFactoryMock.getShardHandler(any(Http2SolrClient.class))).thenReturn(shardHandlerMock); - when(shardHandlerFactoryMock.getShardHandler(any(HttpClient.class))).thenReturn(shardHandlerMock); when(workQueueMock.peekTopN(anyInt(), any(), anyLong())).thenAnswer(invocation -> { Object result; int count = 0; diff --git a/solr/core/src/test/org/apache/solr/security/hadoop/TestSolrCloudWithHadoopAuthPlugin.java b/solr/core/src/test/org/apache/solr/security/hadoop/TestSolrCloudWithHadoopAuthPlugin.java index 80b8e006dcd..d75497ed992 100644 --- a/solr/core/src/test/org/apache/solr/security/hadoop/TestSolrCloudWithHadoopAuthPlugin.java +++ b/solr/core/src/test/org/apache/solr/security/hadoop/TestSolrCloudWithHadoopAuthPlugin.java @@ -136,6 +136,6 @@ public class TestSolrCloudWithHadoopAuthPlugin extends SolrCloudAuthTestCase { deleteReq.process(solrClient); AbstractDistribZkTestBase.waitForCollectionToDisappear(collectionName, solrClient.getZkStateReader(), true, 330); - assertAuthMetricsMinimums(14, 8, 0, 6, 0, 0); - } + // cookie was used to avoid re-authentication + assertAuthMetricsMinimums(13, 8, 0, 5, 0, 0); } } diff --git a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java index 0046c12839d..ab6cf839198 100644 --- a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java +++ b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java @@ -28,6 +28,8 @@ import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.Http2SolrClient; import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.util.Cancellable; +import org.apache.solr.client.solrj.util.AsyncListener; import org.apache.solr.common.SolrException; import org.apache.solr.common.util.NamedList; @@ -120,21 +122,21 @@ public class MockingHttp2SolrClient extends Http2SolrClient { return super.request(request, collection); } - public NamedList request(@SuppressWarnings({"rawtypes"})SolrRequest request, - String collection, OnComplete onComplete) - throws SolrServerException, IOException { + @Override + public Cancellable asyncRequest(@SuppressWarnings({"rawtypes"}) SolrRequest request, + String collection, AsyncListener> asyncListener) { if (request instanceof UpdateRequest) { UpdateRequest ur = (UpdateRequest) request; // won't throw exception if request is DBQ if (ur.getDeleteQuery() != null && !ur.getDeleteQuery().isEmpty()) { - return super.request(request, collection, onComplete); + return super.asyncRequest(request, collection, asyncListener); } } if (exp != null) { if (oneExpPerReq) { if (reqGotException.contains(request)) { - return super.request(request, collection, onComplete); + return super.asyncRequest(request, collection, asyncListener); } else reqGotException.add(request); @@ -143,17 +145,12 @@ public class MockingHttp2SolrClient extends Http2SolrClient { Exception e = exception(); if (e instanceof IOException) { if (LuceneTestCase.random().nextBoolean()) { - throw (IOException) e; - } else { - throw new SolrServerException(e); + e = new SolrServerException(e); } - } else if (e instanceof SolrServerException) { - throw (SolrServerException) e; - } else { - throw new SolrServerException(e); } + asyncListener.onFailure(e); } - return super.request(request, collection, onComplete); + return super.asyncRequest(request, collection, asyncListener); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java index a8b7207a22e..218a096e055 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java @@ -39,7 +39,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Phaser; import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -55,8 +54,10 @@ import org.apache.solr.client.solrj.embedded.SSLConfig; import org.apache.solr.client.solrj.request.RequestWriter; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.request.V2Request; +import org.apache.solr.client.solrj.util.Cancellable; import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.client.solrj.util.Constants; +import org.apache.solr.client.solrj.util.AsyncListener; import org.apache.solr.common.SolrException; import org.apache.solr.common.StringUtils; import org.apache.solr.common.params.CommonParams; @@ -75,9 +76,7 @@ import org.eclipse.jetty.client.HttpClientTransport; import org.eclipse.jetty.client.ProtocolHandlers; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; -import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.BytesContentProvider; import org.eclipse.jetty.client.util.FormContentProvider; import org.eclipse.jetty.client.util.InputStreamContentProvider; @@ -97,8 +96,8 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteExecutionException; import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrException; +import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteExecutionException; import static org.apache.solr.common.util.Utils.getObjectByPath; /** @@ -135,6 +134,8 @@ public class Http2SolrClient extends SolrClient { */ private String serverBaseUrl; private boolean closeClient; + private ExecutorService executor; + private boolean shutdownExecutor; protected Http2SolrClient(String serverBaseUrl, Builder builder) { if (serverBaseUrl != null) { @@ -178,8 +179,14 @@ public class Http2SolrClient extends SolrClient { HttpClient httpClient; BlockingArrayQueue queue = new BlockingArrayQueue<>(256, 256); - ThreadPoolExecutor httpClientExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(32, - 256, 60, TimeUnit.SECONDS, queue, new SolrNamedThreadFactory("h2sc")); + executor = builder.executor; + if (executor == null) { + this.executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(32, + 256, 60, TimeUnit.SECONDS, queue, new SolrNamedThreadFactory("h2sc")); + shutdownExecutor = true; + } else { + shutdownExecutor = false; + } SslContextFactory.Client sslContextFactory; boolean ssl; @@ -210,7 +217,7 @@ public class Http2SolrClient extends SolrClient { httpClient.setMaxConnectionsPerDestination(4); } - httpClient.setExecutor(httpClientExecutor); + httpClient.setExecutor(this.executor); httpClient.setStrictEventOrdering(false); httpClient.setConnectBlocking(true); httpClient.setFollowRedirects(false); @@ -232,14 +239,15 @@ public class Http2SolrClient extends SolrClient { asyncTracker.waitForComplete(); if (closeClient) { try { - ExecutorService executor = (ExecutorService) httpClient.getExecutor(); httpClient.setStopTimeout(1000); httpClient.stop(); - ExecutorUtil.shutdownAndAwaitTermination(executor); } catch (Exception e) { throw new RuntimeException("Exception on closing client", e); } } + if (shutdownExecutor) { + ExecutorUtil.shutdownAndAwaitTermination(executor); + } assert ObjectReleaseTracker.release(this); } @@ -361,76 +369,100 @@ public class Http2SolrClient extends SolrClient { outStream.flush(); } - public NamedList request(@SuppressWarnings({"rawtypes"})SolrRequest solrRequest, - String collection, - OnComplete onComplete) throws IOException, SolrServerException { + private static final Exception CANCELLED_EXCEPTION = new Exception(); + private static final Cancellable FAILED_MAKING_REQUEST_CANCELLABLE = () -> {}; + + public Cancellable asyncRequest(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection, AsyncListener> asyncListener) { + Request req; + try { + req = makeRequest(solrRequest, collection); + } catch (SolrServerException | IOException e) { + asyncListener.onFailure(e); + return FAILED_MAKING_REQUEST_CANCELLABLE; + } + final ResponseParser parser = solrRequest.getResponseParser() == null + ? this.parser: solrRequest.getResponseParser(); + req.onRequestQueued(asyncTracker.queuedListener) + .onComplete(asyncTracker.completeListener) + .send(new InputStreamResponseListener() { + @Override + public void onHeaders(Response response) { + super.onHeaders(response); + InputStreamResponseListener listener = this; + executor.execute(() -> { + InputStream is = listener.getInputStream(); + assert ObjectReleaseTracker.track(is); + try { + NamedList body = processErrorsAndResponse(solrRequest, parser, response, is); + asyncListener.onSuccess(body); + } catch (RemoteSolrException e) { + if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) { + asyncListener.onFailure(e); + } + } catch (SolrServerException e) { + asyncListener.onFailure(e); + } + }); + } + + @Override + public void onFailure(Response response, Throwable failure) { + super.onFailure(response, failure); + if (failure != CANCELLED_EXCEPTION) { + asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure)); + } + } + }); + return () -> req.abort(CANCELLED_EXCEPTION); + } + + @Override + public NamedList request(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection) throws SolrServerException, IOException { Request req = makeRequest(solrRequest, collection); final ResponseParser parser = solrRequest.getResponseParser() == null ? this.parser: solrRequest.getResponseParser(); - if (onComplete != null) { - // This async call only suitable for indexing since the response size is limited by 5MB - req.onRequestQueued(asyncTracker.queuedListener) - .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(5 * 1024 * 1024) { + try { + InputStreamResponseListener listener = new InputStreamResponseListener(); + req.send(listener); + Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS); + InputStream is = listener.getInputStream(); + assert ObjectReleaseTracker.track(is); - @Override - public void onComplete(Result result) { - if (result.isFailed()) { - onComplete.onFailure(result.getFailure()); - return; - } - - NamedList rsp; - try { - InputStream is = getContentAsInputStream(); - assert ObjectReleaseTracker.track(is); - rsp = processErrorsAndResponse(result.getResponse(), - parser, is, getMediaType(), getEncoding(), isV2ApiRequest(solrRequest)); - onComplete.onSuccess(rsp); - } catch (Exception e) { - onComplete.onFailure(e); - } - } - }); - return null; - } else { - try { - InputStreamResponseListener listener = new InputStreamResponseListener(); - req.send(listener); - Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS); - InputStream is = listener.getInputStream(); - assert ObjectReleaseTracker.track(is); - - ContentType contentType = getContentType(response); - String mimeType = null; - String encoding = null; - if (contentType != null) { - mimeType = contentType.getMimeType(); - encoding = contentType.getCharset() != null? contentType.getCharset().name() : null; - } - return processErrorsAndResponse(response, parser, is, mimeType, encoding, isV2ApiRequest(solrRequest)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (TimeoutException e) { - throw new SolrServerException( - "Timeout occured while waiting response from server at: " + req.getURI(), e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof ConnectException) { - throw new SolrServerException("Server refused connection at: " + req.getURI(), cause); - } - if (cause instanceof SolrServerException) { - throw (SolrServerException) cause; - } else if (cause instanceof IOException) { - throw new SolrServerException( - "IOException occured when talking to server at: " + getBaseURL(), cause); - } - throw new SolrServerException(cause.getMessage(), cause); + return processErrorsAndResponse(solrRequest, parser, response, is); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (TimeoutException e) { + throw new SolrServerException( + "Timeout occured while waiting response from server at: " + req.getURI(), e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof ConnectException) { + throw new SolrServerException("Server refused connection at: " + req.getURI(), cause); } + if (cause instanceof SolrServerException) { + throw (SolrServerException) cause; + } else if (cause instanceof IOException) { + throw new SolrServerException( + "IOException occured when talking to server at: " + getBaseURL(), cause); + } + throw new SolrServerException(cause.getMessage(), cause); } } + private NamedList processErrorsAndResponse(@SuppressWarnings({"rawtypes"})SolrRequest solrRequest, + ResponseParser parser, Response response, InputStream is) throws SolrServerException { + ContentType contentType = getContentType(response); + String mimeType = null; + String encoding = null; + if (contentType != null) { + mimeType = contentType.getMimeType(); + encoding = contentType.getCharset() != null? contentType.getCharset().name() : null; + } + return processErrorsAndResponse(response, parser, is, mimeType, encoding, isV2ApiRequest(solrRequest)); + } + private ContentType getContentType(Response response) { String contentType = response.getHeaders().get(HttpHeader.CONTENT_TYPE); return StringUtils.isEmpty(contentType)? null : ContentType.parse(contentType); @@ -453,6 +485,7 @@ public class Http2SolrClient extends SolrClient { private void decorateRequest(Request req, @SuppressWarnings({"rawtypes"})SolrRequest solrRequest) { req.header(HttpHeader.ACCEPT_ENCODING, null); + req.timeout(idleTimeout, TimeUnit.MILLISECONDS); if (solrRequest.getUserPrincipal() != null) { req.attribute(REQ_PRINCIPAL_KEY, solrRequest.getUserPrincipal()); } @@ -750,21 +783,10 @@ public class Http2SolrClient extends SolrClient { } } - @Override - public NamedList request(@SuppressWarnings({"rawtypes"})SolrRequest request, String collection) throws SolrServerException, IOException { - return request(request, collection, null); - } - public void setRequestWriter(RequestWriter requestWriter) { this.requestWriter = requestWriter; } - public interface OnComplete { - void onSuccess(NamedList result); - - void onFailure(Throwable e); - } - public void setFollowRedirects(boolean follow) { httpClient.setFollowRedirects(follow); } @@ -821,6 +843,7 @@ public class Http2SolrClient extends SolrClient { private Integer maxConnectionsPerHost; private boolean useHttp1_1 = Boolean.getBoolean("solr.http1"); protected String baseSolrUrl; + private ExecutorService executor; public Builder() { @@ -842,6 +865,11 @@ public class Http2SolrClient extends SolrClient { return this; } + public Builder withExecutor(ExecutorService executor) { + this.executor = executor; + return this; + } + public Builder withSSLConfig(SSLConfig sslConfig) { this.sslConfig = sslConfig; return this; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java index 293a2647f61..96f96bf5cb8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java @@ -16,9 +16,24 @@ */ package org.apache.solr.client.solrj.impl; +import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketException; +import java.net.SocketTimeoutException; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.IsUpdateRequest; +import org.apache.solr.client.solrj.util.Cancellable; +import org.apache.solr.client.solrj.util.AsyncListener; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.util.NamedList; +import org.slf4j.MDC; + +import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS; /** * LBHttp2SolrClient or "LoadBalanced LBHttp2SolrClient" is a load balancing wrapper around @@ -66,4 +81,126 @@ public class LBHttp2SolrClient extends LBSolrClient { protected SolrClient getClient(String baseUrl) { return httpClient; } + + public Cancellable asyncReq(Req req, AsyncListener asyncListener) { + Rsp rsp = new Rsp(); + boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath()); + ServerIterator it = new ServerIterator(req, zombieServers); + asyncListener.onStart(); + final AtomicBoolean cancelled = new AtomicBoolean(false); + AtomicReference currentCancellable = new AtomicReference<>(); + RetryListener retryListener = new RetryListener() { + + @Override + public void onSuccess(Rsp rsp) { + asyncListener.onSuccess(rsp); + } + + @Override + public void onFailure(Exception e, boolean retryReq) { + if (retryReq) { + String url; + try { + url = it.nextOrError(e); + } catch (SolrServerException ex) { + asyncListener.onFailure(e); + return; + } + try { + MDC.put("LBSolrClient.url", url); + synchronized (cancelled) { + if (cancelled.get()) { + return; + } + Cancellable cancellable = doRequest(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this); + currentCancellable.set(cancellable); + } + } finally { + MDC.remove("LBSolrClient.url"); + } + } else { + asyncListener.onFailure(e); + } + } + }; + try { + Cancellable cancellable = doRequest(it.nextOrError(), req, rsp, isNonRetryable, it.isServingZombieServer(), retryListener); + currentCancellable.set(cancellable); + } catch (SolrServerException e) { + asyncListener.onFailure(e); + } + return () -> { + synchronized (cancelled) { + cancelled.set(true); + if (currentCancellable.get() != null) { + currentCancellable.get().cancel(); + } + } + }; + } + + private interface RetryListener { + void onSuccess(Rsp rsp); + void onFailure(Exception e, boolean retryReq); + } + + private Cancellable doRequest(String baseUrl, Req req, Rsp rsp, boolean isNonRetryable, + boolean isZombie, RetryListener listener) { + rsp.server = baseUrl; + req.getRequest().setBasePath(baseUrl); + return ((Http2SolrClient)getClient(baseUrl)).asyncRequest(req.getRequest(), null, new AsyncListener<>() { + @Override + public void onSuccess(NamedList result) { + rsp.rsp = result; + if (isZombie) { + zombieServers.remove(baseUrl); + } + listener.onSuccess(rsp); + } + + @Override + public void onFailure(Throwable oe) { + try { + throw (Exception) oe; + } catch (BaseHttpSolrClient.RemoteExecutionException e) { + listener.onFailure(e, false); + } catch (SolrException e) { + // we retry on 404 or 403 or 503 or 500 + // unless it's an update - then we only retry on connect exception + if (!isNonRetryable && RETRY_CODES.contains(e.code())) { + listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true); + } else { + // Server is alive but the request was likely malformed or invalid + if (isZombie) { + zombieServers.remove(baseUrl); + } + listener.onFailure(e, false); + } + } catch (SocketException e) { + if (!isNonRetryable || e instanceof ConnectException) { + listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true); + } else { + listener.onFailure(e, false); + } + } catch (SocketTimeoutException e) { + if (!isNonRetryable) { + listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true); + } else { + listener.onFailure(e, false); + } + } catch (SolrServerException e) { + Throwable rootCause = e.getRootCause(); + if (!isNonRetryable && rootCause instanceof IOException) { + listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true); + } else if (isNonRetryable && rootCause instanceof ConnectException) { + listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true); + } else { + listener.onFailure(e, false); + } + } catch (Exception e) { + listener.onFailure(new SolrServerException(e), false); + } + } + }); + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java index 1654e32a595..c1e6af79623 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -60,7 +61,7 @@ import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS; public abstract class LBSolrClient extends SolrClient { // defaults - private static final Set RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500)); + protected static final Set RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500)); private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list @@ -69,7 +70,7 @@ public abstract class LBSolrClient extends SolrClient { private final Map aliveServers = new LinkedHashMap<>(); // access to aliveServers should be synchronized on itself - private final Map zombieServers = new ConcurrentHashMap<>(); + protected final Map zombieServers = new ConcurrentHashMap<>(); // changes to aliveServers are reflected in this array, no need to synchronize private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0]; @@ -136,6 +137,99 @@ public abstract class LBSolrClient extends SolrClient { } } + protected static class ServerIterator { + String serverStr; + List skipped; + int numServersTried; + Iterator it; + Iterator skippedIt; + String exceptionMessage; + long timeAllowedNano; + long timeOutTime; + + final Map zombieServers; + final Req req; + + public ServerIterator(Req req, Map zombieServers) { + this.it = req.getServers().iterator(); + this.req = req; + this.zombieServers = zombieServers; + this.timeAllowedNano = getTimeAllowedInNanos(req.getRequest()); + this.timeOutTime = System.nanoTime() + timeAllowedNano; + fetchNext(); + } + + public synchronized boolean hasNext() { + return serverStr != null; + } + + private void fetchNext() { + serverStr = null; + if (req.numServersToTry != null && numServersTried > req.numServersToTry) { + exceptionMessage = "Time allowed to handle this request exceeded"; + return; + } + + while (it.hasNext()) { + serverStr = it.next(); + serverStr = normalize(serverStr); + // if the server is currently a zombie, just skip to the next one + ServerWrapper wrapper = zombieServers.get(serverStr); + if (wrapper != null) { + final int numDeadServersToTry = req.getNumDeadServersToTry(); + if (numDeadServersToTry > 0) { + if (skipped == null) { + skipped = new ArrayList<>(numDeadServersToTry); + skipped.add(wrapper.getBaseUrl()); + } else if (skipped.size() < numDeadServersToTry) { + skipped.add(wrapper.getBaseUrl()); + } + } + continue; + } + + break; + } + if (serverStr == null && skipped != null) { + if (skippedIt == null) { + skippedIt = skipped.iterator(); + } + if (skippedIt.hasNext()) { + serverStr = skippedIt.next(); + } + } + } + + boolean isServingZombieServer() { + return skippedIt != null; + } + + public synchronized String nextOrError() throws SolrServerException { + return nextOrError(null); + } + + public synchronized String nextOrError(Exception previousEx) throws SolrServerException { + String suffix = ""; + if (previousEx == null) { + suffix = ":" + zombieServers.keySet(); + } + if (isTimeExceeded(timeAllowedNano, timeOutTime)) { + throw new SolrServerException("Time allowed to handle this request exceeded"+suffix, previousEx); + } + if (serverStr == null) { + throw new SolrServerException("No live SolrServers available to handle this request"+suffix, previousEx); + } + numServersTried++; + if (req.getNumServersToTry() != null && numServersTried > req.getNumServersToTry()) { + throw new SolrServerException("No live SolrServers available to handle this request:" + + " numServersTried="+numServersTried + + " numServersToTry="+req.getNumServersToTry()+suffix, previousEx); + } + String rs = serverStr; + fetchNext(); + return rs; + } + } public static class Req { @SuppressWarnings({"rawtypes"}) @@ -257,45 +351,12 @@ public abstract class LBSolrClient extends SolrClient { Rsp rsp = new Rsp(); Exception ex = null; boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath()); - List skipped = null; - - final Integer numServersToTry = req.getNumServersToTry(); - int numServersTried = 0; - - boolean timeAllowedExceeded = false; - long timeAllowedNano = getTimeAllowedInNanos(req.getRequest()); - long timeOutTime = System.nanoTime() + timeAllowedNano; - for (String serverStr : req.getServers()) { - if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) { - break; - } - - serverStr = normalize(serverStr); - // if the server is currently a zombie, just skip to the next one - ServerWrapper wrapper = zombieServers.get(serverStr); - if (wrapper != null) { - // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr); - final int numDeadServersToTry = req.getNumDeadServersToTry(); - if (numDeadServersToTry > 0) { - if (skipped == null) { - skipped = new ArrayList<>(numDeadServersToTry); - skipped.add(wrapper); - } - else if (skipped.size() < numDeadServersToTry) { - skipped.add(wrapper); - } - } - continue; - } + ServerIterator serverIterator = new ServerIterator(req, zombieServers); + String serverStr; + while ((serverStr = serverIterator.nextOrError(ex)) != null) { try { MDC.put("LBSolrClient.url", serverStr); - - if (numServersToTry != null && numServersTried > numServersToTry.intValue()) { - break; - } - - ++numServersTried; - ex = doRequest(serverStr, req, rsp, isNonRetryable, false); + ex = doRequest(serverStr, req, rsp, isNonRetryable, serverIterator.isServingZombieServer()); if (ex == null) { return rsp; // SUCCESS } @@ -303,61 +364,19 @@ public abstract class LBSolrClient extends SolrClient { MDC.remove("LBSolrClient.url"); } } - - // try the servers we previously skipped - if (skipped != null) { - for (ServerWrapper wrapper : skipped) { - if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) { - break; - } - - if (numServersToTry != null && numServersTried > numServersToTry.intValue()) { - break; - } - - try { - MDC.put("LBSolrClient.url", wrapper.getBaseUrl()); - ++numServersTried; - ex = doRequest(wrapper.baseUrl, req, rsp, isNonRetryable, true); - if (ex == null) { - return rsp; // SUCCESS - } - } finally { - MDC.remove("LBSolrClient.url"); - } - } - } - - - final String solrServerExceptionMessage; - if (timeAllowedExceeded) { - solrServerExceptionMessage = "Time allowed to handle this request exceeded"; - } else { - if (numServersToTry != null && numServersTried > numServersToTry.intValue()) { - solrServerExceptionMessage = "No live SolrServers available to handle this request:" - + " numServersTried="+numServersTried - + " numServersToTry="+numServersToTry.intValue(); - } else { - solrServerExceptionMessage = "No live SolrServers available to handle this request"; - } - } - if (ex == null) { - throw new SolrServerException(solrServerExceptionMessage); - } else { - throw new SolrServerException(solrServerExceptionMessage+":" + zombieServers.keySet(), ex); - } + throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex); } /** * @return time allowed in nanos, returns -1 if no time_allowed is specified. */ - private long getTimeAllowedInNanos(@SuppressWarnings({"rawtypes"})final SolrRequest req) { + private static long getTimeAllowedInNanos(@SuppressWarnings({"rawtypes"})final SolrRequest req) { SolrParams reqParams = req.getParams(); return reqParams == null ? -1 : TimeUnit.NANOSECONDS.convert(reqParams.getInt(CommonParams.TIME_ALLOWED, -1), TimeUnit.MILLISECONDS); } - private boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) { + private static boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) { return timeAllowedNano > 0 && System.nanoTime() > timeOutTime; } @@ -415,7 +434,7 @@ public abstract class LBSolrClient extends SolrClient { protected abstract SolrClient getClient(String baseUrl); - private Exception addZombie(String serverStr, Exception e) { + protected Exception addZombie(String serverStr, Exception e) { ServerWrapper wrapper = createServerWrapper(serverStr); wrapper.standard = false; zombieServers.put(serverStr, wrapper); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java new file mode 100644 index 00000000000..be642757bc8 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.util; + +/** + * Listener for async requests + */ +public interface AsyncListener { + /** + * Callback method invoked before processing the request + */ + default void onStart() { + + } + void onSuccess(T t); + void onFailure(Throwable throwable); + +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java new file mode 100644 index 00000000000..323916a4fdc --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.util; + +public interface Cancellable { + void cancel(); +} diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBSolrClientTest.java new file mode 100644 index 00000000000..1c355074cd3 --- /dev/null +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBSolrClientTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.apache.lucene.util.LuceneTestCase; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class LBSolrClientTest { + + @Test + public void testServerIterator() throws SolrServerException { + LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4")); + LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>()); + List actualServers = new ArrayList<>(); + while (serverIterator.hasNext()) { + actualServers.add(serverIterator.nextOrError()); + } + assertEquals(Arrays.asList("1", "2", "3", "4"), actualServers); + assertFalse(serverIterator.hasNext()); + LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError); + } + + @Test + public void testServerIteratorWithZombieServers() throws SolrServerException { + HashMap zombieServers = new HashMap<>(); + LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4")); + LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, zombieServers); + zombieServers.put("2", new LBSolrClient.ServerWrapper("2")); + + assertTrue(serverIterator.hasNext()); + assertEquals("1", serverIterator.nextOrError()); + assertTrue(serverIterator.hasNext()); + assertEquals("3", serverIterator.nextOrError()); + assertTrue(serverIterator.hasNext()); + assertEquals("4", serverIterator.nextOrError()); + assertTrue(serverIterator.hasNext()); + assertEquals("2", serverIterator.nextOrError()); + } + + @Test + public void testServerIteratorTimeAllowed() throws SolrServerException, InterruptedException { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(CommonParams.TIME_ALLOWED, 300); + LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(params), Arrays.asList("1", "2", "3", "4"), 2); + LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>()); + assertTrue(serverIterator.hasNext()); + serverIterator.nextOrError(); + Thread.sleep(300); + LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError); + } + + @Test + public void testServerIteratorMaxRetry() throws SolrServerException { + LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4"), 2); + LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>()); + assertTrue(serverIterator.hasNext()); + serverIterator.nextOrError(); + assertTrue(serverIterator.hasNext()); + serverIterator.nextOrError(); + LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError); + } +} diff --git a/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java b/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java index 8e77b0c0c37..25437ff86c6 100644 --- a/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java +++ b/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java @@ -16,7 +16,6 @@ */ package org.apache.solr.handler.component; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; @@ -25,13 +24,7 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; -import org.apache.http.client.HttpClient; -import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.SolrRequest; -import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.embedded.JettySolrRunner; -import org.apache.solr.client.solrj.impl.Http2SolrClient; -import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.cloud.MiniSolrCloudCluster; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; @@ -39,7 +32,6 @@ import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.StrUtils; import org.apache.solr.core.CoreContainer; @@ -90,14 +82,9 @@ public class TrackingShardHandlerFactory extends HttpShardHandlerFactory { @Override public ShardHandler getShardHandler() { - return super.getShardHandler(); - } - - @Override - public ShardHandler getShardHandler(Http2SolrClient client) { final ShardHandlerFactory factory = this; - final ShardHandler wrapped = super.getShardHandler(client); - return new HttpShardHandler(this, client) { + final ShardHandler wrapped = super.getShardHandler(); + return new HttpShardHandler(this) { @Override public void prepDistributed(ResponseBuilder rb) { wrapped.prepDistributed(rb); @@ -135,55 +122,6 @@ public class TrackingShardHandlerFactory extends HttpShardHandlerFactory { }; } - @Override - public ShardHandler getShardHandler(HttpClient httpClient) { - final ShardHandlerFactory factory = this; - final ShardHandler wrapped = super.getShardHandler(httpClient); - return new HttpShardHandler(this, null) { - @Override - public void prepDistributed(ResponseBuilder rb) { - wrapped.prepDistributed(rb); - } - - @Override - public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) { - synchronized (TrackingShardHandlerFactory.this) { - if (isTracking()) { - queue.offer(new ShardRequestAndParams(sreq, shard, params)); - } - } - wrapped.submit(sreq, shard, params); - } - - @Override - protected NamedList request(String url, @SuppressWarnings({"rawtypes"})SolrRequest req) throws IOException, SolrServerException { - try (SolrClient client = new HttpSolrClient.Builder(url).withHttpClient(httpClient).build()) { - return client.request(req); - } - } - - @Override - public ShardResponse takeCompletedIncludingErrors() { - return wrapped.takeCompletedIncludingErrors(); - } - - @Override - public ShardResponse takeCompletedOrError() { - return wrapped.takeCompletedOrError(); - } - - @Override - public void cancelAll() { - wrapped.cancelAll(); - } - - @Override - public ShardHandlerFactory getShardHandlerFactory() { - return factory; - } - }; - } - @Override public void close() { super.close();