SOLR-5468: report replication factor that was achieved for an update request if requested by the client application.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1596092 13f79535-47bb-0310-9956-ffa450edef68
Timothy Potter 2014-05-19 23:04:55 +00:00
parent c3692aa73d
commit 195317786c
8 changed files with 688 additions and 25 deletions
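Before the per-file diffs, here is the client-side flow this commit enables: the client sets the min_rf parameter on an update request, and Solr reports the rf (replication factor) actually achieved back in the response header. The following is a minimal SolrJ sketch, not part of the commit; the ZooKeeper address, collection name, and document fields are placeholders, and it assumes a running SolrCloud cluster built from this trunk:

import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;

public class MinRfExample {
  public static void main(String[] args) throws Exception {
    CloudSolrServer server = new CloudSolrServer("localhost:2181"); // placeholder zkHost
    server.setDefaultCollection("mycollection"); // placeholder collection
    try {
      UpdateRequest up = new UpdateRequest();
      // ask the cluster to track and report the achieved replication factor
      up.setParam(UpdateRequest.MIN_REPFACT, "2");
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", "1");
      doc.addField("a_t", "hello");
      up.add(doc);
      NamedList rsp = server.request(up);
      // helper added by this commit: minimum rf achieved across all shards
      int achievedRf = server.getMinAchievedReplicationFactor("mycollection", rsp);
      System.out.println("achieved rf=" + achievedRf);
    } finally {
      server.shutdown();
    }
  }
}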

solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java

@@ -18,11 +18,15 @@ package org.apache.solr.update;
  */
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.http.HttpResponse;
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.BinaryResponseParser;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -33,6 +37,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.Diagnostics;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.RequestReplicationTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -168,16 +173,20 @@ public class SolrCmdDistributor {
   }

   public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
-    distribAdd(cmd, nodes, params, false);
+    distribAdd(cmd, nodes, params, false, null);
   }

   public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous) throws IOException {
+    distribAdd(cmd, nodes, params, synchronous, null);
+  }
+
+  public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous, RequestReplicationTracker rrt) throws IOException {
     for (Node node : nodes) {
       UpdateRequest uReq = new UpdateRequest();
       uReq.setParams(params);
       uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
-      submit(new Req(cmd.toString(), node, uReq, synchronous));
+      submit(new Req(cmd.toString(), node, uReq, synchronous, rrt));
     }
   }
@@ -233,7 +242,7 @@ public class SolrCmdDistributor {
     try {
       SolrServer solrServer = servers.getSolrServer(req);
-      NamedList<Object> rsp = solrServer.request(req.uReq);
+      solrServer.request(req.uReq);
     } catch (Exception e) {
       SolrException.log(log, e);
       Error error = new Error();
@@ -252,12 +261,18 @@ public class SolrCmdDistributor {
     public int retries;
     public boolean synchronous;
     public String cmdString;
+    public RequestReplicationTracker rfTracker;

     public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) {
+      this(cmdString, node, uReq, synchronous, null);
+    }
+
+    public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker) {
       this.node = node;
       this.uReq = uReq;
       this.synchronous = synchronous;
       this.cmdString = cmdString;
+      this.rfTracker = rfTracker;
     }

     public String toString() {
@@ -266,6 +281,37 @@ public class SolrCmdDistributor {
       sb.append("; node=").append(String.valueOf(node));
       return sb.toString();
     }
+
+    public void trackRequestResult(HttpResponse resp, boolean success) {
+      if (rfTracker != null) {
+        Integer rf = null;
+        if (resp != null) {
+          // need to parse out the rf from requests that were forwarded to another leader
+          InputStream inputStream = null;
+          try {
+            inputStream = resp.getEntity().getContent();
+            BinaryResponseParser brp = new BinaryResponseParser();
+            NamedList<Object> nl = brp.processResponse(inputStream, null);
+            Object hdr = nl.get("responseHeader");
+            if (hdr != null && hdr instanceof NamedList) {
+              NamedList<Object> hdrList = (NamedList<Object>) hdr;
+              Object rfObj = hdrList.get(UpdateRequest.REPFACT);
+              if (rfObj != null && rfObj instanceof Integer) {
+                rf = (Integer) rfObj;
+              }
+            }
+          } catch (Exception e) {
+            log.warn("Failed to parse response from "+node+" during replication factor accounting due to: "+e);
+          } finally {
+            if (inputStream != null) {
+              try {
+                inputStream.close();
+              } catch (Exception ignore) {}
+            }
+          }
+        }
+        rfTracker.trackRequestResult(node, success, rf);
+      }
+    }
   }
@@ -296,6 +342,8 @@ public class SolrCmdDistributor {
     public abstract String getCoreName();
     public abstract String getBaseUrl();
     public abstract ZkCoreNodeProps getNodeProps();
+    public abstract String getCollection();
+    public abstract String getShardId();
   }

   public static class StdNode extends Node {

solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java

@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
@@ -69,6 +70,7 @@ public class StreamingSolrServers {
     server = new ConcurrentUpdateSolrServer(url, httpClient, 100, 1, updateExecutor, true) {
       @Override
       public void handleError(Throwable ex) {
+        req.trackRequestResult(null, false);
         log.error("error", ex);
         Error error = new Error();
         error.e = (Exception) ex;
@@ -78,6 +80,10 @@ public class StreamingSolrServers {
         error.req = req;
         errors.add(error);
       }
+      @Override
+      public void onSuccess(HttpResponse resp) {
+        req.trackRequestResult(resp, true);
+      }
     };
     server.setParser(new BinaryResponseParser());
     server.setRequestWriter(new BinaryRequestWriter());

solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

@@ -20,12 +20,11 @@ package org.apache.solr.update.processor;
 import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
 import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -33,14 +32,12 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.http.NoHttpResponseException;
-import org.apache.http.conn.ConnectTimeoutException;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRef;
-import org.apache.solr.client.solrj.impl.HttpSolrServer;
-import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.DistributedQueue;
 import org.apache.solr.cloud.LeaderInitiatedRecoveryThread;
@@ -62,7 +59,6 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
@@ -71,7 +67,6 @@ import org.apache.solr.common.util.Hash;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
-import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.component.RealTimeGetComponent;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
@@ -129,6 +124,102 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
   }
+
+  /**
+   * Keeps track of the replication factor achieved for a distributed update request
+   * that originated in this distributed update processor.
+   */
+  public static class RequestReplicationTracker {
+    int minRf;
+
+    // if a leader is driving the update request, then this will be non-null;
+    // however, a replica may also be driving the update request (forwarding to leaders),
+    // in which case we leave this as null so we only count the rf reported back by the leaders
+    String onLeaderShardId;
+
+    // track the number of nodes we sent requests to and how many resulted in errors;
+    // there may be multiple requests per node when processing a batch
+    Map<String,AtomicInteger> nodeErrorTracker;
+
+    // if not using direct updates, a leader may end up forwarding to other
+    // leaders, so we need to keep the achieved rf for each of those too
+    Map<String,Integer> otherLeaderRf;
+
+    private RequestReplicationTracker(String shardId, int minRf) {
+      this.minRf = minRf;
+      this.onLeaderShardId = shardId;
+      this.nodeErrorTracker = new HashMap<>(5);
+      this.otherLeaderRf = new HashMap<>();
+    }
+
+    // gives the replication factor that was achieved for this request
+    public int getAchievedRf() {
+      // look across all shards to find the minimum achieved replication
+      // factor ... unless the client is using direct updates from CloudSolrServer,
+      // there may be multiple shards at play here
+      int achievedRf = 1;
+      if (onLeaderShardId != null) {
+        synchronized (nodeErrorTracker) {
+          for (AtomicInteger nodeErrors : nodeErrorTracker.values()) {
+            if (nodeErrors.get() == 0)
+              ++achievedRf;
+          }
+        }
+      } else {
+        // the node driving this update request is not a leader, so it only
+        // forwards to other leaders and its local result doesn't count
+        achievedRf = Integer.MAX_VALUE;
+      }
+
+      // the minimum achieved rf may come from a request to another leader
+      synchronized (otherLeaderRf) {
+        for (Integer otherRf : otherLeaderRf.values()) {
+          if (otherRf < achievedRf)
+            achievedRf = otherRf;
+        }
+      }
+
+      return (achievedRf == Integer.MAX_VALUE) ? 1 : achievedRf;
+    }
+
+    public void trackRequestResult(Node node, boolean success, Integer rf) {
+      String shardId = node.getShardId();
+      if (log.isDebugEnabled())
+        log.debug("trackRequestResult("+node+"): success? "+success+" rf="+rf+
+            ", shardId="+shardId+" onLeaderShardId="+onLeaderShardId);
+
+      if (onLeaderShardId == null || !onLeaderShardId.equals(shardId)) {
+        // result from another leader that we forwarded to
+        synchronized (otherLeaderRf) {
+          otherLeaderRf.put(shardId, rf != null ? rf : new Integer(1));
+        }
+        return;
+      }
+
+      if (onLeaderShardId != null) {
+        // track result for this leader
+        String nodeUrl = node.getUrl();
+        AtomicInteger nodeErrors = null;
+        // potentially many results flooding into this method from multiple nodes concurrently
+        synchronized (nodeErrorTracker) {
+          nodeErrors = nodeErrorTracker.get(nodeUrl);
+          if (nodeErrors == null) {
+            nodeErrors = new AtomicInteger(0);
+            nodeErrorTracker.put(nodeUrl, nodeErrors);
+          }
+        }
+
+        if (!success)
+          nodeErrors.incrementAndGet();
+      }
+    }
+
+    public String toString() {
+      StringBuilder sb = new StringBuilder("RequestReplicationTracker");
+      sb.append(": onLeaderShardId=").append(String.valueOf(onLeaderShardId));
+      sb.append(", minRf=").append(minRf);
+      sb.append(", achievedRf=").append(getAchievedRf());
+      return sb.toString();
+    }
+  }

   public static final String COMMIT_END_POINT = "commit_end_point";
   public static final String LOG_REPLAY = "log_replay";
@@ -168,6 +259,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   private UpdateCommand updateCommand;  // the current command this processor is working on.
+
+  // used for keeping track of replicas that have processed an add/update from the leader
+  private RequestReplicationTracker replicationTracker = null;

   public DistributedUpdateProcessor(SolrQueryRequest req,
       SolrQueryResponse rsp, UpdateRequestProcessor next) {
     super(next);
@@ -560,6 +654,40 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       isLeader = getNonZkLeaderAssumption(req);
     }
+
+    // check if the client has requested minimum replication factor information
+    int minRf = -1; // disabled by default
+    if (replicationTracker != null) {
+      minRf = replicationTracker.minRf; // for subsequent requests in the same batch
+    } else {
+      SolrParams rp = cmd.getReq().getParams();
+      String distribUpdate = rp.get(DISTRIB_UPDATE_PARAM);
+      // somewhat tricky logic here: we only activate the replication tracker if we're on
+      // a leader or this is the top-level request processor
+      if (distribUpdate == null || distribUpdate.equals(DistribPhase.TOLEADER.toString())) {
+        String minRepFact = rp.get(UpdateRequest.MIN_REPFACT);
+        if (minRepFact != null) {
+          try {
+            minRf = Integer.parseInt(minRepFact);
+          } catch (NumberFormatException nfe) {
+            minRf = -1;
+          }
+
+          if (minRf <= 0)
+            throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid value "+minRepFact+" for "+UpdateRequest.MIN_REPFACT+
+                "; must be >0 and less than or equal to the collection replication factor.");
+        }
+
+        if (minRf > 1) {
+          String myShardId = forwardToLeader ? null : cloudDesc.getShardId();
+          replicationTracker = new RequestReplicationTracker(myShardId, minRf);
+        }
+      }
+    }
+
+    // TODO: if minRf > 1 and we know the leader is the only active replica, we could fail
+    // the request right here but for now I think it is better to just return the status
+    // to the client that the minRf wasn't reached and let them handle it
+
     boolean dropCmd = false;
     if (!forwardToLeader) {
       dropCmd = versionAdd(cmd);
@@ -593,6 +721,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
       params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
       for (Node nodesByRoutingRule : nodesByRoutingRules) {
         cmdDistrib.distribAdd(cmd, Collections.singletonList(nodesByRoutingRule), params, true);
       }
@@ -601,7 +730,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     ModifiableSolrParams params = null;
     if (nodes != null) {
       params = new ModifiableSolrParams(filterParams(req.getParams()));
       params.set(DISTRIB_UPDATE_PARAM,
           (isLeader || isSubShardLeader ?
@@ -609,7 +737,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           DistribPhase.TOLEADER.toString()));
       params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
           zkController.getBaseUrl(), req.getCore().getName()));
-      cmdDistrib.distribAdd(cmd, nodes, params);
+
+      if (replicationTracker != null && minRf > 1)
+        params.set(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
+
+      cmdDistrib.distribAdd(cmd, nodes, params, false, replicationTracker);
     }

   // TODO: what to do when no idField?
@@ -724,6 +856,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor();
       executor.execute(lirThread);
     }
+
+    if (replicationTracker != null) {
+      rsp.getResponseHeader().add(UpdateRequest.REPFACT, replicationTracker.getAchievedRf());
+      rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, replicationTracker.minRf);
+      replicationTracker = null;
+    }
   }
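As the TODO above notes, the server does not reject an update that fails to reach min_rf; it reports the achieved rf in the response header and leaves the decision to the client. A sketch of what that client-side handling might look like (the retry-or-log policy and the surrounding variables are illustrative assumptions, not part of the commit):

// assumes: CloudSolrServer server, UpdateRequest up with MIN_REPFACT set,
// String collection, int minRf, and a Logger log from the surrounding application
NamedList rsp = server.request(up);
int achievedRf = server.getMinAchievedReplicationFactor(collection, rsp);
if (achievedRf < minRf) {
  // the update was accepted but not fully replicated; the application
  // decides whether to log, alert, or re-send once replicas recover
  log.warn("Update achieved rf=" + achievedRf + ", below requested min_rf=" + minRf);
}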

solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java

@@ -62,7 +62,6 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
   private static final long sleepMsBeforeHealPartition = 1000L;
   private Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
-  private AtomicInteger portCounter = new AtomicInteger(0);

   public HttpPartitionTest() {
     super();

solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java

@@ -0,0 +1,379 @@
package org.apache.solr.cloud;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.File;
import java.net.ServerSocket;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests a client application's ability to get replication factor
 * information back from the cluster after an add or update.
 */
@Slow
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {

  private static final transient Logger log =
      LoggerFactory.getLogger(ReplicationFactorTest.class);

  private Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();

  public ReplicationFactorTest() {
    super();
    sliceCount = 3;
    shardCount = 3;
  }

  @Before
  @Override
  public void setUp() throws Exception {
    super.setUp();
    System.setProperty("numShards", Integer.toString(sliceCount));
  }

  @Override
  @After
  public void tearDown() throws Exception {
    if (!proxies.isEmpty()) {
      for (SocketProxy proxy : proxies.values()) {
        proxy.close();
      }
    }
    System.clearProperty("numShards");
    try {
      super.tearDown();
    } catch (Exception exc) {}
    resetExceptionIgnores();
  }

  /**
   * Overrides the parent implementation so that we can configure a socket proxy
   * to sit in front of each Jetty server, which gives us the ability to simulate
   * network partitions without having to fuss with IPTables (which is not very
   * cross-platform friendly).
   */
  @Override
  public JettySolrRunner createJetty(File solrHome, String dataDir,
      String shardList, String solrConfigOverride, String schemaOverride)
      throws Exception {
    JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
        0, solrConfigOverride, schemaOverride, false,
        getExtraServlets(), sslConfig, getExtraRequestFilters());
    jetty.setShards(shardList);
    jetty.setDataDir(getDataDir(dataDir));

    // setup to proxy Http requests to this server unless it is the control server
    int proxyPort = getNextAvailablePort();
    jetty.setProxyPort(proxyPort);
    jetty.start();

    // create a socket proxy for the jetty server ...
    SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
    proxies.put(proxy.getUrl(), proxy);

    return jetty;
  }

  protected int getNextAvailablePort() throws Exception {
    int port = -1;
    try (ServerSocket s = new ServerSocket(0)) {
      port = s.getLocalPort();
    }
    return port;
  }

  @Override
  public void doTest() throws Exception {
    waitForThingsToLevelOut(30000);

    // test a 1x3 collection
    testRf3();

    // test handling when not using direct updates
    testRf2NotUsingDirectUpdates();
  }

  protected void testRf2NotUsingDirectUpdates() throws Exception {
    int numShards = 2;
    int replicationFactor = 2;
    int maxShardsPerNode = 1;
    String testCollectionName = "c8n_2x2";
    String shardId = "shard1";
    int minRf = 2;

    createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
    cloudClient.setDefaultCollection(testCollectionName);

    List<Replica> replicas =
        ensureAllReplicasAreActive(testCollectionName, shardId, numShards, replicationFactor, 10);
    assertTrue("Expected 1 active replica for "+testCollectionName, replicas.size() == 1);

    List<SolrInputDocument> batch = new ArrayList<SolrInputDocument>(10);
    for (int i=0; i < 15; i++) {
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField(id, String.valueOf(i));
      doc.addField("a_t", "hello" + i);
      batch.add(doc);
    }

    // send directly to the leader using HttpSolrServer instead of CloudSolrServer
    // (to test support for non-direct updates)
    UpdateRequest up = new UpdateRequest();
    up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
    up.add(batch);

    Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId);
    sendNonDirectUpdateRequestReplica(leader, up, 2, testCollectionName);
    sendNonDirectUpdateRequestReplica(replicas.get(0), up, 2, testCollectionName);

    // so now kill the replica of shard2 and verify the achieved rf is only 1
    List<Replica> shard2Replicas =
        ensureAllReplicasAreActive(testCollectionName, "shard2", numShards, replicationFactor, 10);
    assertTrue("Expected 1 active replica for "+testCollectionName, replicas.size() == 1);

    getProxyForReplica(shard2Replicas.get(0)).close();
    Thread.sleep(2000);

    // shard1 will have rf=2 but shard2 will only have rf=1
    sendNonDirectUpdateRequestReplica(leader, up, 1, testCollectionName);
    sendNonDirectUpdateRequestReplica(replicas.get(0), up, 1, testCollectionName);
  }

  @SuppressWarnings("rawtypes")
  protected void sendNonDirectUpdateRequestReplica(Replica replica, UpdateRequest up, int expectedRf, String collection) throws Exception {
    HttpSolrServer solrServer = null;
    try {
      ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
      String url = zkProps.getBaseUrl() + "/" + collection;
      solrServer = new HttpSolrServer(url);

      NamedList resp = solrServer.request(up);
      NamedList hdr = (NamedList) resp.get("responseHeader");
      Integer batchRf = (Integer) hdr.get(UpdateRequest.REPFACT);
      assertTrue("Expected rf="+expectedRf+" for batch but got "+batchRf, batchRf == expectedRf);
    } finally {
      if (solrServer != null)
        solrServer.shutdown();
    }
  }

  protected void testRf3() throws Exception {
    int numShards = 1;
    int replicationFactor = 3;
    int maxShardsPerNode = 1;
    String testCollectionName = "c8n_1x3";
    String shardId = "shard1";
    int minRf = 2;

    createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
    cloudClient.setDefaultCollection(testCollectionName);

    List<Replica> replicas =
        ensureAllReplicasAreActive(testCollectionName, shardId, numShards, replicationFactor, 10);
    assertTrue("Expected 2 active replicas for "+testCollectionName, replicas.size() == 2);

    int rf = sendDoc(1, minRf);
    assertTrue("Expected rf=3 as all replicas are up, but got "+rf, rf == 3);

    getProxyForReplica(replicas.get(0)).close();

    rf = sendDoc(2, minRf);
    assertTrue("Expected rf=2 as one replica should be down, but got "+rf, rf == 2);

    getProxyForReplica(replicas.get(1)).close();

    rf = sendDoc(3, minRf);
    assertTrue("Expected rf=1 as both replicas should be down, but got "+rf, rf == 1);

    // heal the partitions
    getProxyForReplica(replicas.get(0)).reopen();
    getProxyForReplica(replicas.get(1)).reopen();

    rf = sendDoc(4, minRf);
    assertTrue("Expected rf=3 as partitions to replicas have been healed, but got "+rf, rf == 3);

    // now send a batch
    List<SolrInputDocument> batch = new ArrayList<SolrInputDocument>(10);
    for (int i=5; i < 15; i++) {
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField(id, String.valueOf(i));
      doc.addField("a_t", "hello" + i);
      batch.add(doc);
    }

    UpdateRequest up = new UpdateRequest();
    up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
    up.add(batch);

    int batchRf =
        cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up));
    assertTrue("Expected rf=3 for batch but got "+batchRf, batchRf == 3);

    // add some chaos to the batch
    getProxyForReplica(replicas.get(0)).close();

    // now send a second batch
    batch = new ArrayList<SolrInputDocument>(10);
    for (int i=15; i < 30; i++) {
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField(id, String.valueOf(i));
      doc.addField("a_t", "hello" + i);
      batch.add(doc);
    }

    up = new UpdateRequest();
    up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
    up.add(batch);

    batchRf =
        cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up));
    assertTrue("Expected rf=2 for batch (one replica is down) but got "+batchRf, batchRf == 2);

    // close the 2nd replica, and send a 3rd batch with expected achieved rf=1
    getProxyForReplica(replicas.get(1)).close();

    batch = new ArrayList<SolrInputDocument>(10);
    for (int i=30; i < 45; i++) {
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField(id, String.valueOf(i));
      doc.addField("a_t", "hello" + i);
      batch.add(doc);
    }

    up = new UpdateRequest();
    up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
    up.add(batch);

    batchRf =
        cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up));
    assertTrue("Expected rf=1 for batch (two replicas are down) but got "+batchRf, batchRf == 1);

    getProxyForReplica(replicas.get(0)).reopen();
    getProxyForReplica(replicas.get(1)).reopen();

    Thread.sleep(1000);
  }

  protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
    assertNotNull(replicaBaseUrl);
    URL baseUrl = new URL(replicaBaseUrl);

    SocketProxy proxy = proxies.get(baseUrl.toURI());
    if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
      baseUrl = new URL(baseUrl.toExternalForm() + "/");
      proxy = proxies.get(baseUrl.toURI());
    }
    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
    return proxy;
  }

  protected int sendDoc(int docId, int minRf) throws Exception {
    UpdateRequest up = new UpdateRequest();
    up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField(id, String.valueOf(docId));
    doc.addField("a_t", "hello" + docId);
    up.add(doc);
    return cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up));
  }

  protected List<Replica> ensureAllReplicasAreActive(String testCollectionName, String shardId, int shards, int rf, int maxWaitSecs) throws Exception {
    long startMs = System.currentTimeMillis();

    Map<String,Replica> notLeaders = new HashMap<String,Replica>();

    ZkStateReader zkr = cloudClient.getZkStateReader();
    ClusterState cs = zkr.getClusterState();
    Collection<Slice> slices = cs.getActiveSlices(testCollectionName);
    assertTrue(slices.size() == shards);
    boolean allReplicasUp = false;
    long waitMs = 0L;
    long maxWaitMs = maxWaitSecs * 1000L;
    Replica leader = null;
    while (waitMs < maxWaitMs && !allReplicasUp) {
      cs = zkr.getClusterState();
      assertNotNull(cs);
      Slice shard = cs.getSlice(testCollectionName, shardId);
      assertNotNull("No Slice for "+shardId, shard);
      allReplicasUp = true; // assume true
      Collection<Replica> replicas = shard.getReplicas();
      assertTrue(replicas.size() == rf);
      leader = shard.getLeader();
      assertNotNull(leader);

      // ensure all replicas are "active" and identify the non-leader replicas
      for (Replica replica : replicas) {
        String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
        if (!ZkStateReader.ACTIVE.equals(replicaState)) {
          log.info("Replica " + replica.getName() + " is currently " + replicaState);
          allReplicasUp = false;
        }

        if (!leader.equals(replica))
          notLeaders.put(replica.getName(), replica);
      }

      if (!allReplicasUp) {
        try {
          Thread.sleep(500L);
        } catch (Exception ignoreMe) {}
        waitMs += 500L;
      }
    } // end while

    if (!allReplicasUp)
      fail("Didn't see all replicas come up within " + maxWaitMs + " ms! ClusterState: " + cs);

    if (notLeaders.isEmpty())
      fail("Didn't isolate any replicas that are not the leader! ClusterState: " + cs);

    long diffMs = (System.currentTimeMillis() - startMs);
    log.info("Took " + diffMs + " ms to see all replicas become active.");

    List<Replica> replicas = new ArrayList<Replica>();
    replicas.addAll(notLeaders.values());
    return replicas;
  }
}

solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java

@@ -436,6 +436,8 @@ public class CloudSolrServer extends SolrServer {
   public RouteResponse condenseResponse(NamedList response, long timeMillis) {
     RouteResponse condensed = new RouteResponse();
     int status = 0;
+    Integer rf = null;
+    Integer minRf = null;
     for (int i=0; i < response.size(); i++) {
       NamedList shardResponse = (NamedList) response.getVal(i);
       NamedList header = (NamedList) shardResponse.get("responseHeader");
@@ -444,11 +446,23 @@ public class CloudSolrServer extends SolrServer {
       if (s > 0) {
         status = s;
       }
+
+      Object rfObj = header.get(UpdateRequest.REPFACT);
+      if (rfObj != null && rfObj instanceof Integer) {
+        Integer routeRf = (Integer) rfObj;
+        if (rf == null || routeRf < rf)
+          rf = routeRf;
+      }
+      minRf = (Integer) header.get(UpdateRequest.MIN_REPFACT);
     }

     NamedList cheader = new NamedList();
     cheader.add("status", status);
     cheader.add("QTime", timeMillis);
+    if (rf != null)
+      cheader.add(UpdateRequest.REPFACT, rf);
+    if (minRf != null)
+      cheader.add(UpdateRequest.MIN_REPFACT, minRf);
+
     condensed.add("responseHeader", cheader);
     return condensed;
   }
@@ -692,4 +706,73 @@ public class CloudSolrServer extends SolrServer {
     return updatesToLeaders;
   }
+
+  /**
+   * Useful for determining the minimum achieved replication factor across
+   * all shards involved in processing an update request, typically useful
+   * for gauging the replication factor of a batch.
+   */
+  @SuppressWarnings("rawtypes")
+  public int getMinAchievedReplicationFactor(String collection, NamedList resp) {
+    // it's probably already in the top-level header set by condenseResponse
+    NamedList header = (NamedList) resp.get("responseHeader");
+    Integer achRf = (Integer) header.get(UpdateRequest.REPFACT);
+    if (achRf != null)
+      return achRf.intValue();
+
+    // not in the top-level header, so walk the shard route tree
+    Map<String,Integer> shardRf = getShardReplicationFactor(collection, resp);
+    for (Integer rf : shardRf.values()) {
+      if (achRf == null || rf < achRf) {
+        achRf = rf;
+      }
+    }
+    return (achRf != null) ? achRf.intValue() : -1;
+  }
+
+  /**
+   * Walks the NamedList response after performing an update request, looking for
+   * the replication factor that was achieved in each shard involved in the request.
+   * For single-doc updates, there will be only one shard in the return value.
+   */
+  @SuppressWarnings("rawtypes")
+  public Map<String,Integer> getShardReplicationFactor(String collection, NamedList resp) {
+    connect();
+
+    Map<String,Integer> results = new HashMap<String,Integer>();
+    if (resp instanceof CloudSolrServer.RouteResponse) {
+      NamedList routes = ((CloudSolrServer.RouteResponse) resp).getRouteResponses();
+      ClusterState clusterState = zkStateReader.getClusterState();
+      Map<String,String> leaders = new HashMap<String,String>();
+      for (Slice slice : clusterState.getActiveSlices(collection)) {
+        Replica leader = slice.getLeader();
+        if (leader != null) {
+          ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
+          String leaderUrl = zkProps.getBaseUrl() + "/" + zkProps.getCoreName();
+          leaders.put(leaderUrl, slice.getName());
+          String altLeaderUrl = zkProps.getBaseUrl() + "/" + collection;
+          leaders.put(altLeaderUrl, slice.getName());
+        }
+      }
+
+      Iterator<Map.Entry<String,Object>> routeIter = routes.iterator();
+      while (routeIter.hasNext()) {
+        Map.Entry<String,Object> next = routeIter.next();
+        String host = next.getKey();
+        NamedList hostResp = (NamedList) next.getValue();
+        Integer rf = (Integer) ((NamedList) hostResp.get("responseHeader")).get(UpdateRequest.REPFACT);
+        if (rf != null) {
+          String shard = leaders.get(host);
+          if (shard == null) {
+            if (host.endsWith("/"))
+              shard = leaders.get(host.substring(0, host.length()-1));
+            if (shard == null) {
+              shard = host;
+            }
+          }
+          results.put(shard, rf);
+        }
+      }
+    }
+    return results;
+  }
 }
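For batch updates that CloudSolrServer routes to several shards, the per-shard breakdown added above can be inspected directly. A small sketch, assuming the same server and rsp variables as in an earlier request (collection name is a placeholder):

// keys are shard names resolved from cluster state, falling back to the
// leader URL when a shard cannot be resolved
Map<String,Integer> shardRf = server.getShardReplicationFactor("mycollection", rsp);
for (Map.Entry<String,Integer> e : shardRf.entrySet()) {
  System.out.println("shard " + e.getKey() + " achieved rf=" + e.getValue());
}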

solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java

@@ -239,6 +239,8 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
             msg.append("\n\n");
             msg.append("request: ").append(method.getURI());
             handleError(new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString()));
+          } else {
+            onSuccess(response);
           }
         } finally {
           try {
@@ -406,6 +408,13 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
     log.error("error", ex);
   }
+
+  /**
+   * Intended to be used as an extension point for doing post processing after a request completes.
+   */
+  public void onSuccess(HttpResponse resp) {
+    // no-op by design, override to add functionality
+  }

   @Override
   public void shutdown() {
     server.shutdown();

solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java

@@ -46,7 +46,8 @@ import org.apache.solr.common.util.XML;
  * @since solr 1.3
  */
 public class UpdateRequest extends AbstractUpdateRequest {
+  public static final String REPFACT = "rf";
+  public static final String MIN_REPFACT = "min_rf";
   public static final String VER = "ver";
   public static final String OVERWRITE = "ow";
   public static final String COMMIT_WITHIN = "cw";