SOLR-3134: Adding test and fix from Russell Black

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1294995 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ryan McKinley 2012-02-29 07:32:02 +00:00
parent dcaf6be3e4
commit effa52b18f
6 changed files with 176 additions and 66 deletions

View File

@ -227,7 +227,8 @@ New Features
* SOLR-2459: Expose LogLevel selection with a RequestHandler rather then servlet
(Stefan Matheis, Upayavira, ryan)
* SOLR-3134: Include shard info in distributed response when shards.info=true (ryan)
* SOLR-3134: Include shard info in distributed response when shards.info=true
(Russell Black, ryan)
Optimizations

View File

@ -180,26 +180,11 @@ public class HttpShardHandler extends ShardHandler {
pending.add( completionService.submit(task) );
}
/** returns a ShardResponse of the last response correlated with a ShardRequest */
ShardResponse take() {
while (pending.size() > 0) {
try {
Future<ShardResponse> future = completionService.take();
pending.remove(future);
ShardResponse rsp = future.get();
rsp.getShardRequest().responses.add(rsp);
if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
return rsp;
}
} catch (InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (ExecutionException e) {
// should be impossible... the problem with catching the exception
// at this level is we don't know what ShardRequest it applied to
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);
}
}
return null;
/** returns a ShardResponse of the last response correlated with a ShardRequest. This won't
* return early if it runs into an error.
**/
public ShardResponse takeCompletedIncludingErrors() {
return take(false);
}
@ -207,12 +192,17 @@ public class HttpShardHandler extends ShardHandler {
* or immediately returns a ShardResponse if there was an error detected
*/
public ShardResponse takeCompletedOrError() {
return take(true);
}
private ShardResponse take(boolean bailOnError) {
while (pending.size() > 0) {
try {
Future<ShardResponse> future = completionService.take();
pending.remove(future);
ShardResponse rsp = future.get();
if (rsp.getException() != null) return rsp; // if exception, return immediately
if (bailOnError && rsp.getException() != null) return rsp; // if exception, return immediately
// add response to the response list... we do this after the take() and
// not after the completion of "call" so we know when the last response
// for a request was received. Otherwise we might return the same

View File

@ -31,6 +31,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.lucene.util.ReaderUtil;
import org.apache.lucene.util.UnicodeUtil;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
@ -75,6 +76,8 @@ import org.apache.solr.search.grouping.endresulttransformer.SimpleEndResultTrans
import org.apache.solr.util.SolrPluginUtils;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URL;
import java.util.*;
@ -771,25 +774,47 @@ public class QueryComponent extends SearchComponent
NamedList<Object> shardInfo = null;
if(rb.req.getParams().getBool(ShardParams.SHARDS_INFO, false)) {
shardInfo = (NamedList<Object>) rb.rsp.getValues().get(ShardParams.SHARDS_INFO);
if(shardInfo==null) {
shardInfo = new SimpleOrderedMap<Object>();
rb.rsp.getValues().add(ShardParams.SHARDS_INFO,shardInfo);
}
shardInfo = new SimpleOrderedMap<Object>();
rb.rsp.getValues().add(ShardParams.SHARDS_INFO,shardInfo);
}
long numFound = 0;
Float maxScore=null;
for (ShardResponse srsp : sreq.responses) {
SolrDocumentList docs = (SolrDocumentList)srsp.getSolrResponse().getResponse().get("response");
SolrDocumentList docs = null;
if(shardInfo!=null) {
SimpleOrderedMap<Object> nl = new SimpleOrderedMap<Object>();
nl.add("numFound", docs.getNumFound());
nl.add("maxScore", docs.getMaxScore());
nl.add("time", srsp.getSolrResponse().getElapsedTime());
if (srsp.getException() != null) {
Throwable t = srsp.getException();
if(t instanceof SolrServerException) {
t = ((SolrServerException)t).getCause();
}
nl.add("error", t.toString() );
StringWriter trace = new StringWriter();
t.printStackTrace(new PrintWriter(trace));
nl.add("trace", trace.toString() );
}
else {
docs = (SolrDocumentList)srsp.getSolrResponse().getResponse().get("response");
nl.add("numFound", docs.getNumFound());
nl.add("maxScore", docs.getMaxScore());
}
if(srsp.getSolrResponse()!=null) {
nl.add("time", srsp.getSolrResponse().getElapsedTime());
}
shardInfo.add(srsp.getShard(), nl);
}
// now that we've added the shard info, let's only proceed if we have no error.
if (srsp.getException() != null) {
continue;
}
if (docs == null) { // could have been initialized in the shards info block above
docs = (SolrDocumentList)srsp.getSolrResponse().getResponse().get("response");
}
// calculate global maxScore and numDocsFound
if (docs.getMaxScore() != null) {

View File

@ -18,14 +18,11 @@
package org.apache.solr.handler.component;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.RTimer;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
@ -38,8 +35,6 @@ import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.*;
/**
@ -285,38 +280,17 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware ,
// now wait for replies, but if anyone puts more requests on
// the outgoing queue, send them out immediately (by exiting
// this loop)
boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false);
while (rb.outgoing.size() == 0) {
ShardResponse srsp = shardHandler1.takeCompletedOrError();
ShardResponse srsp = tolerant ?
shardHandler1.takeCompletedIncludingErrors():
shardHandler1.takeCompletedOrError();
if (srsp == null) break; // no more requests to wait for
// Was there an exception?
if (srsp.getException() != null) {
// If things are tolerant, just continue
if(rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false)) {
if( rb.req.getParams().getBool(ShardParams.SHARDS_INFO, false) ) {
NamedList<Object> sinfo = (NamedList<Object>) rb.rsp.getValues().get(ShardParams.SHARDS_INFO);
if(sinfo==null) {
sinfo = new SimpleOrderedMap<Object>();
rb.rsp.getValues().add(ShardParams.SHARDS_INFO,sinfo);
}
SimpleOrderedMap<Object> nl = new SimpleOrderedMap<Object>();
Throwable t = srsp.getException();
if(t instanceof SolrServerException) {
t = ((SolrServerException)t).getCause();
}
nl.add("error", t.toString() );
StringWriter trace = new StringWriter();
t.printStackTrace(new PrintWriter(trace));
nl.add("trace", trace.toString() );
if(srsp.getSolrResponse()!=null){
nl.add("time", srsp.getSolrResponse().getElapsedTime());
}
sinfo.add(srsp.getShard(), nl);
}
}
else { // If so, abort everything and rethrow
// If things are not tolerant, abort everything and rethrow
if(!tolerant) {
shardHandler1.cancelAll();
if (srsp.getException() instanceof SolrException) {
throw (SolrException)srsp.getException();
@ -336,8 +310,8 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware ,
}
for(SearchComponent c : components) {
c.finishStage(rb);
}
c.finishStage(rb);
}
// we are done when the next stage is MAX_VALUE
} while (nextStage != Integer.MAX_VALUE);

View File

@ -22,6 +22,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
public abstract class ShardHandler {
public abstract void checkDistributed(ResponseBuilder rb);
public abstract void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) ;
public abstract ShardResponse takeCompletedIncludingErrors();
public abstract ShardResponse takeCompletedOrError();
public abstract void cancelAll();
}

View File

@ -17,8 +17,17 @@
package org.apache.solr;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.ChaosMonkey;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
@ -292,7 +301,32 @@ public class TestDistributedSearch extends BaseDistributedSearchTestCase {
assertNotNull("missing shard info", sinfo);
assertEquals("should have an entry for each shard ["+sinfo+"] "+shards, cnt, sinfo.size());
// test shards.tolerant=true
for(int numDownServers = 0; numDownServers < jettys.size()-1; numDownServers++)
{
List<JettySolrRunner> upJettys = new ArrayList<JettySolrRunner>(jettys);
List<SolrServer> upClients = new ArrayList<SolrServer>(clients);
List<JettySolrRunner> downJettys = new ArrayList<JettySolrRunner>();
List<String> upShards = new ArrayList<String>(Arrays.asList(shardsArr));
for(int i=0; i<numDownServers; i++)
{
// shut down some of the jettys
int indexToRemove = r.nextInt(upJettys.size());
JettySolrRunner downJetty = upJettys.remove(indexToRemove);
upClients.remove(indexToRemove);
upShards.remove(indexToRemove);
ChaosMonkey.stop(downJetty);
downJettys.add(downJetty);
}
queryPartialResults(upShards, upClients, "q","*:*",ShardParams.SHARDS_INFO,"true",ShardParams.SHARDS_TOLERANT,"true");
// restart the jettys
for (JettySolrRunner downJetty : downJettys) {
downJetty.start();
}
}
// This index has the same number for every field
// TODO: This test currently fails because debug info is obtained only
@ -301,5 +335,90 @@ public class TestDistributedSearch extends BaseDistributedSearchTestCase {
// Thread.sleep(10000000000L);
}
protected void queryPartialResults(final List<String> upShards, List<SolrServer> upClients, Object... q) throws Exception {
final ModifiableSolrParams params = new ModifiableSolrParams();
for (int i = 0; i < q.length; i += 2) {
params.add(q[i].toString(), q[i + 1].toString());
}
// TODO: look into why passing true causes fails
params.set("distrib", "false");
final QueryResponse controlRsp = controlClient.query(params);
validateControlData(controlRsp);
params.remove("distrib");
setDistributedParams(params);
QueryResponse rsp = queryRandomUpServer(params,upClients);
comparePartialResponses(rsp, controlRsp, upShards);
if (stress > 0) {
log.info("starting stress...");
Thread[] threads = new Thread[nThreads];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread() {
@Override
public void run() {
for (int j = 0; j < stress; j++) {
int which = r.nextInt(clients.size());
SolrServer client = clients.get(which);
try {
QueryResponse rsp = client.query(new ModifiableSolrParams(params));
if (verifyStress) {
comparePartialResponses(rsp, controlRsp, upShards);
}
} catch (SolrServerException e) {
throw new RuntimeException(e);
}
}
}
};
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
}
}
protected QueryResponse queryRandomUpServer(ModifiableSolrParams params, List<SolrServer> upClients) throws SolrServerException {
// query a random "up" server
int which = r.nextInt(upClients.size());
SolrServer client = upClients.get(which);
QueryResponse rsp = client.query(params);
return rsp;
}
protected void comparePartialResponses(QueryResponse rsp, QueryResponse controlRsp, List<String> upShards)
{
NamedList<?> sinfo = (NamedList<?>) rsp.getResponse().get(ShardParams.SHARDS_INFO);
assertNotNull("missing shard info", sinfo);
assertEquals("should have an entry for each shard ["+sinfo+"] "+shards, shardsArr.length, sinfo.size());
// identify each one
for (Map.Entry<String,?> entry : sinfo) {
String shard = entry.getKey();
NamedList<?> info = (NamedList<?>) entry.getValue();
boolean found = false;
for(int i=0; i<shardsArr.length; i++) {
String s = shardsArr[i];
if (shard.contains(s)) {
found = true;
// make sure that it responded if it's up
if (upShards.contains(s)) {
assertTrue("Expected to find numFound in the up shard info",info.get("numFound") != null);
}
else {
assertTrue("Expected to find error in the down shard info",info.get("error") != null);
}
}
}
assertTrue("Couldn't find shard " + shard + " represented in shards info", found);
}
}
}