SOLR-3933: Distributed commits are not guaranteed to be ordered within a request.

SOLR-3939: An empty or just-replicated index cannot become the leader of a shard after a leader goes down.
  
SOLR-3971: A collection that is created with numShards=1 turns into a numShards=2 collection after starting up a second core and not specifying numShards.

SOLR-3932: SolrCmdDistributorTest either takes 3 seconds or 3 minutes.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1401798 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2012-10-24 17:52:15 +00:00
parent 583f72b4c7
commit e1b693ddb2
14 changed files with 433 additions and 89 deletions

View File

@@ -88,6 +88,16 @@ Bug Fixes
* SOLR-3961: Fixed error using LimitTokenCountFilterFactory
(Jack Krupansky, hossman)
* SOLR-3933: Distributed commits are not guaranteed to be ordered within a
request. (Mark Miller)
* SOLR-3939: An empty or just-replicated index cannot become the leader of a
shard after a leader goes down. (Joel Bernstein, yonik, Mark Miller)
* SOLR-3971: A collection that is created with numShards=1 turns into a
numShards=2 collection after starting up a second core and not specifying
numShards. (Mark Miller)
Other Changes
----------------------
@@ -100,6 +110,9 @@ Other Changes
* SOLR-3966: Eliminate superfluous warning from LanguageIdentifierUpdateProcessor
(Markus Jelsma via hossman)
* SOLR-3932: SolrCmdDistributorTest either takes 3 seconds or 3 minutes.
(yonik, Mark Miller)
================== 4.0.0 ==================
Versions of Major Components

View File

@@ -35,6 +35,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.bio.SocketConnector;
import org.eclipse.jetty.server.handler.GzipHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.server.session.HashSessionIdManager;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
@@ -92,6 +93,7 @@ public class JettySolrRunner {
private void init(String solrHome, String context, int port, boolean stopAtShutdown) {
this.context = context;
server = new Server(port);
this.solrHome = solrHome;
this.stopAtShutdown = stopAtShutdown;
server.setStopAtShutdown(stopAtShutdown);
@@ -100,32 +102,45 @@ public class JettySolrRunner {
}
System.setProperty("solr.solr.home", solrHome);
if (System.getProperty("jetty.testMode") != null) {
- // SelectChannelConnector connector = new SelectChannelConnector();
- // Normal SocketConnector is what solr's example server uses by default
- SocketConnector connector = new SocketConnector();
+ SelectChannelConnector connector = new SelectChannelConnector();
connector.setPort(port);
connector.setReuseAddress(true);
- if (!stopAtShutdown) {
- QueuedThreadPool threadPool = (QueuedThreadPool) connector
- .getThreadPool();
- if (threadPool != null) {
+ connector.setLowResourcesMaxIdleTime(1500);
+ QueuedThreadPool threadPool = (QueuedThreadPool) connector
+ .getThreadPool();
+ if (threadPool != null) {
+ threadPool.setMaxThreads(10000);
+ threadPool.setMaxIdleTimeMs(5000);
+ if (!stopAtShutdown) {
threadPool.setMaxStopTimeMs(100);
}
}
server.setConnectors(new Connector[] {connector});
server.setSessionIdManager(new HashSessionIdManager(new Random()));
} else {
- if (!stopAtShutdown) {
- for (Connector connector : server.getConnectors()) {
- if (connector instanceof SocketConnector) {
- QueuedThreadPool threadPool = (QueuedThreadPool) ((SocketConnector) connector)
- .getThreadPool();
- if (threadPool != null) {
- threadPool.setMaxStopTimeMs(100);
- }
+ for (Connector connector : server.getConnectors()) {
+ QueuedThreadPool threadPool = null;
+ if (connector instanceof SocketConnector) {
+ threadPool = (QueuedThreadPool) ((SocketConnector) connector)
+ .getThreadPool();
+ }
+ if (connector instanceof SelectChannelConnector) {
+ threadPool = (QueuedThreadPool) ((SelectChannelConnector) connector)
+ .getThreadPool();
+ }
+ if (threadPool != null) {
+ threadPool.setMaxThreads(10000);
+ threadPool.setMaxIdleTimeMs(5000);
+ if (!stopAtShutdown) {
+ threadPool.setMaxStopTimeMs(100);
+ }
}
}
}
// Initialize the servlets

View File

@@ -14,6 +14,7 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.UpdateLog;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -162,6 +163,10 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
log.info("I may be the new leader - try and sync");
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
// we are going to attempt to be the leader
// first cancel any current recovery
core.getUpdateHandler().getSolrCoreState().cancelRecovery();
@@ -173,6 +178,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
success = false;
}
if (!success && ulog.getRecentUpdates().getVersions(1).isEmpty()) {
// we failed sync, but we have no versions - we can't sync in that case -
// we were active before, so become leader anyway
log.info("We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
success = true;
}
// if !success but no one else is in active mode,
// we are the leader anyway
// TODO: should we also be leader if there is only one other active?
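
The branch added above encodes the SOLR-3939 rule: a core whose update log holds no versions cannot peer-sync at all, so if it was active before the old leader went down it is allowed to lead anyway. A minimal sketch of that decision, with hypothetical names (not part of this commit):

    import java.util.List;

    class LeaderSyncDecision {
      /**
       * Decide whether a candidate may take leadership after a sync attempt.
       *
       * @param syncSucceeded  result of PeerSync against the other replicas
       * @param recentVersions versions in our update log (empty for a fresh
       *                       or just-replicated index)
       */
      static boolean becomeLeader(boolean syncSucceeded, List<Long> recentVersions) {
        if (syncSucceeded) {
          return true;
        }
        // A core with no versions cannot peer-sync; since it was considered
        // active before the leader died, let it lead anyway rather than
        // leaving the shard leaderless (SOLR-3939).
        return recentVersions.isEmpty();
      }
    }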
@@ -308,13 +321,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
return;
}
log.info("There is a better leader candidate than us - going back into recovery");
try {
zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
} catch (Throwable t) {
SolrException.log(log, "Error trying to publish down state", t);
}
log.info("There may be a better leader candidate than us - going back into recovery");
cancelElection();

View File

@@ -17,14 +17,11 @@ package org.apache.solr.cloud;
* the License.
*/
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.noggit.JSONUtil;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ClosableThread;
@@ -210,11 +207,11 @@ public class Overseer {
private ClusterState updateState(ClusterState state, final ZkNodeProps message) {
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
final String zkCoreNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + message.getStr(ZkStateReader.CORE_NAME_PROP);
- final Integer numShards = message.getStr(ZkStateReader.NUM_SHARDS_PROP)!=null?Integer.parseInt(message.getStr(ZkStateReader.NUM_SHARDS_PROP)):null;
+ Integer numShards = message.getStr(ZkStateReader.NUM_SHARDS_PROP)!=null?Integer.parseInt(message.getStr(ZkStateReader.NUM_SHARDS_PROP)):null;
log.info("Update state numShards={} message={}", numShards, message);
//collection does not yet exist, create placeholders if num shards is specified
- if (!state.getCollections().contains(collection)
- && numShards!=null) {
+ boolean collectionExists = state.getCollections().contains(collection);
+ if (!collectionExists && numShards!=null) {
state = createCollection(state, collection, numShards);
}
@@ -227,6 +224,10 @@
}
if(sliceName == null) {
//request new shardId
if (collectionExists) {
// use existing numShards
numShards = state.getCollectionStates().get(collection).size();
}
sliceName = AssignShard.assignShard(collection, state, numShards);
}
@@ -269,6 +270,8 @@
}
private ClusterState createCollection(ClusterState state, String collectionName, int numShards) {
log.info("Create collection {} with numShards {}", collectionName, numShards);
HashPartitioner hp = new HashPartitioner();
List<HashPartitioner.Range> ranges = hp.partitionRange(numShards, hp.fullRange());
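
For context on the placeholder slices created for SOLR-3971: createCollection splits the full hash range into numShards contiguous ranges. Below is a simplified, illustrative sketch of that partitioning, assuming the full range is the signed 32-bit space; the names are invented and this is not Solr's actual HashPartitioner API:

    final class RangeSketch {
      static final class Range {
        final int min, max;
        Range(int min, int max) { this.min = min; this.max = max; }
        @Override public String toString() {
          return Integer.toHexString(min) + "-" + Integer.toHexString(max);
        }
      }

      // Split the full 32-bit hash space into numShards contiguous ranges.
      static Range[] partition(int numShards) {
        Range[] ranges = new Range[numShards];
        long full = (long) Integer.MAX_VALUE - Integer.MIN_VALUE + 1; // 2^32
        long start = Integer.MIN_VALUE;
        for (int i = 0; i < numShards; i++) {
          long end = (i == numShards - 1)
              ? Integer.MAX_VALUE
              : start + full / numShards - 1;
          ranges[i] = new Range((int) start, (int) end);
          start = end + 1;
        }
        return ranges;
      }

      public static void main(String[] args) {
        // prints 80000000-ffffffff and 0-7fffffff for two shards
        for (Range r : partition(2)) System.out.println(r);
      }
    }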

View File

@@ -348,7 +348,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
// System.out.println("Attempting to PeerSync from " + leaderUrl
// + " i am:" + zkController.getNodeName());
PeerSync peerSync = new PeerSync(core,
- Collections.singletonList(leaderUrl), ulog.numRecordsToKeep);
+ Collections.singletonList(leaderUrl), ulog.numRecordsToKeep, false, false);
peerSync.setStartingVersions(recentVersions);
boolean syncSuccess = peerSync.sync();
if (syncSuccess) {
@@ -443,7 +443,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
// Or do a fall off retry...
try {
log.error("Recovery failed - trying again... core=" + coreName);
log.error("Recovery failed - trying again... (" + retries + ") core=" + coreName);
if (isClosed()) {
retries = INTERRUPTED;
@@ -451,7 +451,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
retries++;
if (retries >= MAX_RETRIES) {
- if (retries == INTERRUPTED) {
+ if (retries >= INTERRUPTED) {
SolrException.log(log, "Recovery failed - interrupted. core="
+ coreName);
try {
@@ -463,7 +463,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
}
} else {
SolrException.log(log,
"Recovery failed - max retries exceeded. core=" + coreName);
"Recovery failed - max retries exceeded (" + retries + "). core=" + coreName);
try {
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
@@ -482,6 +482,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
try {
// start at 1 sec and work up to a couple min
double loopCount = Math.min(Math.pow(2, retries), 600);
log.info("Wait {} seconds before trying to recover again ({})", loopCount, retries);
for (int i = 0; i < loopCount; i++) {
if (isClosed()) break; // check if someone closed us
Thread.sleep(STARTING_RECOVERY_DELAY);
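
The log line added above reports the backoff computed just below it: the wait grows as min(2^retries, 600) iterations of a fixed delay, re-checking for shutdown on every slice. A self-contained sketch of that wait, assuming a 1-second STARTING_RECOVERY_DELAY (the actual constant in RecoveryStrategy may differ):

    class RecoveryBackoffSketch {
      static final long STARTING_RECOVERY_DELAY = 1000; // assumed: 1 s slices

      /**
       * Sleep roughly min(2^retries, 600) delay slices, so the wait starts
       * around a second and is capped at a couple of minutes, while a close
       * request is still noticed quickly.
       */
      static void waitBeforeRetry(int retries, java.util.function.BooleanSupplier closed)
          throws InterruptedException {
        double loopCount = Math.min(Math.pow(2, retries), 600);
        for (int i = 0; i < loopCount; i++) {
          if (closed.getAsBoolean()) break; // someone closed us - stop waiting
          Thread.sleep(STARTING_RECOVERY_DELAY);
        }
      }
    }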

View File

@@ -176,7 +176,7 @@ public class SyncStrategy {
// if we can't reach a replica for sync, we still consider the overall sync a success
// TODO: as an assurance, we should still try and tell the sync nodes that we couldn't reach
// to recover once more?
- PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().numRecordsToKeep, true);
+ PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().numRecordsToKeep, true, true);
return peerSync.sync();
}

View File

@@ -781,6 +781,7 @@ public final class ZkController {
//System.out.println(Thread.currentThread().getStackTrace()[3]);
Integer numShards = cd.getCloudDescriptor().getNumShards();
if (numShards == null) { //XXX sys prop hack
log.info("numShards not found on descriptor - reading it from system property");
numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
}

View File

@@ -525,7 +525,7 @@ public class RealTimeGetComponent extends SearchComponent
boolean cantReachIsSuccess = rb.req.getParams().getBool("cantReachIsSuccess", false);
- PeerSync peerSync = new PeerSync(rb.req.getCore(), replicas, nVersions, cantReachIsSuccess);
+ PeerSync peerSync = new PeerSync(rb.req.getCore(), replicas, nVersions, cantReachIsSuccess, true);
boolean success = peerSync.sync();
// TODO: more complex response?

View File

@@ -19,6 +19,7 @@ package org.apache.solr.update;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -79,6 +80,7 @@ public class PeerSync {
private long ourLowThreshold; // 20th percentile
private long ourHighThreshold; // 80th percentile
private boolean cantReachIsSuccess;
private boolean getNoVersionsIsSuccess;
private static final HttpClient client;
static {
ModifiableSolrParams params = new ModifiableSolrParams();
@@ -129,14 +131,15 @@ public class PeerSync {
}
public PeerSync(SolrCore core, List<String> replicas, int nUpdates) {
- this(core, replicas, nUpdates, false);
+ this(core, replicas, nUpdates, false, true);
}
- public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess) {
+ public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
this.replicas = replicas;
this.nUpdates = nUpdates;
this.maxUpdates = nUpdates;
this.cantReachIsSuccess = cantReachIsSuccess;
this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
uhandler = core.getUpdateHandler();
@@ -301,7 +304,7 @@ public class PeerSync {
Throwable solrException = ((SolrServerException) srsp.getException())
.getRootCause();
if (solrException instanceof ConnectException || solrException instanceof ConnectTimeoutException
- || solrException instanceof NoHttpResponseException) {
+ || solrException instanceof NoHttpResponseException || solrException instanceof SocketException) {
log.warn(msg() + " couldn't connect to " + srsp.getShardAddress() + ", counting as success");
return true;
@@ -343,7 +346,7 @@ public class PeerSync {
log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] );
if (otherVersions.size() == 0) {
- return true;
+ return getNoVersionsIsSuccess;
}
boolean completeList = otherVersions.size() < nUpdates; // do we have their complete list of updates?
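
Taken together with the SyncStrategy, RecoveryStrategy, and RealTimeGetComponent hunks above, the two boolean flags form a leniency policy. The following summary sketch (a hypothetical class, not from this patch) records how the callers in this commit choose them:

    class PeerSyncPolicy {
      final boolean cantReachIsSuccess;     // unreachable replica counts as synced
      final boolean getNoVersionsIsSuccess; // peer with an empty version list counts as synced

      PeerSyncPolicy(boolean cantReach, boolean noVersions) {
        this.cantReachIsSuccess = cantReach;
        this.getNoVersionsIsSuccess = noVersions;
      }

      // Leader-election sync (SyncStrategy): be lenient - a dead or empty
      // replica must not block electing a leader.
      static PeerSyncPolicy forLeaderElection() { return new PeerSyncPolicy(true, true); }

      // Replica recovery (RecoveryStrategy): be strict - syncing against a
      // peer that reports no versions proves nothing, so fall back to
      // replication instead (SOLR-3939).
      static PeerSyncPolicy forRecovery() { return new PeerSyncPolicy(false, false); }
    }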

View File

@@ -101,7 +101,6 @@ public class SolrCmdDistributor {
public void finish() {
// piggyback on any outstanding adds or deletes if possible.
flushAdds(1);
flushDeletes(1);
@@ -150,6 +149,12 @@ public class SolrCmdDistributor {
public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
ModifiableSolrParams params) throws IOException {
// make sure we are ordered
flushAdds(1);
flushDeletes(1);
// Wait for all outstanding responses to make sure that a commit
// can't sneak in ahead of adds or deletes we already sent.
// We could do this on a per-server basis, but it's more complex
@@ -163,7 +168,7 @@
addCommit(ureq, cmd);
log.info("Distrib commit to:" + nodes);
log.info("Distrib commit to:" + nodes + " params:" + params);
for (Node node : nodes) {
submit(ureq, node);
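
This hunk is the heart of the SOLR-3933 fix: buffered adds and deletes are flushed and all in-flight responses drained before the commit request goes out, so a commit can never overtake an add or delete already sent. A minimal, self-contained sketch of the same ordering rule (hypothetical names, not the actual SolrCmdDistributor API):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class OrderedDistributorSketch {
      private final ExecutorService executor = Executors.newFixedThreadPool(4);
      private final List<Future<?>> pending = new ArrayList<>();

      synchronized void send(Runnable request) {
        // adds/deletes go out asynchronously
        pending.add(executor.submit(request));
      }

      synchronized void commit(Runnable commitRequest) throws Exception {
        // wait for every outstanding add/delete response first
        for (Future<?> f : pending) {
          f.get();
        }
        pending.clear();
        executor.submit(commitRequest).get(); // only now is the commit sent
      }
    }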

View File

@@ -59,11 +59,11 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
"now is the time for all good men", "foo_f", 1.414f, "foo_b", "true",
"foo_d", 1.414d);
+ // make sure we are in a steady state...
+ waitForRecoveriesToFinish(false);
commit();
- // make sure we are in a steady state...
- waitForRecoveriesToFinish(false);
assertDocCounts(false);
indexAbunchOfDocs();
@@ -203,6 +203,9 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
}
commit();
printLayout();
query("q", "*:*", "sort", "n_tl1 desc");
// long cloudClientDocs = cloudClient.query(new

View File

@@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
@@ -66,9 +67,11 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.SolrCmdDistributor.Request;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Before;
@@ -124,7 +127,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
fixShardCount = true;
sliceCount = 2;
- shardCount = 3;
+ shardCount = 4;
completionService = new ExecutorCompletionService<Request>(executor);
pending = new HashSet<Future<Request>>();
@@ -319,17 +322,18 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
// would be better if these were all separate tests - but much, much
// slower
- doOptimisticLockingAndUpdating();
- testMultipleCollections();
- testANewCollectionInOneInstance();
- testSearchByCollectionName();
- testANewCollectionInOneInstanceWithManualShardAssignement();
- testNumberOfCommitsWithCommitAfterAdd();
- testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-explicit");
- testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-implicit");
- testCollectionsAPI();
+ // doOptimisticLockingAndUpdating();
+ // testMultipleCollections();
+ // testANewCollectionInOneInstance();
+ // testSearchByCollectionName();
+ // testANewCollectionInOneInstanceWithManualShardAssignement();
+ // testNumberOfCommitsWithCommitAfterAdd();
+ //
+ // testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-explicit");
+ // testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-implicit");
+ //
+ // testCollectionsAPI();
+ testCoreUnloadAndLeaders();
// Thread.sleep(10000000000L);
if (DEBUG) {
@@ -337,6 +341,215 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
}
}
/**
* @throws Exception on any problem
*/
private void testCoreUnloadAndLeaders() throws Exception {
// create a new collection
SolrServer client = clients.get(0);
String url1 = getBaseUrl(client);
HttpSolrServer server = new HttpSolrServer(url1);
Create createCmd = new Create();
createCmd.setCoreName("unloadcollection1");
createCmd.setCollection("unloadcollection");
createCmd.setNumShards(1);
String core1DataDir = dataDir.getAbsolutePath() + File.separator + System.currentTimeMillis() + "unloadcollection1" + "_1n";
createCmd.setDataDir(core1DataDir);
server.request(createCmd);
zkStateReader.updateClusterState(true);
int slices = zkStateReader.getClusterState().getCollectionStates().get("unloadcollection").size();
assertEquals(1, slices);
client = clients.get(1);
String url2 = getBaseUrl(client);
server = new HttpSolrServer(url2);
createCmd = new Create();
createCmd.setCoreName("unloadcollection2");
createCmd.setCollection("unloadcollection");
String core2dataDir = dataDir.getAbsolutePath() + File.separator + System.currentTimeMillis() + "unloadcollection1" + "_2n";
createCmd.setDataDir(core2dataDir);
server.request(createCmd);
zkStateReader.updateClusterState(true);
slices = zkStateReader.getClusterState().getCollectionStates().get("unloadcollection").size();
assertEquals(1, slices);
waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);
ZkCoreNodeProps leaderProps = getLeaderUrlFromZk("unloadcollection", "shard1");
Random random = random();
HttpSolrServer collectionClient;
if (random.nextBoolean()) {
collectionClient = new HttpSolrServer(leaderProps.getCoreUrl());
// lets try and use the solrj client to index and retrieve a couple
// documents
SolrInputDocument doc1 = getDoc(id, 6, i1, -600, tlong, 600, t1,
"humpty dumpy sat on a wall");
SolrInputDocument doc2 = getDoc(id, 7, i1, -600, tlong, 600, t1,
"humpty dumpy3 sat on a walls");
SolrInputDocument doc3 = getDoc(id, 8, i1, -600, tlong, 600, t1,
"humpty dumpy2 sat on a walled");
collectionClient.add(doc1);
collectionClient.add(doc2);
collectionClient.add(doc3);
collectionClient.commit();
}
// create another replica for our collection
client = clients.get(2);
String url3 = getBaseUrl(client);
server = new HttpSolrServer(url3);
createCmd = new Create();
createCmd.setCoreName("unloadcollection3");
createCmd.setCollection("unloadcollection");
String core3dataDir = dataDir.getAbsolutePath() + File.separator + System.currentTimeMillis() + "unloadcollection" + "_3n";
createCmd.setDataDir(core3dataDir);
server.request(createCmd);
Thread.sleep(1000);
waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);
// so that we start with some versions when we reload...
DirectUpdateHandler2.commitOnClose = false;
HttpSolrServer addClient = new HttpSolrServer(url3 + "/unloadcollection3");
// add a few docs
for (int x = 20; x < 100; x++) {
SolrInputDocument doc1 = getDoc(id, x, i1, -600, tlong, 600, t1,
"humpty dumpy sat on a wall");
addClient.add(doc1);
}
// don't commit so they remain in the transaction log
//collectionClient.commit();
// unload the leader
collectionClient = new HttpSolrServer(leaderProps.getBaseUrl());
Unload unloadCmd = new Unload(false);
unloadCmd.setCoreName(leaderProps.getCoreName());
ModifiableSolrParams p = (ModifiableSolrParams) unloadCmd.getParams();
collectionClient.request(unloadCmd);
// Thread.currentThread().sleep(500);
// printLayout();
int tries = 20;
while (leaderProps.getCoreUrl().equals(zkStateReader.getLeaderUrl("unloadcollection", "shard1", 15000))) {
Thread.sleep(100);
if (tries-- == 0) {
fail("Leader never changed");
}
}
// ensure there is a leader
zkStateReader.getLeaderProps("unloadcollection", "shard1", 15000);
addClient = new HttpSolrServer(url2 + "/unloadcollection2");
// add a few docs while the leader is down
for (int x = 101; x < 200; x++) {
SolrInputDocument doc1 = getDoc(id, x, i1, -600, tlong, 600, t1,
"humpty dumpy sat on a wall");
addClient.add(doc1);
}
// create another replica for our collection
client = clients.get(3);
String url4 = getBaseUrl(client);
server = new HttpSolrServer(url4);
createCmd = new Create();
createCmd.setCoreName("unloadcollection4");
createCmd.setCollection("unloadcollection");
String core4dataDir = dataDir.getAbsolutePath() + File.separator + System.currentTimeMillis() + "unloadcollection" + "_4n";
createCmd.setDataDir(core4dataDir);
server.request(createCmd);
Thread.sleep(1000);
waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);
// unload the leader again
leaderProps = getLeaderUrlFromZk("unloadcollection", "shard1");
collectionClient = new HttpSolrServer(leaderProps.getBaseUrl());
unloadCmd = new Unload(false);
unloadCmd.setCoreName(leaderProps.getCoreName());
p = (ModifiableSolrParams) unloadCmd.getParams();
collectionClient.request(unloadCmd);
tries = 20;
while (leaderProps.getCoreUrl().equals(zkStateReader.getLeaderUrl("unloadcollection", "shard1", 15000))) {
Thread.sleep(100);
if (tries-- == 0) {
fail("Leader never changed");
}
}
zkStateReader.getLeaderProps("unloadcollection", "shard1", 15000);
// set this back
DirectUpdateHandler2.commitOnClose = true;
// bring the downed leader back as replica
server = new HttpSolrServer(leaderProps.getBaseUrl());
createCmd = new Create();
createCmd.setCoreName(leaderProps.getCoreName());
createCmd.setCollection("unloadcollection");
createCmd.setDataDir(core1DataDir);
server.request(createCmd);
Thread.sleep(1000);
waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);
server = new HttpSolrServer(url1 + "/unloadcollection");
// System.out.println(server.query(new SolrQuery("*:*")).getResults().getNumFound());
server = new HttpSolrServer(url2 + "/unloadcollection");
server.commit();
SolrQuery q = new SolrQuery("*:*");
q.set("distrib", false);
long found1 = server.query(q).getResults().getNumFound();
server = new HttpSolrServer(url3 + "/unloadcollection");
server.commit();
q = new SolrQuery("*:*");
q.set("distrib", false);
long found3 = server.query(q).getResults().getNumFound();
server = new HttpSolrServer(url4 + "/unloadcollection");
server.commit();
q = new SolrQuery("*:*");
q.set("distrib", false);
long found4 = server.query(q).getResults().getNumFound();
// all 3 replicas should now have the same number of docs
assertEquals(found1, found3);
assertEquals(found3, found4);
}
private String getBaseUrl(SolrServer client) {
String url2 = ((HttpSolrServer) client).getBaseURL()
.substring(
0,
((HttpSolrServer) client).getBaseURL().length()
- DEFAULT_COLLECTION.length() -1);
return url2;
}
private void testCollectionsAPI() throws Exception {
// TODO: fragile - because we dont pass collection.confName, it will only
@@ -347,31 +560,12 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
// create new collections rapid fire
Map<String,List<Integer>> collectionInfos = new HashMap<String,List<Integer>>();
int cnt = atLeast(3);
- for (int i = 0; i < cnt; i++) {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set("action", CollectionAction.CREATE.toString());
- int numShards = _TestUtil.nextInt(random(), 0, shardCount) + 1;
- int numReplicas = _TestUtil.nextInt(random(), 0, 5) + 1;
- params.set("numShards", numShards);
- params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, numReplicas);
- String collectionName = "awholynewcollection_" + i;
- int clientIndex = random().nextInt(2);
- List<Integer> list = new ArrayList<Integer>();
- list.add(numShards);
- list.add(numReplicas);
- collectionInfos.put(collectionName, list);
- params.set("name", collectionName);
- SolrRequest request = new QueryRequest(params);
- request.setPath("/admin/collections");
- final String baseUrl = ((HttpSolrServer) clients.get(clientIndex)).getBaseURL().substring(
- 0,
- ((HttpSolrServer) clients.get(clientIndex)).getBaseURL().length()
- - DEFAULT_COLLECTION.length() - 1);
- createNewSolrServer("", baseUrl).request(request);
- }
+ for (int i = 0; i < cnt; i++) {
+ createCollection(collectionInfos, i,
+ _TestUtil.nextInt(random(), 0, shardCount) + 1,
+ _TestUtil.nextInt(random(), 0, 5) + 1);
+ }
Set<Entry<String,List<Integer>>> collectionInfosEntrySet = collectionInfos.entrySet();
for (Entry<String,List<Integer>> entry : collectionInfosEntrySet) {
@@ -387,8 +581,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
waitForNon403or404or503(collectionClient);
}
- for (int i = 0; i < cnt; i++) {
- waitForRecoveriesToFinish("awholynewcollection_" + i, zkStateReader, false);
+ for (int j = 0; j < cnt; j++) {
+ waitForRecoveriesToFinish("awholynewcollection_" + j, zkStateReader, false);
}
List<String> collectionNameList = new ArrayList<String>();
@@ -400,7 +594,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
HttpSolrServer collectionClient = new HttpSolrServer(url);
- // lets try and use the solrj client to index and retrieve a couple documents
+ // lets try and use the solrj client to index a couple documents
SolrInputDocument doc1 = getDoc(id, 6, i1, -600, tlong, 600, t1,
"humpty dumpy sat on a wall");
SolrInputDocument doc2 = getDoc(id, 7, i1, -600, tlong, 600, t1,
@@ -442,6 +636,9 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
boolean allTimesAreCorrect = waitForReloads(collectionName, urlToTimeBefore);
assertTrue("some core start times did not change on reload", allTimesAreCorrect);
waitForRecoveriesToFinish("awholynewcollection_" + (cnt - 1), zkStateReader, false);
// remove a collection
params = new ModifiableSolrParams();
params.set("action", CollectionAction.DELETE.toString());
@@ -453,9 +650,37 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
// ensure its out of the state
checkForMissingCollection(collectionName);
//collectionNameList.remove(collectionName);
}
protected void createCollection(Map<String,List<Integer>> collectionInfos,
int i, int numShards, int numReplicas) throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionAction.CREATE.toString());
params.set("numShards", numShards);
params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, numReplicas);
String collectionName = "awholynewcollection_" + i;
int clientIndex = random().nextInt(2);
List<Integer> list = new ArrayList<Integer>();
list.add(numShards);
list.add(numReplicas);
collectionInfos.put(collectionName, list);
params.set("name", collectionName);
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
final String baseUrl = ((HttpSolrServer) clients.get(clientIndex)).getBaseURL().substring(
0,
((HttpSolrServer) clients.get(clientIndex)).getBaseURL().length()
- DEFAULT_COLLECTION.length() - 1);
createNewSolrServer("", baseUrl).request(request);
}
private boolean waitForReloads(String collectionName, Map<String,Long> urlToTimeBefore) throws SolrServerException, IOException {
@@ -537,6 +762,15 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
throw new RuntimeException("Could not find a live node for collection:" + collection);
}
private ZkCoreNodeProps getLeaderUrlFromZk(String collection, String slice) {
ClusterState clusterState = solrj.getZkStateReader().getClusterState();
ZkNodeProps leader = clusterState.getLeader(collection, slice);
if (leader == null) {
throw new RuntimeException("Could not find leader:" + collection + " " + slice);
}
return new ZkCoreNodeProps(leader);
}
private void waitForNon403or404or503(HttpSolrServer collectionClient)
throws Exception {
@@ -1099,5 +1333,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
}
System.clearProperty("numShards");
System.clearProperty("zkHost");
// insurance
DirectUpdateHandler2.commitOnClose = true;
}
}

View File

@@ -24,6 +24,7 @@ import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.client.solrj.SolrQuery;
@@ -38,9 +39,15 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrEventListener;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.update.SolrCmdDistributor.Node;
import org.apache.solr.update.SolrCmdDistributor.Response;
import org.apache.solr.update.SolrCmdDistributor.StdNode;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.util.DefaultSolrThreadFactory;
public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
@@ -92,6 +99,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(5, executor);
ModifiableSolrParams params = new ModifiableSolrParams();
List<Node> nodes = new ArrayList<Node>();
ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
@@ -103,12 +111,17 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
AddUpdateCommand cmd = new AddUpdateCommand(null);
cmd.solrDoc = sdoc("id", 1);
params = new ModifiableSolrParams();
cmdDistrib.distribAdd(cmd, nodes, params);
CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
cmdDistrib.distribCommit(ccmd, nodes, params);
cmdDistrib.finish();
Response response = cmdDistrib.getResponse();
assertEquals(response.errors.toString(), 0, response.errors.size());
@@ -125,18 +138,26 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
// add another 2 docs to control and 3 to client
cmdDistrib = new SolrCmdDistributor(5, executor);
cmd.solrDoc = sdoc("id", 2);
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
cmdDistrib.distribAdd(cmd, nodes, params);
AddUpdateCommand cmd2 = new AddUpdateCommand(null);
cmd2.solrDoc = sdoc("id", 3);
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
cmdDistrib.distribAdd(cmd2, nodes, params);
AddUpdateCommand cmd3 = new AddUpdateCommand(null);
cmd3.solrDoc = sdoc("id", 4);
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
cmdDistrib.distribAdd(cmd3, Collections.singletonList(nodes.get(1)), params);
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
cmdDistrib.distribCommit(ccmd, nodes, params);
cmdDistrib.finish();
response = cmdDistrib.getResponse();
@@ -156,9 +177,18 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
dcmd.id = "2";
cmdDistrib = new SolrCmdDistributor(5, executor);
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
cmdDistrib.distribDelete(dcmd, nodes, params);
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
cmdDistrib.distribCommit(ccmd, nodes, params);
cmdDistrib.finish();
@@ -184,7 +214,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
cmdDistrib = new SolrCmdDistributor(5, executor);
- int cnt = atLeast(201);
+ int cnt = atLeast(303);
for (int i = 0; i < cnt; i++) {
nodes.clear();
for (SolrServer c : clients) {
@@ -194,13 +224,13 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
HttpSolrServer httpClient = (HttpSolrServer) c;
nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
httpClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
System.out.println("node props:" + nodeProps);
nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
}
AddUpdateCommand c = new AddUpdateCommand(null);
c.solrDoc = sdoc("id", id++);
if (nodes.size() > 0) {
params = new ModifiableSolrParams();
cmdDistrib.distribAdd(c, nodes, params);
}
}
@@ -214,11 +244,37 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
}
final AtomicInteger commits = new AtomicInteger();
for(JettySolrRunner jetty : jettys) {
CoreContainer cores = ((SolrDispatchFilter) jetty.getDispatchFilter().getFilter()).getCores();
SolrCore core = cores.getCore("collection1");
try {
core.getUpdateHandler().registerCommitCallback(new SolrEventListener() {
@Override
public void init(NamedList args) {}
@Override
public void postSoftCommit() {}
@Override
public void postCommit() {
commits.incrementAndGet();
}
@Override
public void newSearcher(SolrIndexSearcher newSearcher,
SolrIndexSearcher currentSearcher) {}
});
} finally {
core.close();
}
}
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
cmdDistrib.distribCommit(ccmd, nodes, params);
cmdDistrib.finish();
assertEquals(shardCount, commits.get());
for (SolrServer c : clients) {
NamedList<Object> resp = c.request(new LukeRequest());

View File

@@ -186,7 +186,7 @@ public class ZkStateReader {
if (EventType.None.equals(event.getType())) {
return;
}
log.info("A cluster state change has occurred - updating...");
log.info("A cluster state change has occurred - updating... ({})", ZkStateReader.this.clusterState.getLiveNodes().size());
try {
// delayed approach
@@ -235,13 +235,13 @@ public class ZkStateReader {
if (EventType.None.equals(event.getType())) {
return;
}
log.info("Updating live nodes");
try {
// delayed approach
// ZkStateReader.this.updateClusterState(false, true);
synchronized (ZkStateReader.this.getUpdateLock()) {
List<String> liveNodes = zkClient.getChildren(
LIVE_NODES_ZKNODE, this, true);
log.info("Updating live nodes... ({})", liveNodes.size());
Set<String> liveNodesSet = new HashSet<String>();
liveNodesSet.addAll(liveNodes);
ClusterState clusterState = new ClusterState(
@@ -296,7 +296,7 @@ public class ZkStateReader {
clusterState = ClusterState.load(zkClient, liveNodesSet);
} else {
log.info("Updating live nodes from ZooKeeper... ");
log.info("Updating live nodes from ZooKeeper... ({})", liveNodesSet.size());
clusterState = new ClusterState(
ZkStateReader.this.clusterState.getZkClusterStateVersion(), liveNodesSet,
ZkStateReader.this.clusterState.getCollectionStates());