diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java index 26ea68a52ec..5e8a57acd86 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java @@ -18,11 +18,15 @@ package org.apache.solr.update; */ import java.io.IOException; +import java.io.InputStream; import java.net.ConnectException; import java.util.ArrayList; import java.util.List; + +import org.apache.http.HttpResponse; import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.BinaryResponseParser; import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest; @@ -33,6 +37,7 @@ import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.Diagnostics; +import org.apache.solr.update.processor.DistributedUpdateProcessor.RequestReplicationTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +68,7 @@ public class SolrCmdDistributor { this.retryPause = retryPause; } - public void finish() { + public void finish() { try { servers.blockUntilFinished(); doRetriesIfNeeded(); @@ -168,16 +173,20 @@ public class SolrCmdDistributor { } public void distribAdd(AddUpdateCommand cmd, List nodes, ModifiableSolrParams params) throws IOException { - distribAdd(cmd, nodes, params, false); + distribAdd(cmd, nodes, params, false, null); + } + + public void distribAdd(AddUpdateCommand cmd, List nodes, ModifiableSolrParams params, boolean synchronous) throws IOException { + distribAdd(cmd, nodes, params, synchronous, null); } - public void distribAdd(AddUpdateCommand cmd, List nodes, ModifiableSolrParams params, boolean synchronous) throws IOException { + public void distribAdd(AddUpdateCommand cmd, List nodes, ModifiableSolrParams params, boolean synchronous, RequestReplicationTracker rrt) throws IOException { for (Node node : nodes) { UpdateRequest uReq = new UpdateRequest(); uReq.setParams(params); - uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite); - submit(new Req(cmd.toString(), node, uReq, synchronous)); + uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite); + submit(new Req(cmd.toString(), node, uReq, synchronous, rrt)); } } @@ -233,7 +242,7 @@ public class SolrCmdDistributor { try { SolrServer solrServer = servers.getSolrServer(req); - NamedList rsp = solrServer.request(req.uReq); + solrServer.request(req.uReq); } catch (Exception e) { SolrException.log(log, e); Error error = new Error(); @@ -252,12 +261,18 @@ public class SolrCmdDistributor { public int retries; public boolean synchronous; public String cmdString; - + public RequestReplicationTracker rfTracker; + public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) { + this(cmdString, node, uReq, synchronous, null); + } + + public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker) { this.node = node; this.uReq = uReq; this.synchronous = synchronous; this.cmdString = cmdString; + this.rfTracker = rfTracker; } public String toString() { @@ -266,6 +281,37 @@ public class SolrCmdDistributor { sb.append("; node=").append(String.valueOf(node)); return sb.toString(); } + public void trackRequestResult(HttpResponse resp, boolean success) { + if (rfTracker != null) { + Integer rf = null; + if (resp != null) { + // need to parse out the rf from requests that were forwards to another leader + InputStream inputStream = null; + try { + inputStream = resp.getEntity().getContent(); + BinaryResponseParser brp = new BinaryResponseParser(); + NamedList nl= brp.processResponse(inputStream, null); + Object hdr = nl.get("responseHeader"); + if (hdr != null && hdr instanceof NamedList) { + NamedList hdrList = (NamedList)hdr; + Object rfObj = hdrList.get(UpdateRequest.REPFACT); + if (rfObj != null && rfObj instanceof Integer) { + rf = (Integer)rfObj; + } + } + } catch (Exception e) { + log.warn("Failed to parse response from "+node+" during replication factor accounting due to: "+e); + } finally { + if (inputStream != null) { + try { + inputStream.close(); + } catch (Exception ignore){} + } + } + } + rfTracker.trackRequestResult(node, success, rf); + } + } } @@ -296,6 +342,8 @@ public class SolrCmdDistributor { public abstract String getCoreName(); public abstract String getBaseUrl(); public abstract ZkCoreNodeProps getNodeProps(); + public abstract String getCollection(); + public abstract String getShardId(); } public static class StdNode extends Node { diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java index 261dd427162..de4999013b7 100644 --- a/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java +++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; +import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.impl.BinaryRequestWriter; @@ -69,6 +70,7 @@ public class StreamingSolrServers { server = new ConcurrentUpdateSolrServer(url, httpClient, 100, 1, updateExecutor, true) { @Override public void handleError(Throwable ex) { + req.trackRequestResult(null, false); log.error("error", ex); Error error = new Error(); error.e = (Exception) ex; @@ -78,6 +80,10 @@ public class StreamingSolrServers { error.req = req; errors.add(error); } + @Override + public void onSuccess(HttpResponse resp) { + req.trackRequestResult(resp, true); + } }; server.setParser(new BinaryResponseParser()); server.setRequestWriter(new BinaryRequestWriter()); diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 79bf83d34a2..3259cedffda 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -20,12 +20,11 @@ package org.apache.solr.update.processor; import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; import java.io.IOException; -import java.net.ConnectException; -import java.net.SocketException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -33,14 +32,12 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; -import org.apache.http.NoHttpResponseException; -import org.apache.http.conn.ConnectTimeoutException; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.CharsRef; -import org.apache.solr.client.solrj.impl.HttpSolrServer; -import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery; +import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.cloud.CloudDescriptor; import org.apache.solr.cloud.DistributedQueue; import org.apache.solr.cloud.LeaderInitiatedRecoveryThread; @@ -62,7 +59,6 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZooKeeperException; -import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.SolrParams; @@ -71,7 +67,6 @@ import org.apache.solr.common.util.Hash; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreDescriptor; -import org.apache.solr.core.SolrCore; import org.apache.solr.handler.component.RealTimeGetComponent; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrRequestInfo; @@ -128,7 +123,103 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } } + + /** + * Keeps track of the replication factor achieved for a distributed update request + * originated in this distributed update processor. + */ + public static class RequestReplicationTracker { + int minRf; + // if a leader is driving the update request, then this will be non-null + // however a replica may also be driving the update request (forwards to leaders) + // in which case we leave this as null so we only count the rf back from the leaders + String onLeaderShardId; + // track number of nodes we sent requests to and how many resulted in errors + // there may be multiple requests per node when processing a batch + Map nodeErrorTracker; + // if not using DirectUpdates, a leader may end up forwarding to other + // leaders, so we need to keep the achieved rf for each of those too + Map otherLeaderRf; + + private RequestReplicationTracker(String shardId, int minRf) { + this.minRf = minRf; + this.onLeaderShardId = shardId; + this.nodeErrorTracker = new HashMap<>(5); + this.otherLeaderRf = new HashMap<>(); + } + + // gives the replication factor that was achieved for this request + public int getAchievedRf() { + // look across all shards to find the minimum achieved replication + // factor ... unless the client is using direct updates from CloudSolrServer + // there may be multiple shards at play here + int achievedRf = 1; + if (onLeaderShardId != null) { + synchronized (nodeErrorTracker) { + for (AtomicInteger nodeErrors : nodeErrorTracker.values()) { + if (nodeErrors.get() == 0) + ++achievedRf; + } + } + } else { + // the node driving this updateRequest is not a leader and so + // it only forwards to other leaders, so its local result doesn't count + achievedRf = Integer.MAX_VALUE; + } + + // min achieved may come from a request to another leader + synchronized (otherLeaderRf) { + for (Integer otherRf : otherLeaderRf.values()) { + if (otherRf < achievedRf) + achievedRf = otherRf; + } + } + + return (achievedRf == Integer.MAX_VALUE) ? 1 : achievedRf; + } + + public void trackRequestResult(Node node, boolean success, Integer rf) { + String shardId = node.getShardId(); + if (log.isDebugEnabled()) + log.debug("trackRequestResult("+node+"): success? "+success+" rf="+rf+ + ", shardId="+shardId+" onLeaderShardId="+onLeaderShardId); + + if (onLeaderShardId == null || !onLeaderShardId.equals(shardId)) { + // result from another leader that we forwarded to + synchronized (otherLeaderRf) { + otherLeaderRf.put(shardId, rf != null ? rf : new Integer(1)); + } + return; + } + + if (onLeaderShardId != null) { + // track result for this leader + String nodeUrl = node.getUrl(); + AtomicInteger nodeErrors = null; + // potentially many results flooding into this method from multiple nodes concurrently + synchronized (nodeErrorTracker) { + nodeErrors = nodeErrorTracker.get(nodeUrl); + if (nodeErrors == null) { + nodeErrors = new AtomicInteger(0); + nodeErrorTracker.put(nodeUrl, nodeErrors); + } + } + + if (!success) + nodeErrors.incrementAndGet(); + } + } + + public String toString() { + StringBuilder sb = new StringBuilder("RequestReplicationTracker"); + sb.append(": onLeaderShardId=").append(String.valueOf(onLeaderShardId)); + sb.append(", minRf=").append(minRf); + sb.append(", achievedRf=").append(getAchievedRf()); + return sb.toString(); + } + } + public static final String COMMIT_END_POINT = "commit_end_point"; public static final String LOG_REPLAY = "log_replay"; @@ -168,6 +259,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { private UpdateCommand updateCommand; // the current command this processor is working on. + //used for keeping track of replicas that have processed an add/update from the leader + private RequestReplicationTracker replicationTracker = null; + public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { super(next); @@ -559,7 +653,41 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } else { isLeader = getNonZkLeaderAssumption(req); } - + + // check if client has requested minimum replication factor information + int minRf = -1; // disabled by default + if (replicationTracker != null) { + minRf = replicationTracker.minRf; // for subsequent requests in the same batch + } else { + SolrParams rp = cmd.getReq().getParams(); + String distribUpdate = rp.get(DISTRIB_UPDATE_PARAM); + // somewhat tricky logic here: we only activate the replication tracker if we're on + // a leader or this is the top-level request processor + if (distribUpdate == null || distribUpdate.equals(DistribPhase.TOLEADER.toString())) { + String minRepFact = rp.get(UpdateRequest.MIN_REPFACT); + if (minRepFact != null) { + try { + minRf = Integer.parseInt(minRepFact); + } catch (NumberFormatException nfe) { + minRf = -1; + } + + if (minRf <= 0) + throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid value "+minRepFact+" for "+UpdateRequest.MIN_REPFACT+ + "; must be >0 and less than or equal to the collection replication factor."); + } + + if (minRf > 1) { + String myShardId = forwardToLeader ? null : cloudDesc.getShardId(); + replicationTracker = new RequestReplicationTracker(myShardId, minRf); + } + } + } + + // TODO: if minRf > 1 and we know the leader is the only active replica, we could fail + // the request right here but for now I think it is better to just return the status + // to the client that the minRf wasn't reached and let them handle it + boolean dropCmd = false; if (!forwardToLeader) { dropCmd = versionAdd(cmd); @@ -593,15 +721,15 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName()); params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); + for (Node nodesByRoutingRule : nodesByRoutingRules) { cmdDistrib.distribAdd(cmd, Collections.singletonList(nodesByRoutingRule), params, true); } } } - + ModifiableSolrParams params = null; if (nodes != null) { - params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(DISTRIB_UPDATE_PARAM, (isLeader || isSubShardLeader ? @@ -609,7 +737,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { DistribPhase.TOLEADER.toString())); params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); - cmdDistrib.distribAdd(cmd, nodes, params); + + if (replicationTracker != null && minRf > 1) + params.set(UpdateRequest.MIN_REPFACT, String.valueOf(minRf)); + + cmdDistrib.distribAdd(cmd, nodes, params, false, replicationTracker); } // TODO: what to do when no idField? @@ -632,9 +764,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // TODO: optionally fail if n replicas are not reached... private void doFinish() { // TODO: if not a forward and replication req is not specified, we could - // send in a background thread - - cmdDistrib.finish(); + // send in a background thread + + cmdDistrib.finish(); List errors = cmdDistrib.getErrors(); // TODO - we may need to tell about more than one error... @@ -724,6 +856,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor(); executor.execute(lirThread); } + + if (replicationTracker != null) { + rsp.getResponseHeader().add(UpdateRequest.REPFACT, replicationTracker.getAchievedRf()); + rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, replicationTracker.minRf); + replicationTracker = null; + } } diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java index cef27319e53..efb9cbbf810 100644 --- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java @@ -62,7 +62,6 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase { private static final long sleepMsBeforeHealPartition = 1000L; private Map proxies = new HashMap(); - private AtomicInteger portCounter = new AtomicInteger(0); public HttpPartitionTest() { super(); diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java new file mode 100644 index 00000000000..359568fcafa --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java @@ -0,0 +1,379 @@ +package org.apache.solr.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.File; +import java.net.ServerSocket; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.lucene.util.LuceneTestCase.Slow; +import org.apache.solr.SolrTestCaseJ4.SuppressSSL; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.HttpSolrServer; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkCoreNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.NamedList; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests a client application's ability to get replication factor + * information back from the cluster after an add or update. + */ +@Slow +@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776") +public class ReplicationFactorTest extends AbstractFullDistribZkTestBase { + + private static final transient Logger log = + LoggerFactory.getLogger(ReplicationFactorTest.class); + + private Map proxies = new HashMap(); + + public ReplicationFactorTest() { + super(); + sliceCount = 3; + shardCount = 3; + } + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + System.setProperty("numShards", Integer.toString(sliceCount)); + } + + @Override + @After + public void tearDown() throws Exception { + if (!proxies.isEmpty()) { + for (SocketProxy proxy : proxies.values()) { + proxy.close(); + } + } + + System.clearProperty("numShards"); + + try { + super.tearDown(); + } catch (Exception exc) {} + + resetExceptionIgnores(); + } + + /** + * Overrides the parent implementation so that we can configure a socket proxy + * to sit infront of each Jetty server, which gives us the ability to simulate + * network partitions without having to fuss with IPTables (which is not very + * cross platform friendly). + */ + @Override + public JettySolrRunner createJetty(File solrHome, String dataDir, + String shardList, String solrConfigOverride, String schemaOverride) + throws Exception { + + JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context, + 0, solrConfigOverride, schemaOverride, false, + getExtraServlets(), sslConfig, getExtraRequestFilters()); + jetty.setShards(shardList); + jetty.setDataDir(getDataDir(dataDir)); + + // setup to proxy Http requests to this server unless it is the control + // server + int proxyPort = getNextAvailablePort(); + jetty.setProxyPort(proxyPort); + + jetty.start(); + + // create a socket proxy for the jetty server ... + SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI()); + proxies.put(proxy.getUrl(), proxy); + + return jetty; + } + + protected int getNextAvailablePort() throws Exception { + int port = -1; + try (ServerSocket s = new ServerSocket(0)) { + port = s.getLocalPort(); + } + return port; + } + + @Override + public void doTest() throws Exception { + waitForThingsToLevelOut(30000); + + // test a 1x3 collection + testRf3(); + + // test handling when not using direct updates + testRf2NotUsingDirectUpdates(); + } + + protected void testRf2NotUsingDirectUpdates() throws Exception { + int numShards = 2; + int replicationFactor = 2; + int maxShardsPerNode = 1; + String testCollectionName = "c8n_2x2"; + String shardId = "shard1"; + int minRf = 2; + + createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode); + cloudClient.setDefaultCollection(testCollectionName); + + List replicas = + ensureAllReplicasAreActive(testCollectionName, shardId, numShards, replicationFactor, 10); + assertTrue("Expected active 1 replicas for "+testCollectionName, replicas.size() == 1); + + List batch = new ArrayList(10); + for (int i=0; i < 15; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField(id, String.valueOf(i)); + doc.addField("a_t", "hello" + i); + batch.add(doc); + } + + // send directly to the leader using HttpSolrServer instead of CloudSolrServer (to test support for non-direct updates) + UpdateRequest up = new UpdateRequest(); + up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf)); + up.add(batch); + + Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId); + sendNonDirectUpdateRequestReplica(leader, up, 2, testCollectionName); + sendNonDirectUpdateRequestReplica(replicas.get(0), up, 2, testCollectionName); + + // so now kill the replica of shard2 and verify the achieved rf is only 1 + List shard2Replicas = + ensureAllReplicasAreActive(testCollectionName, "shard2", numShards, replicationFactor, 10); + assertTrue("Expected active 1 replicas for "+testCollectionName, replicas.size() == 1); + + getProxyForReplica(shard2Replicas.get(0)).close(); + + Thread.sleep(2000); + + // shard1 will have rf=2 but shard2 will only have rf=1 + sendNonDirectUpdateRequestReplica(leader, up, 1, testCollectionName); + sendNonDirectUpdateRequestReplica(replicas.get(0), up, 1, testCollectionName); + } + + @SuppressWarnings("rawtypes") + protected void sendNonDirectUpdateRequestReplica(Replica replica, UpdateRequest up, int expectedRf, String collection) throws Exception { + HttpSolrServer solrServer = null; + try { + ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica); + String url = zkProps.getBaseUrl() + "/" + collection; + solrServer = new HttpSolrServer(url); + + NamedList resp = solrServer.request(up); + NamedList hdr = (NamedList) resp.get("responseHeader"); + Integer batchRf = (Integer)hdr.get(UpdateRequest.REPFACT); + assertTrue("Expected rf="+expectedRf+" for batch but got "+batchRf, batchRf == expectedRf); + } finally { + if (solrServer != null) + solrServer.shutdown(); + } + } + + protected void testRf3() throws Exception { + int numShards = 1; + int replicationFactor = 3; + int maxShardsPerNode = 1; + String testCollectionName = "c8n_1x3"; + String shardId = "shard1"; + int minRf = 2; + + createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode); + cloudClient.setDefaultCollection(testCollectionName); + + List replicas = + ensureAllReplicasAreActive(testCollectionName, shardId, numShards, replicationFactor, 10); + assertTrue("Expected active 2 replicas for "+testCollectionName, replicas.size() == 2); + + int rf = sendDoc(1, minRf); + assertTrue("Expected rf=3 as all replicas are up, but got "+rf, rf == 3); + + getProxyForReplica(replicas.get(0)).close(); + + rf = sendDoc(2, minRf); + assertTrue("Expected rf=2 as one replica should be down, but got "+rf, rf == 2); + + getProxyForReplica(replicas.get(1)).close(); + + rf = sendDoc(3, minRf); + assertTrue("Expected rf=1 as both replicas should be down, but got "+rf, rf == 1); + + // heal the partitions + getProxyForReplica(replicas.get(0)).reopen(); + getProxyForReplica(replicas.get(1)).reopen(); + + rf = sendDoc(4, minRf); + assertTrue("Expected rf=3 as partitions to replicas have been healed, but got "+rf, rf == 3); + + // now send a batch + List batch = new ArrayList(10); + for (int i=5; i < 15; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField(id, String.valueOf(i)); + doc.addField("a_t", "hello" + i); + batch.add(doc); + } + + UpdateRequest up = new UpdateRequest(); + up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf)); + up.add(batch); + int batchRf = + cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up)); + assertTrue("Expected rf=3 for batch but got "+batchRf, batchRf == 3); + + // add some chaos to the batch + getProxyForReplica(replicas.get(0)).close(); + + // now send a batch + batch = new ArrayList(10); + for (int i=15; i < 30; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField(id, String.valueOf(i)); + doc.addField("a_t", "hello" + i); + batch.add(doc); + } + + up = new UpdateRequest(); + up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf)); + up.add(batch); + batchRf = + cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up)); + assertTrue("Expected rf=2 for batch (one replica is down) but got "+batchRf, batchRf == 2); + + // close the 2nd replica, and send a 3rd batch with expected achieved rf=1 + getProxyForReplica(replicas.get(1)).close(); + + batch = new ArrayList(10); + for (int i=30; i < 45; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField(id, String.valueOf(i)); + doc.addField("a_t", "hello" + i); + batch.add(doc); + } + + up = new UpdateRequest(); + up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf)); + up.add(batch); + batchRf = + cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up)); + assertTrue("Expected rf=1 for batch (two replicas are down) but got "+batchRf, batchRf == 1); + + getProxyForReplica(replicas.get(0)).reopen(); + getProxyForReplica(replicas.get(1)).reopen(); + Thread.sleep(1000); + } + + protected SocketProxy getProxyForReplica(Replica replica) throws Exception { + String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP); + assertNotNull(replicaBaseUrl); + URL baseUrl = new URL(replicaBaseUrl); + + SocketProxy proxy = proxies.get(baseUrl.toURI()); + if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) { + baseUrl = new URL(baseUrl.toExternalForm() + "/"); + proxy = proxies.get(baseUrl.toURI()); + } + assertNotNull("No proxy found for " + baseUrl + "!", proxy); + return proxy; + } + + protected int sendDoc(int docId, int minRf) throws Exception { + UpdateRequest up = new UpdateRequest(); + up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf)); + SolrInputDocument doc = new SolrInputDocument(); + doc.addField(id, String.valueOf(docId)); + doc.addField("a_t", "hello" + docId); + up.add(doc); + return cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up)); + } + + protected List ensureAllReplicasAreActive(String testCollectionName, String shardId, int shards, int rf, int maxWaitSecs) throws Exception { + long startMs = System.currentTimeMillis(); + + Map notLeaders = new HashMap(); + + ZkStateReader zkr = cloudClient.getZkStateReader(); + ClusterState cs = zkr.getClusterState(); + Collection slices = cs.getActiveSlices(testCollectionName); + assertTrue(slices.size() == shards); + boolean allReplicasUp = false; + long waitMs = 0L; + long maxWaitMs = maxWaitSecs * 1000L; + Replica leader = null; + while (waitMs < maxWaitMs && !allReplicasUp) { + cs = zkr.getClusterState(); + assertNotNull(cs); + Slice shard = cs.getSlice(testCollectionName, shardId); + assertNotNull("No Slice for "+shardId, shard); + allReplicasUp = true; // assume true + Collection replicas = shard.getReplicas(); + assertTrue(replicas.size() == rf); + leader = shard.getLeader(); + assertNotNull(leader); + + // ensure all replicas are "active" and identify the non-leader replica + for (Replica replica : replicas) { + String replicaState = replica.getStr(ZkStateReader.STATE_PROP); + if (!ZkStateReader.ACTIVE.equals(replicaState)) { + log.info("Replica " + replica.getName() + " is currently " + replicaState); + allReplicasUp = false; + } + + if (!leader.equals(replica)) + notLeaders.put(replica.getName(), replica); + } + + if (!allReplicasUp) { + try { + Thread.sleep(500L); + } catch (Exception ignoreMe) {} + waitMs += 500L; + } + } // end while + + if (!allReplicasUp) + fail("Didn't see all replicas come up within " + maxWaitMs + " ms! ClusterState: " + cs); + + if (notLeaders.isEmpty()) + fail("Didn't isolate any replicas that are not the leader! ClusterState: " + cs); + + long diffMs = (System.currentTimeMillis() - startMs); + log.info("Took " + diffMs + " ms to see all replicas become active."); + + List replicas = new ArrayList(); + replicas.addAll(notLeaders.values()); + return replicas; + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java index bfc85c587c6..ff3c5111bc7 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java @@ -436,19 +436,33 @@ public class CloudSolrServer extends SolrServer { public RouteResponse condenseResponse(NamedList response, long timeMillis) { RouteResponse condensed = new RouteResponse(); int status = 0; + Integer rf = null; + Integer minRf = null; for(int i=0; i 0) { status = s; } + Object rfObj = header.get(UpdateRequest.REPFACT); + if (rfObj != null && rfObj instanceof Integer) { + Integer routeRf = (Integer)rfObj; + if (rf == null || routeRf < rf) + rf = routeRf; + } + minRf = (Integer)header.get(UpdateRequest.MIN_REPFACT); } NamedList cheader = new NamedList(); cheader.add("status", status); cheader.add("QTime", timeMillis); + if (rf != null) + cheader.add(UpdateRequest.REPFACT, rf); + if (minRf != null) + cheader.add(UpdateRequest.MIN_REPFACT, minRf); + condensed.add("responseHeader", cheader); return condensed; } @@ -692,4 +706,73 @@ public class CloudSolrServer extends SolrServer { return updatesToLeaders; } + /** + * Useful for determining the minimum achieved replication factor across + * all shards involved in processing an update request, typically useful + * for gauging the replication factor of a batch. + */ + @SuppressWarnings("rawtypes") + public int getMinAchievedReplicationFactor(String collection, NamedList resp) { + // it's probably already on the top-level header set by condense + NamedList header = (NamedList)resp.get("responseHeader"); + Integer achRf = (Integer)header.get(UpdateRequest.REPFACT); + if (achRf != null) + return achRf.intValue(); + + // not on the top-level header, walk the shard route tree + Map shardRf = getShardReplicationFactor(collection, resp); + for (Integer rf : shardRf.values()) { + if (achRf == null || rf < achRf) { + achRf = rf; + } + } + return (achRf != null) ? achRf.intValue() : -1; + } + + /** + * Walks the NamedList response after performing an update request looking for + * the replication factor that was achieved in each shard involved in the request. + * For single doc updates, there will be only one shard in the return value. + */ + @SuppressWarnings("rawtypes") + public Map getShardReplicationFactor(String collection, NamedList resp) { + connect(); + + Map results = new HashMap(); + if (resp instanceof CloudSolrServer.RouteResponse) { + NamedList routes = ((CloudSolrServer.RouteResponse)resp).getRouteResponses(); + ClusterState clusterState = zkStateReader.getClusterState(); + Map leaders = new HashMap(); + for (Slice slice : clusterState.getActiveSlices(collection)) { + Replica leader = slice.getLeader(); + if (leader != null) { + ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader); + String leaderUrl = zkProps.getBaseUrl() + "/" + zkProps.getCoreName(); + leaders.put(leaderUrl, slice.getName()); + String altLeaderUrl = zkProps.getBaseUrl() + "/" + collection; + leaders.put(altLeaderUrl, slice.getName()); + } + } + + Iterator> routeIter = routes.iterator(); + while (routeIter.hasNext()) { + Map.Entry next = routeIter.next(); + String host = next.getKey(); + NamedList hostResp = (NamedList)next.getValue(); + Integer rf = (Integer)((NamedList)hostResp.get("responseHeader")).get(UpdateRequest.REPFACT); + if (rf != null) { + String shard = leaders.get(host); + if (shard == null) { + if (host.endsWith("/")) + shard = leaders.get(host.substring(0,host.length()-1)); + if (shard == null) { + shard = host; + } + } + results.put(shard, rf); + } + } + } + return results; + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java index 5a9af1e89d8..ae76bc3c448 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java @@ -239,6 +239,8 @@ public class ConcurrentUpdateSolrServer extends SolrServer { msg.append("\n\n"); msg.append("request: ").append(method.getURI()); handleError(new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString())); + } else { + onSuccess(response); } } finally { try { @@ -405,6 +407,13 @@ public class ConcurrentUpdateSolrServer extends SolrServer { public void handleError(Throwable ex) { log.error("error", ex); } + + /** + * Intended to be used as an extension point for doing post processing after a request completes. + */ + public void onSuccess(HttpResponse resp) { + // no-op by design, override to add functionality + } @Override public void shutdown() { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java index 962d24771e4..ed1329549b5 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java @@ -46,7 +46,8 @@ import org.apache.solr.common.util.XML; * @since solr 1.3 */ public class UpdateRequest extends AbstractUpdateRequest { - + public static final String REPFACT = "rf"; + public static final String MIN_REPFACT = "min_rf"; public static final String VER = "ver"; public static final String OVERWRITE = "ow"; public static final String COMMIT_WITHIN = "cw";