SOLR-14531: Refactor out internode requests from HttpShardHandler

This commit is contained in:
noble 2020-06-02 16:00:58 +10:00
parent 552f1940af
commit 502f62cc9c
2 changed files with 167 additions and 135 deletions

View File

@ -17,26 +17,15 @@
package org.apache.solr.handler.component; package org.apache.solr.handler.component;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.Http2SolrClient; import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.LBSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.routing.ReplicaListTransformer; import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
import org.apache.solr.cloud.CloudDescriptor; import org.apache.solr.cloud.CloudDescriptor;
@ -44,17 +33,12 @@ import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.util.tracing.GlobalTracer;
import org.apache.solr.util.tracing.SolrRequestCarrier;
import org.slf4j.MDC;
public class HttpShardHandler extends ShardHandler { public class HttpShardHandler extends ShardHandler {
/** /**
@ -65,10 +49,9 @@ public class HttpShardHandler extends ShardHandler {
*/ */
public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime"; public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
private HttpShardHandlerFactory httpShardHandlerFactory; final HttpShardHandlerFactory httpShardHandlerFactory;
private CompletionService<ShardResponse> completionService; private CompletionService<ShardResponse> completionService;
private Set<Future<ShardResponse>> pending; private Set<Future<ShardResponse>> pending;
private Map<String, List<String>> shardToURLs;
private Http2SolrClient httpClient; private Http2SolrClient httpClient;
public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory, Http2SolrClient httpClient) { public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory, Http2SolrClient httpClient) {
@ -76,130 +59,17 @@ public class HttpShardHandler extends ShardHandler {
this.httpShardHandlerFactory = httpShardHandlerFactory; this.httpShardHandlerFactory = httpShardHandlerFactory;
completionService = httpShardHandlerFactory.newCompletionService(); completionService = httpShardHandlerFactory.newCompletionService();
pending = new HashSet<>(); pending = new HashSet<>();
// maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
// This is primarily to keep track of what order we should use to query the replicas of a shard
// so that we use the same replica for all phases of a distributed request.
shardToURLs = new HashMap<>();
} }
private static class SimpleSolrResponse extends SolrResponse {
long elapsedTime;
NamedList<Object> nl;
@Override
public long getElapsedTime() {
return elapsedTime;
}
@Override
public NamedList<Object> getResponse() {
return nl;
}
@Override
public void setResponse(NamedList<Object> rsp) {
nl = rsp;
}
@Override
public void setElapsedTime(long elapsedTime) {
this.elapsedTime = elapsedTime;
}
}
// Not thread safe... don't use in Callable.
// Don't modify the returned URL list.
private List<String> getURLs(String shard) {
List<String> urls = shardToURLs.get(shard);
if (urls == null) {
urls = httpShardHandlerFactory.buildURLList(shard);
shardToURLs.put(shard, urls);
}
return urls;
}
@Override @Override
public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) { public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
// do this outside of the callable for thread safety reasons ShardRequestor shardRequestor = new ShardRequestor(sreq, shard, params, this);
final List<String> urls = getURLs(shard);
final Tracer tracer = GlobalTracer.getTracer();
final Span span = tracer != null ? tracer.activeSpan() : null;
Callable<ShardResponse> task = () -> {
ShardResponse srsp = new ShardResponse();
if (sreq.nodeName != null) {
srsp.setNodeName(sreq.nodeName);
}
srsp.setShardRequest(sreq);
srsp.setShard(shard);
SimpleSolrResponse ssr = new SimpleSolrResponse();
srsp.setSolrResponse(ssr);
long startTime = System.nanoTime();
try {
params.remove(CommonParams.WT); // use default (currently javabin)
params.remove(CommonParams.VERSION);
QueryRequest req = makeQueryRequest(sreq, params, shard);
if (tracer != null && span != null) {
tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req));
}
req.setMethod(SolrRequest.METHOD.POST);
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
// no need to set the response parser as binary is the defaultJab
// req.setResponseParser(new BinaryResponseParser());
// if there are no shards available for a slice, urls.size()==0
if (urls.size() == 0) {
// TODO: what's the right error code here? We should use the same thing when
// all of the servers for a shard are down.
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
}
if (urls.size() <= 1) {
String url = urls.get(0);
srsp.setShardAddress(url);
ssr.nl = request(url, req);
} else {
LBSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
ssr.nl = rsp.getResponse();
srsp.setShardAddress(rsp.getServer());
}
} catch (ConnectException cex) {
srsp.setException(cex); //????
} catch (Exception th) {
srsp.setException(th);
if (th instanceof SolrException) {
srsp.setResponseCode(((SolrException) th).code());
} else {
srsp.setResponseCode(-1);
}
}
ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
return transfomResponse(sreq, srsp, shard);
};
try { try {
if (shard != null) { shardRequestor.init();
MDC.put("ShardRequest.shards", shard); pending.add(completionService.submit(shardRequestor));
}
if (urls != null && !urls.isEmpty()) {
MDC.put("ShardRequest.urlList", urls.toString());
}
pending.add(completionService.submit(task));
} finally { } finally {
MDC.remove("ShardRequest.shards"); shardRequestor.end();
MDC.remove("ShardRequest.urlList");
} }
} }

