From 10ea628bb6f4104ba14485d4b3dd2ee7f369a90e Mon Sep 17 00:00:00 2001 From: Chris Hostetter Date: Mon, 16 Dec 2019 15:58:06 -0700 Subject: [PATCH] SOLR-14081: re-implement FullSolrCloudDistribCmdsTest to extend SolrCloudTestCase (cherry picked from commit db11e9e9a2c07136399ba002f2bbefe8c611b0a0) --- .../cloud/FullSolrCloudDistribCmdsTest.java | 950 +++++++----------- 1 file changed, 342 insertions(+), 608 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java index d6cbf9bcd26..4bd4b52fa17 100644 --- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java @@ -16,497 +16,274 @@ */ package org.apache.solr.cloud; -import java.io.IOException; +import java.lang.invoke.MethodHandles; + import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.LuceneTestCase.Slow; -import org.apache.solr.SolrTestCaseJ4.SuppressSSL; -import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrQuery; -import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.cloud.SocketProxy; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; -import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.RequestStatusState; import org.apache.solr.client.solrj.request.UpdateRequest; -import org.apache.solr.client.solrj.response.CollectionAdminResponse; -import org.apache.solr.client.solrj.response.QueryResponse; -import org.apache.solr.common.SolrDocument; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; -import org.apache.solr.common.cloud.SolrZkClient; -import org.apache.solr.common.cloud.ZkNodeProps; -import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.params.CollectionParams.CollectionAction; -import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.solr.common.util.NamedList; -import org.apache.solr.common.util.Utils; -import org.apache.zookeeper.CreateMode; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ExecutorUtil; +import org.junit.After; import org.junit.BeforeClass; import org.junit.Test; -import static org.apache.solr.common.params.CommonParams.VERSION_FIELD; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Super basic testing, no shard restarting or anything. 
 */
@Slow
-@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
-public class FullSolrCloudDistribCmdsTest extends AbstractFullDistribZkTestBase {
-
+public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final AtomicInteger NAME_COUNTER = new AtomicInteger(1);
+
   @BeforeClass
-  public static void beforeSuperClass() {
-    schemaString = "schema15.xml";      // we need a string id
-  }
-
-  public FullSolrCloudDistribCmdsTest() {
-    super();
-    sliceCount = 3;
+  public static void setupCluster() throws Exception {
+    // use a 5 node cluster so that, with a typical 2x2 collection, one node isn't involved;
+    // this helps to randomly test edge cases of hitting a node not involved in the collection
+    configureCluster(5).configure();
   }
+
+  @After
+  public void purgeAllCollections() throws Exception {
+    cluster.deleteAllCollections();
+    cluster.getSolrClient().setDefaultCollection(null);
+  }
+
+  /**
+   * Creates a new 2x2 collection using a unique name, blocking until its state is fully active,
+   * and sets that collection as the default on the cluster's default CloudSolrClient.
+   *
+   * @return the name of the new collection
+   */
+  public static String createAndSetNewDefaultCollection() throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+    final String name = "test_collection_" + NAME_COUNTER.getAndIncrement();
+    assertEquals(RequestStatusState.COMPLETED,
+                 CollectionAdminRequest.createCollection(name, "_default", 2, 2)
+                 .processAndWait(cloudClient, DEFAULT_TIMEOUT));
+    cloudClient.waitForState(name, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
+                             (n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
+    cloudClient.setDefaultCollection(name);
+    return name;
+  }
+
   @Test
-  @ShardsFixed(num = 6)
-  // commented 15-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
-  public void test() throws Exception {
-    handle.clear();
-    handle.put("timestamp", SKIPVAL);
-
-    waitForRecoveriesToFinish(false);
+  public void testBasicUpdates() throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();

     // add a doc, update it, and delete it
+    addUpdateDelete("doc1");
+    assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());

-    QueryResponse results;
-    UpdateRequest uReq;
-    long docId = addUpdateDelete();
-
-    // add 2 docs in a request
-    SolrInputDocument doc1;
-    SolrInputDocument doc2;
-    docId = addTwoDocsInOneRequest(docId);
-
-    // two deletes
-    uReq = new UpdateRequest();
-    uReq.deleteById(Long.toString(docId-1));
-    uReq.deleteById(Long.toString(docId-2)).process(cloudClient);
-    controlClient.deleteById(Long.toString(docId-1));
-    controlClient.deleteById(Long.toString(docId-2));
-
-    commit();
-
-    results = query(cloudClient);
-    assertEquals(0, results.getResults().getNumFound());
-
-    results = query(controlClient);
-    assertEquals(0, results.getResults().getNumFound());
-
-    // add two docs together, a 3rd doc and a delete
-    indexr("id", docId++, t1, "originalcontent");
-
-    uReq = new UpdateRequest();
-    doc1 = new SolrInputDocument();
+    // add 2 docs in a single request
+    addTwoDocsInOneRequest("doc2", "doc3");
+    assertEquals(2, cloudClient.query(params("q","*:*")).getResults().getNumFound());

-    addFields(doc1, "id",
docId++); - uReq.add(doc1); - doc2 = new SolrInputDocument(); - addFields(doc2, "id", docId++); - uReq.add(doc2); - - uReq.process(cloudClient); - uReq.process(controlClient); + // 2 deletes in a single request... + assertEquals(0, (new UpdateRequest().deleteById("doc2").deleteById("doc3")) + .process(cloudClient).getStatus()); + assertEquals(0, cloudClient.commit().getStatus()); - uReq = new UpdateRequest(); - uReq.deleteById(Long.toString(docId - 2)).process(cloudClient); - controlClient.deleteById(Long.toString(docId - 2)); + assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound()); - commit(); - - assertDocCounts(VERBOSE); - - checkShardConsistency(); - - results = query(controlClient); - assertEquals(2, results.getResults().getNumFound()); - - results = query(cloudClient); - assertEquals(2, results.getResults().getNumFound()); - - docId = testIndexQueryDeleteHierarchical(docId); - - docId = testIndexingDocPerRequestWithHttpSolrClient(docId); - - testConcurrentIndexing(docId); - - // TODO: testOptimisticUpdate(results); - - testDeleteByQueryDistrib(); + // add a doc that we will then delete later after adding two other docs (all before next commit). + assertEquals(0, cloudClient.add(sdoc("id", "doc4", "content_s", "will_delete_later")).getStatus()); + assertEquals(0, cloudClient.add(sdocs(sdoc("id", "doc5"), + sdoc("id", "doc6"))).getStatus()); + assertEquals(0, cloudClient.deleteById("doc4").getStatus()); + assertEquals(0, cloudClient.commit().getStatus()); - // See SOLR-7384 -// testDeleteByIdImplicitRouter(); -// -// testDeleteByIdCompositeRouterWithRouterField(); + assertEquals(0, cloudClient.query(params("q", "id:doc4")).getResults().getNumFound()); + assertEquals(1, cloudClient.query(params("q", "id:doc5")).getResults().getNumFound()); + assertEquals(1, cloudClient.query(params("q", "id:doc6")).getResults().getNumFound()); + assertEquals(2, cloudClient.query(params("q","*:*")).getResults().getNumFound()); + + checkShardConsistency(params("q","*:*", "rows", "9999","_trace","post_doc_5_6")); - docId = testThatCantForwardToLeaderFails(docId); + // delete everything.... 
+    assertEquals(0, cloudClient.deleteByQuery("*:*").getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
+    assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
-
-    docId = testIndexingBatchPerRequestWithHttpSolrClient(docId);

+    checkShardConsistency(params("q","*:*", "rows", "9999","_trace","delAll"));
+  }

-  private void testDeleteByIdImplicitRouter() throws Exception {
-    SolrClient server = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)));
-    CollectionAdminResponse response;
-    Map<String, NamedList<Integer>> coresStatus;
-    CollectionAdminRequest.Create createCollectionRequest
-        = CollectionAdminRequest.createCollectionWithImplicitRouter("implicit_collection_without_routerfield",
-            "conf1","shard1,shard2",2);
-    response = createCollectionRequest.process(server);
-
-    assertEquals(0, response.getStatus());
-    assertTrue(response.isSuccess());
-    coresStatus = response.getCollectionCoresStatus();
-    assertEquals(4, coresStatus.size());
-    for (int i = 0; i < 4; i++) {
-      NamedList<Integer> status = coresStatus.get("implicit_collection_without_routerfield_shard" + (i / 2 + 1) + "_replica" + (i % 2 + 1));
-      assertEquals(0, (int) status.get("status"));
-      assertTrue(status.get("QTime") > 0);
-    }
-
-    waitForRecoveriesToFinish("implicit_collection_without_routerfield", true);
-
-    SolrClient shard1 = createNewSolrClient("implicit_collection_without_routerfield_shard1_replica1",
-        getBaseUrl((HttpSolrClient) clients.get(0)));
-    SolrClient shard2 = createNewSolrClient("implicit_collection_without_routerfield_shard2_replica1",
-        getBaseUrl((HttpSolrClient) clients.get(0)));
-
-    SolrInputDocument doc = new SolrInputDocument();
-    int docCounts1, docCounts2;
-
-    // Add three documents to shard1
-    doc.clear();
-    doc.addField("id", "1");
-    doc.addField("title", "s1 one");
-    shard1.add(doc);
-    shard1.commit();
-
-    doc.clear();
-    doc.addField("id", "2");
-    doc.addField("title", "s1 two");
-    shard1.add(doc);
-    shard1.commit();
-
-    doc.clear();
-    doc.addField("id", "3");
-    doc.addField("title", "s1 three");
-    shard1.add(doc);
-    shard1.commit();
-
-    docCounts1 = 3; // Three documents in shard1
-
-    // Add two documents to shard2
-    doc.clear();
-    doc.addField("id", "4");
-    doc.addField("title", "s2 four");
-    shard2.add(doc);
-    shard2.commit();
-
-    doc.clear();
-    doc.addField("id", "5");
-    doc.addField("title", "s2 five");
-    shard2.add(doc);
-    shard2.commit();
-
-    docCounts2 = 2; // Two documents in shard2
-
-    // Verify the documents were added to correct shards
-    ModifiableSolrParams query = new ModifiableSolrParams();
-    query.set("q", "*:*");
-    QueryResponse respAll = shard1.query(query);
-    assertEquals(docCounts1 + docCounts2, respAll.getResults().getNumFound());
-
-    query.set("shards", "shard1");
-    QueryResponse resp1 = shard1.query(query);
-    assertEquals(docCounts1, resp1.getResults().getNumFound());
-
-    query.set("shards", "shard2");
-    QueryResponse resp2 = shard2.query(query);
-    assertEquals(docCounts2, resp2.getResults().getNumFound());
-
-
-    // Delete a document in shard2 with update to shard1, with _route_ param
-    // Should delete.
-    UpdateRequest deleteRequest = new UpdateRequest();
-    deleteRequest.deleteById("4", "shard2");
-    shard1.request(deleteRequest);
-    shard1.commit();
-    query.set("shards", "shard2");
-    resp2 = shard2.query(query);
-    assertEquals(--docCounts2, resp2.getResults().getNumFound());
-
-    // Delete a document in shard2 with update to shard1, without _route_ param
-    // Shouldn't delete, since deleteById requests are not broadcast to all shard leaders.
-    deleteRequest = new UpdateRequest();
-    deleteRequest.deleteById("5");
-    shard1.request(deleteRequest);
-    shard1.commit();
-    query.set("shards", "shard2");
-    resp2 = shard2.query(query);
-    assertEquals(docCounts2, resp2.getResults().getNumFound());
-
-    // Multiple deleteById commands in a single request
-    deleteRequest.clear();
-    deleteRequest.deleteById("2", "shard1");
-    deleteRequest.deleteById("3", "shard1");
-    deleteRequest.setCommitWithin(1);
-    query.set("shards", "shard1");
-    shard2.request(deleteRequest);
-    resp1 = shard1.query(query);
-    --docCounts1;
-    --docCounts1;
-    assertEquals(docCounts1, resp1.getResults().getNumFound());
-
-    // Test commitWithin, update to shard2, document deleted in shard1
-    deleteRequest.clear();
-    deleteRequest.deleteById("1", "shard1");
-    deleteRequest.setCommitWithin(1);
-    shard2.request(deleteRequest);
-    Thread.sleep(1000);
-    query.set("shards", "shard1");
-    resp1 = shard1.query(query);
-    assertEquals(--docCounts1, resp1.getResults().getNumFound());
-  }
-
-  private void testDeleteByIdCompositeRouterWithRouterField() throws Exception {
-    SolrClient server = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)));
-    CollectionAdminResponse response;
-    Map<String, NamedList<Integer>> coresStatus;
-
-    response = CollectionAdminRequest.createCollection("compositeid_collection_with_routerfield","conf1",2,2)
-        .setRouterName("compositeId")
-        .setRouterField("routefield_s")
-        .setShards("shard1,shard2")
-        .process(server);
-
-    assertEquals(0, response.getStatus());
-    assertTrue(response.isSuccess());
-    coresStatus = response.getCollectionCoresStatus();
-    assertEquals(4, coresStatus.size());
-    for (int i = 0; i < 4; i++) {
-      NamedList<Integer> status = coresStatus.get("compositeid_collection_with_routerfield_shard" + (i / 2 + 1) + "_replica" + (i % 2 + 1));
-      assertEquals(0, (int) status.get("status"));
-      assertTrue(status.get("QTime") > 0);
-    }
-
-    waitForRecoveriesToFinish("compositeid_collection_with_routerfield", true);
-
-    SolrClient shard1 = createNewSolrClient("compositeid_collection_with_routerfield_shard1_replica1",
-        getBaseUrl((HttpSolrClient) clients.get(0)));
-    SolrClient shard2 = createNewSolrClient("compositeid_collection_with_routerfield_shard2_replica1",
-        getBaseUrl((HttpSolrClient) clients.get(0)));
-
-    SolrInputDocument doc = new SolrInputDocument();
-    int docCounts1 = 0, docCounts2 = 0;
-
-    // Add three documents to shard1
-    doc.clear();
-    doc.addField("id", "1");
-    doc.addField("title", "s1 one");
-    doc.addField("routefield_s", "europe");
-    shard1.add(doc);
-    shard1.commit();
-
-    doc.clear();
-    doc.addField("id", "2");
-    doc.addField("title", "s1 two");
-    doc.addField("routefield_s", "europe");
-    shard1.add(doc);
-    shard1.commit();
-
-    doc.clear();
-    doc.addField("id", "3");
-    doc.addField("title", "s1 three");
-    doc.addField("routefield_s", "europe");
-    shard1.add(doc);
-    shard1.commit();
-
-    docCounts1 = 3; // Three documents in shard1
-
-    // Add two documents to shard2
-    doc.clear();
-    doc.addField("id", "4");
-    doc.addField("title", "s2 four");
-    doc.addField("routefield_s", "africa");
-    shard2.add(doc);
-    //shard2.commit();
-
-    doc.clear();
-    doc.addField("id", "5");
-    doc.addField("title", "s2 five");
-    doc.addField("routefield_s", "africa");
-    shard2.add(doc);
-    shard2.commit();
-
-    docCounts2 = 2; // Two documents in shard2
-
-    // Verify the documents were added to correct shards
-    ModifiableSolrParams query = new ModifiableSolrParams();
-    query.set("q", "*:*");
-    QueryResponse respAll = shard1.query(query);
-    assertEquals(docCounts1 + docCounts2, respAll.getResults().getNumFound());
-
-    query.set("shards", "shard1");
-    QueryResponse resp1 = shard1.query(query);
-    assertEquals(docCounts1, resp1.getResults().getNumFound());
-
-    query.set("shards", "shard2");
-    QueryResponse resp2 = shard2.query(query);
-    assertEquals(docCounts2, resp2.getResults().getNumFound());
-
-    // Delete a document in shard2 with update to shard1, with _route_ param
-    // Should delete.
-    UpdateRequest deleteRequest = new UpdateRequest();
-    deleteRequest.deleteById("4", "africa");
-    deleteRequest.setCommitWithin(1);
-    shard1.request(deleteRequest);
-    shard1.commit();
-
-    query.set("shards", "shard2");
-    resp2 = shard2.query(query);
-    --docCounts2;
-    assertEquals(docCounts2, resp2.getResults().getNumFound());
-
-    // Multiple deleteById commands in a single request
-    deleteRequest.clear();
-    deleteRequest.deleteById("2", "europe");
-    deleteRequest.deleteById("3", "europe");
-    deleteRequest.setCommitWithin(1);
-    query.set("shards", "shard1");
-    shard1.request(deleteRequest);
-    shard1.commit();
-    Thread.sleep(1000);
-    resp1 = shard1.query(query);
-    --docCounts1;
-    --docCounts1;
-    assertEquals(docCounts1, resp1.getResults().getNumFound());
-
-    // Test commitWithin, update to shard2, document deleted in shard1
-    deleteRequest.clear();
-    deleteRequest.deleteById("1", "europe");
-    deleteRequest.setCommitWithin(1);
-    shard2.request(deleteRequest);
-    query.set("shards", "shard1");
-    resp1 = shard1.query(query);
-    --docCounts1;
-    assertEquals(docCounts1, resp1.getResults().getNumFound());
-  }
-
-  private long testThatCantForwardToLeaderFails(long docId) throws Exception {
-    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
-    ZkNodeProps props = zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard1");
+  public void testThatCantForwardToLeaderFails() throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = "test_collection_" + NAME_COUNTER.getAndIncrement();
+    cloudClient.setDefaultCollection(collectionName);

-    chaosMonkey.stopShard("shard1");
-
-    Thread.sleep(1000);
-
-    // fake that the leader is still advertised
-    String leaderPath = ZkStateReader.getShardLeadersPath(DEFAULT_COLLECTION, "shard1");
-    SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(), 10000);
-    int fails = 0;
+    // get a random node for use in our collection before creating the one we'll partition...
+    final JettySolrRunner otherLeader = cluster.getRandomJetty(random());
+    // pick a (second) random node (which may be the same) for sending updates to
+    // (if it's the same, we're testing routing from another shard; if different, we're testing
+    // routing from a non-collection node)
+    final String indexingUrl = cluster.getRandomJetty(random()).getProxyBaseUrl() + "/" + collectionName;
+
+    // create a new node for the purpose of killing it...
+    final JettySolrRunner leaderToPartition = cluster.startJettySolrRunner();
     try {
-      zkClient.makePath(leaderPath, Utils.toJSON(props),
-          CreateMode.EPHEMERAL, true);
-      for (int i = 0; i < 200; i++) {
-        try {
-          index_specific(shardToJetty.get("shard2").get(0).client.solrClient, id, docId++);
-        } catch (SolrException e) {
-          // expected
-          fails++;
-          break;
-        } catch (SolrServerException e) {
-          // expected
-          fails++;
-          break;
+      cluster.waitForNode(leaderToPartition, DEFAULT_TIMEOUT);
+
+      // HACK: we have to stop the node in order to enable the proxy, and then restart the node
+      // (so that we can "partition" it later via the proxy)
+      final SocketProxy proxy = new SocketProxy();
+      cluster.stopJettySolrRunner(leaderToPartition);
+      cluster.waitForJettyToStop(leaderToPartition);
+      leaderToPartition.setProxyPort(proxy.getListenPort());
+      cluster.startJettySolrRunner(leaderToPartition);
+      proxy.open(leaderToPartition.getBaseUrl().toURI());
+      try {
+        log.info("leaderToPartition's Proxy: {}", proxy);
+
+        cluster.waitForNode(leaderToPartition, DEFAULT_TIMEOUT);
+        // create a 2x1 collection using a nodeSet that includes our leaderToPartition...
+        assertEquals(RequestStatusState.COMPLETED,
+                     CollectionAdminRequest.createCollection(collectionName, 2, 1)
+                     .setCreateNodeSet(leaderToPartition.getNodeName() + "," + otherLeader.getNodeName())
+                     .processAndWait(cloudClient, DEFAULT_TIMEOUT));
+
+        cloudClient.waitForState(collectionName, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
+                                 (n, c) -> DocCollection.isFullyActive(n, c, 2, 1));
+
+        { // HACK: Check the leaderProps for the shard hosted on the node we're going to kill...
+          final Replica leaderProps = cloudClient.getZkStateReader()
+            .getClusterState().getCollection(collectionName)
+            .getLeaderReplicas(leaderToPartition.getNodeName()).get(0);
+
+          // No point in this test if these aren't true...
+          assertNotNull("Sanity check: leaderProps isn't a leader?: " + leaderProps.toString(),
+                        leaderProps.getStr(Slice.LEADER));
+          assertTrue("Sanity check: leaderProps isn't using the proxy port?: " + leaderProps.toString(),
+                     leaderProps.getCoreUrl().contains(""+proxy.getListenPort()));
+        }
+
+        // create client to send our updates to...
+        try (HttpSolrClient indexClient = getHttpSolrClient(indexingUrl)) {
+
+          // Sanity check: we should be able to send a bunch of updates that work right now...
+          for (int i = 0; i < 100; i++) {
+            final UpdateResponse rsp = indexClient.add
+              (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
+            assertEquals(0, rsp.getStatus());
+          }
+
+          log.info("Closing leaderToPartition's proxy: {}", proxy);
+          proxy.close(); // NOTE: can't use halfClose, won't ensure a guaranteed failure
+
+          final SolrException e = expectThrows(SolrException.class, () -> {
+            // start at 50 so that we have some "updates" to previous docs and some "adds"...
+            for (int i = 50; i < 250; i++) {
+              // Pure random odds of all of these docs belonging to the live shard are 1 in 2**200...
+              // Except we know the hashing algorithm isn't purely random,
+              // So the actual odds are "0" unless the hashing algorithm is changed to suck badly...
+              final UpdateResponse rsp = indexClient.add
+                (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
+              // if the update didn't throw an exception, it had better be a success...
+              assertEquals(0, rsp.getStatus());
+            }
+          });
+          assertEquals(500, e.code());
+        }
+      } finally {
+        proxy.close(); // don't leak this port
+      }
     } finally {
-      zkClient.close();
+      cluster.stopJettySolrRunner(leaderToPartition); // don't let this jetty bleed into other tests
+      cluster.waitForJettyToStop(leaderToPartition);
     }
+  }
+
+  /** NOTE: uses the cluster's CloudSolrClient and assumes default collection has been set */
+  private void addTwoDocsInOneRequest(String docIdA, String docIdB) throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();

-    assertTrue("A whole shard is down - some of these should fail", fails > 0);
-    return docId;
+    assertEquals(0, cloudClient.add(sdocs(sdoc("id", docIdA),
+                                          sdoc("id", docIdB))).getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
+
+    assertEquals(2, cloudClient.query(params("q","id:(" + docIdA + " OR " + docIdB + ")")
+                                      ).getResults().getNumFound());
+
+    checkShardConsistency(params("q","*:*", "rows", "99","_trace","two_docs"));
   }

-  private long addTwoDocsInOneRequest(long docId) throws
-      Exception {
-    QueryResponse results;
-    UpdateRequest uReq;
-    uReq = new UpdateRequest();
-    docId = addDoc(docId, uReq);
-    docId = addDoc(docId, uReq);
-
-    uReq.process(cloudClient);
-    uReq.process(controlClient);
-
-    commit();
-
-    checkShardConsistency();
-
-    assertDocCounts(VERBOSE);
-
-    results = query(cloudClient);
-    assertEquals(2, results.getResults().getNumFound());
-    return docId;
-  }

+  /** NOTE: uses the cluster's CloudSolrClient and assumes default collection has been set */
+  private void addUpdateDelete(String docId) throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();

-  private long addUpdateDelete() throws Exception,
-      IOException {
-    long docId = 99999999L;
-    indexr("id", docId, t1, "originalcontent");
+    // add the doc, confirm we can query it...
+    assertEquals(0, cloudClient.add(sdoc("id", docId, "content_t", "originalcontent")).getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());

-    commit();
+    assertEquals(1, cloudClient.query(params("q", "id:" + docId)).getResults().getNumFound());
+    assertEquals(1, cloudClient.query(params("q", "content_t:originalcontent")).getResults().getNumFound());
+    assertEquals(1,
+                 cloudClient.query(params("q", "content_t:originalcontent AND id:" + docId))
+                 .getResults().getNumFound());

-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.add("q", t1 + ":originalcontent");
-    QueryResponse results = clients.get(0).query(params);
-    assertEquals(1, results.getResults().getNumFound());
+    checkShardConsistency(params("q","id:" + docId, "rows", "99","_trace","original_doc"));

     // update doc
-    indexr("id", docId, t1, "updatedcontent");
+    assertEquals(0, cloudClient.add(sdoc("id", docId, "content_t", "updatedcontent")).getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());

-    commit();
+    // confirm we can query the doc by updated content and not original...
+    assertEquals(0, cloudClient.query(params("q", "content_t:originalcontent")).getResults().getNumFound());
+    assertEquals(1, cloudClient.query(params("q", "content_t:updatedcontent")).getResults().getNumFound());
+    assertEquals(1,
+                 cloudClient.query(params("q", "content_t:updatedcontent AND id:" + docId))
+                 .getResults().getNumFound());

-    assertDocCounts(VERBOSE);
+    // delete the doc, confirm it no longer matches in queries...
+    assertEquals(0, cloudClient.deleteById(docId).getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());

-    results = clients.get(0).query(params);
-    assertEquals(0, results.getResults().getNumFound());
+    assertEquals(0, cloudClient.query(params("q", "id:" + docId)).getResults().getNumFound());
+    assertEquals(0, cloudClient.query(params("q", "content_t:updatedcontent")).getResults().getNumFound());

-    params.set("q", t1 + ":updatedcontent");
-
-    results = clients.get(0).query(params);
-    assertEquals(1, results.getResults().getNumFound());
-
-    UpdateRequest uReq = new UpdateRequest();
-    //uReq.setParam(UpdateParams.UPDATE_CHAIN, DISTRIB_UPDATE_CHAIN);
-    uReq.deleteById(Long.toString(docId)).process(clients.get(0));
-
-    commit();
-
-    results = clients.get(0).query(params);
-    assertEquals(0, results.getResults().getNumFound());
-    return docId;
+    checkShardConsistency(params("q","id:" + docId, "rows", "99","_trace","del_updated_doc"));
   }

-  private void testDeleteByQueryDistrib() throws Exception {
-    del("*:*");
-    commit();
-    assertEquals(0, query(cloudClient).getResults().getNumFound());
-  }

-  private long testIndexQueryDeleteHierarchical(long docId) throws Exception {
-    //index
-    int topDocsNum = atLeast(10);
+  public long testIndexQueryDeleteHierarchical() throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();
+
+    // index
+    long docId = 42;
+    int topDocsNum = atLeast(5);
     int childsNum = 5+random().nextInt(5);
     for (int i = 0; i < topDocsNum; ++i) {
       UpdateRequest uReq = new UpdateRequest();
@@ -522,40 +299,46 @@ public class FullSolrCloudDistribCmdsTest extends AbstractFullDistribZkTestBase
       }
       uReq.add(topDocument);
-      uReq.process(cloudClient);
-      uReq.process(controlClient);
+      assertEquals(i + "/" + docId,
+                   0, uReq.process(cloudClient).getStatus());
     }
+    assertEquals(0, cloudClient.commit().getStatus());
+
+    checkShardConsistency(params("q","*:*", "rows", "9999","_trace","added_all_top_docs_with_kids"));

-    commit();
-    checkShardConsistency();
-    assertDocCounts(VERBOSE);
+    // query

-    //query
     // parents
-    SolrQuery query = new SolrQuery("type_s:parent");
-    QueryResponse results = cloudClient.query(query);
-    assertEquals(topDocsNum, results.getResults().getNumFound());
+    assertEquals(topDocsNum,
+                 cloudClient.query(new SolrQuery("type_s:parent")).getResults().getNumFound());

-    //childs
-    query = new SolrQuery("type_s:child");
-    results = cloudClient.query(query);
-    assertEquals(topDocsNum * childsNum, results.getResults().getNumFound());
+    // childs
+    assertEquals(topDocsNum * childsNum,
+                 cloudClient.query(new SolrQuery("type_s:child")).getResults().getNumFound());
+

-    //grandchilds
-    query = new SolrQuery("type_s:grand");
-    results = cloudClient.query(query);
+    // grandchilds
+    //
     //each topDoc has t childs where each child has x = 0 + 2 + 4 + ..(t-1)*2 grands
     //x = 2 * (1 + 2 + 3 +.. (t-1)) => arithmetic sum of 1..(t-1)
     //x = 2 * ((t-1) * t / 2) = t * (t - 1)
-    assertEquals(topDocsNum * childsNum * (childsNum - 1), results.getResults().getNumFound());
+    assertEquals(topDocsNum * childsNum * (childsNum - 1),
+                 cloudClient.query(new SolrQuery("type_s:grand")).getResults().getNumFound());

     //delete
-    del("*:*");
-    commit();
+    assertEquals(0, cloudClient.deleteByQuery("*:*").getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
+    assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+    checkShardConsistency(params("q","*:*", "rows", "9999","_trace","delAll"));

     return docId;
   }
+
+  /**
+   * Recursive helper function for building out child and grandchild docs
+   */
   private long addChildren(String prefix, SolrInputDocument topDocument, int childIndex, boolean lastLevel, long docId) {
     SolrInputDocument childDocument = new SolrInputDocument();
     childDocument.addField("id", docId++);
@@ -574,188 +357,139 @@ public class FullSolrCloudDistribCmdsTest extends AbstractFullDistribZkTestBase
   }

-  private long testIndexingDocPerRequestWithHttpSolrClient(long docId) throws Exception {
-    int docs = random().nextInt(TEST_NIGHTLY ? 4013 : 97) + 1;
-    for (int i = 0; i < docs; i++) {
+  public void testIndexingOneDocPerRequestWithHttpSolrClient() throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();
+
+    final int numDocs = atLeast(50);
+    for (int i = 0; i < numDocs; i++) {
-      UpdateRequest uReq;
-      uReq = new UpdateRequest();
-      docId = addDoc(docId, uReq);
-
-      uReq.process(cloudClient);
-      uReq.process(controlClient);
-
+      assertEquals(0, cloudClient.add
+                   (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200))).getStatus());
     }
-    commit();
+    assertEquals(0, cloudClient.commit().getStatus());
+    assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());

-    checkShardConsistency();
-    assertDocCounts(VERBOSE);
-
-    return docId++;
+    checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
   }

-  private long testIndexingBatchPerRequestWithHttpSolrClient(long docId) throws Exception {
-
-    // remove collection
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set("action", CollectionAction.DELETE.toString());
-    params.set("name", "collection1");
-    QueryRequest request = new QueryRequest(params);
-    request.setPath("/admin/collections");
-
-
-    cloudClient.request(request);
-
-    controlClient.deleteByQuery("*:*");
-    controlClient.commit();
-
-    // somtimes we use an oversharded collection
-    createCollection(null, "collection2", 7, 3, 100000, cloudClient, null, "conf1");
-    cloudClient.setDefaultCollection("collection2");
-    waitForRecoveriesToFinish("collection2", false);
-
-    class IndexThread extends Thread {
-      Integer name;
+  public void testIndexingBatchPerRequestWithHttpSolrClient() throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();
+
+    final int numDocsPerBatch = atLeast(5);
+    final int numBatchesPerThread = atLeast(5);

-      public IndexThread(Integer name) {
+    final CountDownLatch abort = new CountDownLatch(1);
+    class BatchIndexer implements Runnable {
+      private boolean keepGoing() {
+        return 0 < abort.getCount();
+      }
+
+      final int name;
+      public BatchIndexer(int name) {
         this.name = name;
       }
       @Override
       public void run() {
-        int rnds = random().nextInt(TEST_NIGHTLY ? 10 : 3) + 1;
-        for (int i = 0; i < rnds; i++) {
-          UpdateRequest uReq;
-          uReq = new UpdateRequest();
-          int cnt = random().nextInt(TEST_NIGHTLY ? 2000 : 200) + 1;
-          for (int j = 0; j threads = new ArrayList<>();
+    final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool("batchIndexing");
+    final int numThreads = random().nextInt(TEST_NIGHTLY ? 4 : 2) + 1;
+    final List<Future<?>> futures = new ArrayList<>(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      futures.add(executor.submit(new BatchIndexer(i)));
+    }
+    final int totalDocsExpected = numThreads * numBatchesPerThread * numDocsPerBatch;
+    ExecutorUtil.shutdownAndAwaitTermination(executor);

-    int nthreads = random().nextInt(TEST_NIGHTLY ? 4 : 2) + 1;
-    for (int i = 0; i < nthreads; i++) {
-      IndexThread thread = new IndexThread(i);
-      threads.add(thread);
-      thread.start();
+    for (Future<?> result : futures) {
+      assertFalse(result.isCancelled());
+      assertTrue(result.isDone());
+      // all we care about is propagating any possible execution exception...
+      final Object ignored = result.get();
     }

-    for (Thread thread : threads) {
-      thread.join();
-    }
-
-    commit();
-
-    waitForRecoveriesToFinish("collection2", false);
-
-    printLayout();
-
-    SolrQuery query = new SolrQuery("*:*");
-    long controlCount = controlClient.query(query).getResults()
-        .getNumFound();
-    long cloudCount = cloudClient.query(query).getResults().getNumFound();
-
-
-    CloudInspectUtil.compareResults(controlClient, cloudClient);
-
-    assertEquals("Control does not match cloud", controlCount, cloudCount);
-    System.out.println("DOCS:" + controlCount);
-
-    return docId;
+    cloudClient.commit();
+    assertEquals(totalDocsExpected, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+    checkShardConsistency(params("q","*:*", "rows", ""+totalDocsExpected, "_trace","batches_done"));
   }

-  private long addDoc(long docId, UpdateRequest uReq) {
-    addDoc(Long.toString(docId++), uReq);
-    return docId;
-  }
-
-  private long addDoc(String docId, UpdateRequest uReq) {
-    SolrInputDocument doc1 = new SolrInputDocument();
-
-    uReq.add(doc1);
-    addFields(doc1, "id", docId, "text_t", "some text so that it not's negligent work to parse this doc, even though it's still a pretty short doc");
-    return -1;
-  }
-
-  private long testConcurrentIndexing(long docId) throws Exception {
-    QueryResponse results = query(cloudClient);
-    long beforeCount = results.getResults().getNumFound();
-    int cnt = TEST_NIGHTLY ?
2933 : 313; - try (ConcurrentUpdateSolrClient concurrentClient = getConcurrentUpdateSolrClient( - ((HttpSolrClient) clients.get(0)).getBaseURL(), 10, 2, 120000)) { - for (int i = 0; i < cnt; i++) { - index_specific(concurrentClient, id, docId++, "text_t", "some text so that it not's negligent work to parse this doc, even though it's still a pretty short doc"); - } - concurrentClient.blockUntilFinished(); + public void testConcurrentIndexing() throws Exception { + final CloudSolrClient cloudClient = cluster.getSolrClient(); + final String collectionName = createAndSetNewDefaultCollection(); + + final int numDocs = atLeast(50); + final JettySolrRunner nodeToUpdate = cluster.getRandomJetty(random()); + try (ConcurrentUpdateSolrClient indexClient + = getConcurrentUpdateSolrClient(nodeToUpdate.getProxyBaseUrl() + "/" + collectionName, 10, 2)) { - commit(); + for (int i = 0; i < numDocs; i++) { + indexClient.add(sdoc("id", i, "text_t", + TestUtil.randomRealisticUnicodeString(random(), 200))); + } + indexClient.blockUntilFinished(); + + assertEquals(0, indexClient.commit().getStatus()); + assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound()); - checkShardConsistency(); - assertDocCounts(VERBOSE); + checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll")); } - results = query(cloudClient); - assertEquals(beforeCount + cnt, results.getResults().getNumFound()); - return docId; } - private void testOptimisticUpdate(QueryResponse results) throws Exception { - SolrDocument doc = results.getResults().get(0); - Long version = (Long) doc.getFieldValue(VERSION_FIELD); - Integer theDoc = (Integer) doc.getFieldValue("id"); - UpdateRequest uReq = new UpdateRequest(); - SolrInputDocument doc1 = new SolrInputDocument(); - uReq.setParams(new ModifiableSolrParams()); - uReq.getParams().set(VERSION_FIELD, Long.toString(version)); - addFields(doc1, "id", theDoc, t1, "theupdatestuff"); - uReq.add(doc1); + /** + * Inspects the cluster to determine all active shards/replicas for the default collection then, + * executes a distrib=false query using the specified params, and compares the resulting + * {@link SolrDocumentList}, failing if any replica does not agree with it's leader. + * + * @see #cluster + * @see CloudInspectUtil#showDiff + */ + private void checkShardConsistency(final SolrParams params) throws Exception { + // TODO: refactor into static in CloudInspectUtil w/ DocCollection param? + // TODO: refactor to take in a BiFunction ? - uReq.process(cloudClient); - uReq.process(controlClient); - - commit(); - - // updating the old version should fail... 
-    SolrInputDocument doc2 = new SolrInputDocument();
-    uReq = new UpdateRequest();
-    uReq.setParams(new ModifiableSolrParams());
-    uReq.getParams().set(VERSION_FIELD, Long.toString(version));
-    addFields(doc2, "id", theDoc, t1, "thenewupdatestuff");
-    uReq.add(doc2);
-
-    uReq.process(cloudClient);
-    uReq.process(controlClient);
-
-    commit();
-
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.add("q", t1 + ":thenewupdatestuff");
-    QueryResponse res = clients.get(0).query(params);
-    assertEquals(0, res.getResults().getNumFound());
-
-    params = new ModifiableSolrParams();
-    params.add("q", t1 + ":theupdatestuff");
-    res = clients.get(0).query(params);
-    assertEquals(1, res.getResults().getNumFound());
-  }
-
-  private QueryResponse query(SolrClient client) throws SolrServerException, IOException {
-    SolrQuery query = new SolrQuery("*:*");
-    return client.query(query);
-  }
-
-  protected SolrInputDocument addRandFields(SolrInputDocument sdoc) {
-    return sdoc;
+  /**
+   * Inspects the cluster to determine all active shards/replicas for the default collection, then
+   * executes a distrib=false query using the specified params and compares the resulting
+   * {@link SolrDocumentList}, failing if any replica does not agree with its leader.
+   *
+   * @see #cluster
+   * @see CloudInspectUtil#showDiff
+   */
+  private void checkShardConsistency(final SolrParams params) throws Exception {
+    // TODO: refactor into static in CloudInspectUtil w/ DocCollection param?
+    // TODO: refactor to take in a BiFunction ?
+    final SolrParams perReplicaParams = SolrParams.wrapDefaults(params("distrib", "false"),
+                                                                params);
+    final DocCollection collection = cluster.getSolrClient().getZkStateReader()
+      .getClusterState().getCollection(cluster.getSolrClient().getDefaultCollection());
+    log.info("Checking shard consistency via: {}", perReplicaParams);
+    for (Map.Entry<String,Slice> entry : collection.getActiveSlicesMap().entrySet()) {
+      final String shardName = entry.getKey();
+      final Slice slice = entry.getValue();
+      log.info("Checking: {} -> {}", shardName, slice);
+      final Replica leader = entry.getValue().getLeader();
+      try (HttpSolrClient leaderClient = getHttpSolrClient(leader.getCoreUrl())) {
+        final SolrDocumentList leaderResults = leaderClient.query(perReplicaParams).getResults();
+        log.debug("Shard {}: Leader results: {}", shardName, leaderResults);
+        for (Replica replica : slice) {
+          try (HttpSolrClient replicaClient = getHttpSolrClient(replica.getCoreUrl())) {
+            final SolrDocumentList replicaResults = replicaClient.query(perReplicaParams).getResults();
+            log.debug("Shard {}: Replica ({}) results: {}", shardName, replica.getCoreName(), replicaResults);
+            assertEquals("inconsistency w/leader: shard=" + shardName + ", core=" + replica.getCoreName(),
+                         Collections.emptySet(),
+                         CloudInspectUtil.showDiff(leaderResults, replicaResults,
                                                    shardName + " leader: " + leader.getCoreUrl(),
                                                    shardName + ": " + replica.getCoreUrl()));
+          }
+        }
+      }
+    }
+  }
 }
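
Reviewer notes follow; these sketches are illustrations, not part of the patch.

For reviewers unfamiliar with the SolrCloudTestCase idiom this patch moves to, the load-bearing piece is the create/wait/use sequence inside createAndSetNewDefaultCollection(). A minimal sketch of that sequence, assuming a running MiniSolrCloudCluster as configured by configureCluster(5); the collection name and 2x2 shape here are illustrative only:

    // Create a collection asynchronously, block until the admin API call completes,
    // then wait until every shard/replica reports itself active.
    final CloudSolrClient client = cluster.getSolrClient();
    assertEquals(RequestStatusState.COMPLETED,
                 CollectionAdminRequest.createCollection("example_collection", "_default", 2, 2)
                 .processAndWait(client, DEFAULT_TIMEOUT));
    client.waitForState("example_collection", DEFAULT_TIMEOUT, TimeUnit.SECONDS,
                        (liveNodes, coll) -> DocCollection.isFullyActive(liveNodes, coll, 2, 2));
    // Make it the default so subsequent requests don't need to name the collection.
    client.setDefaultCollection("example_collection");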
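The stop/proxy/restart dance in testThatCantForwardToLeaderFails is easy to get wrong, so here is the same pattern in isolation. This is a sketch under the patch's own assumptions (a MiniSolrCloudCluster plus SolrJ's SocketProxy test helper), not additional test code:

    // Route one node's traffic through a SocketProxy so it can be "partitioned" later.
    // The node must be stopped before the proxy port can take effect on restart.
    final JettySolrRunner node = cluster.startJettySolrRunner();
    cluster.waitForNode(node, DEFAULT_TIMEOUT);

    final SocketProxy proxy = new SocketProxy();   // binds a free listen port
    cluster.stopJettySolrRunner(node);
    cluster.waitForJettyToStop(node);
    node.setProxyPort(proxy.getListenPort());      // node now advertises the proxy's port
    cluster.startJettySolrRunner(node);
    proxy.open(node.getBaseUrl().toURI());         // forward the listen port to the real port

    // ... exercise the node, then:
    proxy.close();   // severs every connection at once, simulating a network partition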
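The grandchild arithmetic in testIndexQueryDeleteHierarchical ("x = t * (t - 1)") can be sanity-checked without any Solr machinery. A tiny self-contained verification of the closed form the assertion relies on:

    // Each child i (0 <= i < t) contributes 2*i grandchildren, so per top-level doc:
    //   0 + 2 + 4 + ... + 2*(t-1)  ==  2 * ((t-1) * t / 2)  ==  t * (t - 1)
    for (int t = 1; t <= 10; t++) {
      int total = 0;
      for (int i = 0; i < t; i++) {
        total += 2 * i;
      }
      assert total == t * (t - 1);   // matches the expected "type_s:grand" count per topDoc
    }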
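testIndexingBatchPerRequestWithHttpSolrClient replaces the old hand-rolled threads with an executor, and leans on Future.get() purely to re-throw worker failures. A stripped-down sketch of that pattern, assuming Solr's ExecutorUtil helpers and run from a test method that declares throws Exception:

    // Submit N workers, wait for the pool to drain, then surface any worker exception.
    final ExecutorService pool = ExecutorUtil.newMDCAwareCachedThreadPool("batchIndexing");
    final List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      futures.add(pool.submit(() -> {
        // index one or more batches; any exception thrown here is captured by the Future
      }));
    }
    ExecutorUtil.shutdownAndAwaitTermination(pool);  // blocks until all workers finish
    for (Future<?> f : futures) {
      f.get();  // re-throws (wrapped) anything a worker threw, failing the test loudly
    }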
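Finally, the essence of the new checkShardConsistency() helper: query each core directly with distrib=false and diff its results against the shard leader. A condensed sketch; the leader and replica objects would come from the collection's ClusterState, as in the full method above:

    // Compare one replica against its shard leader using non-distributed queries.
    final SolrParams q = SolrParams.wrapDefaults(params("distrib", "false"),
                                                 params("q", "*:*", "rows", "9999"));
    try (HttpSolrClient leaderClient = getHttpSolrClient(leader.getCoreUrl());
         HttpSolrClient replicaClient = getHttpSolrClient(replica.getCoreUrl())) {
      final SolrDocumentList fromLeader = leaderClient.query(q).getResults();
      final SolrDocumentList fromReplica = replicaClient.query(q).getResults();
      // CloudInspectUtil.showDiff logs the differences and returns whatever didn't match
      assertEquals(Collections.emptySet(),
                   CloudInspectUtil.showDiff(fromLeader, fromReplica, "leader", "replica"));
    }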