View File

@ -0,0 +1,162 @@
package org.apache.solr.handler.component;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import java.net.ConnectException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.LBSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.util.tracing.GlobalTracer;
import org.apache.solr.util.tracing.SolrRequestCarrier;
import org.slf4j.MDC;
class ShardRequestor implements Callable<ShardResponse> {
private final ShardRequest sreq;
private final String shard;
private final ModifiableSolrParams params;
private final Tracer tracer;
private final Span span;
private final List<String> urls;
private final HttpShardHandler httpShardHandler;
// maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
// This is primarily to keep track of what order we should use to query the replicas of a shard
// so that we use the same replica for all phases of a distributed request.
private Map<String, List<String>> shardToURLs = new HashMap<>();
public ShardRequestor(ShardRequest sreq, String shard, ModifiableSolrParams params, HttpShardHandler httpShardHandler) {
this.sreq = sreq;
this.shard = shard;
this.params = params;
this.httpShardHandler = httpShardHandler;
// do this before call() for thread safety reasons
this.urls = getURLs(shard);
tracer = GlobalTracer.getTracer();
span = tracer != null ? tracer.activeSpan() : null;
}
// Not thread safe... don't use in Callable.
// Don't modify the returned URL list.
private List<String> getURLs(String shard) {
List<String> urls = shardToURLs.get(shard);
if (urls == null) {
urls = httpShardHandler.httpShardHandlerFactory.buildURLList(shard);
shardToURLs.put(shard, urls);
}
return urls;
}
void init() {
if (shard != null) {
MDC.put("ShardRequest.shards", shard);
}
if (urls != null && !urls.isEmpty()) {
MDC.put("ShardRequest.urlList", urls.toString());
}
}
void end() {
MDC.remove("ShardRequest.shards");
MDC.remove("ShardRequest.urlList");
}
@Override
public ShardResponse call() throws Exception {
ShardResponse srsp = new ShardResponse();
if (sreq.nodeName != null) {
srsp.setNodeName(sreq.nodeName);
}
srsp.setShardRequest(sreq);
srsp.setShard(shard);
SimpleSolrResponse ssr = new SimpleSolrResponse();
srsp.setSolrResponse(ssr);
long startTime = System.nanoTime();
try {
params.remove(CommonParams.WT); // use default (currently javabin)
params.remove(CommonParams.VERSION);
QueryRequest req = httpShardHandler.makeQueryRequest(sreq, params, shard);
if (tracer != null && span != null) {
tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req));
}
req.setMethod(SolrRequest.METHOD.POST);
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
// no need to set the response parser as binary is the defaultJab
// req.setResponseParser(new BinaryResponseParser());
// if there are no shards available for a slice, urls.size()==0
if (urls.size() == 0) {
// TODO: what's the right error code here? We should use the same thing when
// all of the servers for a shard are down.
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
}
if (urls.size() <= 1) {
String url = urls.get(0);
srsp.setShardAddress(url);
ssr.nl = httpShardHandler.request(url, req);
} else {
LBSolrClient.Rsp rsp = httpShardHandler.httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
ssr.nl = rsp.getResponse();
srsp.setShardAddress(rsp.getServer());
}
} catch (ConnectException cex) {
srsp.setException(cex); //????
} catch (Exception th) {
srsp.setException(th);
if (th instanceof SolrException) {
srsp.setResponseCode(((SolrException) th).code());
} else {
srsp.setResponseCode(-1);
}
}
ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
return httpShardHandler.transfomResponse(sreq, srsp, shard);
}
static class SimpleSolrResponse extends SolrResponse {
long elapsedTime;
NamedList<Object> nl;
@Override
public long getElapsedTime() {
return elapsedTime;
}
@Override
public NamedList<Object> getResponse() {
return nl;
}
@Override
public void setResponse(NamedList<Object> rsp) {
nl = rsp;
}
@Override
public void setElapsedTime(long elapsedTime) {
this.elapsedTime = elapsedTime;
}
}
}